This is an automated email from the ASF dual-hosted git repository.
loneylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b29aa3b997 [CORE][CH] Support MicroBatchScanExec with KafkaScan in
batch mode (#8321)
b29aa3b997 is described below
commit b29aa3b9973d28ed03e9840772734dd1f47b3cdd
Author: Shuai li <[email protected]>
AuthorDate: Mon Jan 20 09:53:38 2025 +0800
[CORE][CH] Support MicroBatchScanExec with KafkaScan in batch mode (#8321)
* [CH] Support MicroBatchScanExec with KafkaScan in batch mode
---
backends-clickhouse/pom.xml | 26 ++
.../org.apache.gluten.component.CHKafkaComponent | 0
.../CHKafkaComponent.scala | 32 ++
.../kafka/ClickhouseGlutenKafkaScanSuite.scala | 28 ++
.../gluten/backendsapi/clickhouse/CHBackend.scala | 2 +
.../backendsapi/clickhouse/CHIteratorApi.scala | 2 +
.../gluten/metrics/BatchScanMetricsUpdater.scala | 5 +-
cpp-ch/CMakeLists.txt | 2 +-
cpp-ch/local-engine/CMakeLists.txt | 2 +
cpp-ch/local-engine/Common/CHUtil.cpp | 3 +
.../Parser/RelParsers/ReadRelParser.cpp | 27 +-
.../local-engine/Parser/RelParsers/ReadRelParser.h | 4 +-
.../Parser/RelParsers/StreamKafkaRelParser.cpp | 96 ++++++
.../Parser/RelParsers/StreamKafkaRelParser.h | 58 ++++
.../Storages/Kafka/GlutenKafkaSource.cpp | 325 +++++++++++++++++++++
.../Storages/Kafka/GlutenKafkaSource.h | 110 +++++++
.../Storages/Kafka/GlutenKafkaUtils.cpp | 301 +++++++++++++++++++
.../local-engine/Storages/Kafka/GlutenKafkaUtils.h | 74 +++++
.../Storages/Kafka/ReadFromGlutenStorageKafka.cpp | 100 +++++++
.../Storages/Kafka/ReadFromGlutenStorageKafka.h | 65 +++++
docs/developers/SubstraitModifications.md | 1 +
gluten-kafka/pom.xml | 149 ++++++++++
.../substrait/rel/StreamKafkaSourceBuilder.java | 41 +++
.../execution/MicroBatchScanExecTransformer.scala | 116 ++++++++
.../apache/gluten/execution/OffloadKafkaScan.scala | 56 ++++
.../sql/kafka010/GlutenStreamKafkaSourceUtil.scala | 48 +++
.../execution/kafka/GlutenKafkaScanSuite.scala | 89 ++++++
.../gluten/substrait/rel/LocalFilesNode.java | 1 +
.../apache/gluten/substrait/rel/ReadRelNode.java | 6 +
.../substrait/rel/StreamKafkaSourceNode.java | 80 +++++
.../substrait/proto/substrait/algebra.proto | 17 ++
pom.xml | 73 +++++
.../datasources/v2/AbstractBatchScanExec.scala | 5 +-
.../datasources/v2/AbstractBatchScanExec.scala | 5 +-
.../datasources/v2/AbstractBatchScanExec.scala | 5 +-
.../datasources/v2/AbstractBatchScanExec.scala | 5 +-
36 files changed, 1948 insertions(+), 11 deletions(-)
diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml
index fa6863b877..6ca945d8cc 100644
--- a/backends-clickhouse/pom.xml
+++ b/backends-clickhouse/pom.xml
@@ -68,6 +68,32 @@
</dependency>
</dependencies>
</profile>
+ <profile>
+ <id>kafka</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-kafka</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-kafka</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ </profile>
</profiles>
<dependencies>
diff --git
a/backends-clickhouse/src-kafka/main/resources/META-INF/gluten-components/org.apache.gluten.component.CHKafkaComponent
b/backends-clickhouse/src-kafka/main/resources/META-INF/gluten-components/org.apache.gluten.component.CHKafkaComponent
new file mode 100644
index 0000000000..e69de29bb2
diff --git
a/backends-clickhouse/src-kafka/main/scala/org.apache.gluten.component/CHKafkaComponent.scala
b/backends-clickhouse/src-kafka/main/scala/org.apache.gluten.component/CHKafkaComponent.scala
new file mode 100644
index 0000000000..1e9f0f9d77
--- /dev/null
+++
b/backends-clickhouse/src-kafka/main/scala/org.apache.gluten.component/CHKafkaComponent.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.component
+
+import org.apache.gluten.backendsapi.clickhouse.CHBackend
+import org.apache.gluten.execution.OffloadKafkaScan
+import org.apache.gluten.extension.injector.Injector
+
+class CHKafkaComponent extends Component {
+ override def name(): String = "clickhouse-kafka"
+ override def buildInfo(): Component.BuildInfo =
+ Component.BuildInfo("ClickHouseKafka", "N/A", "N/A", "N/A")
+ override def dependencies(): Seq[Class[_ <: Component]] = classOf[CHBackend]
:: Nil
+ override def injectRules(injector: Injector): Unit = {
+ OffloadKafkaScan.inject(injector)
+ }
+}
diff --git
a/backends-clickhouse/src-kafka/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala
b/backends-clickhouse/src-kafka/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala
new file mode 100644
index 0000000000..cfb53a30ac
--- /dev/null
+++
b/backends-clickhouse/src-kafka/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.kafka
+
+import org.apache.gluten.execution.GlutenClickHouseWholeStageTransformerSuite
+
+class ClickhouseGlutenKafkaScanSuite
+ extends GlutenClickHouseWholeStageTransformerSuite
+ with GlutenKafkaScanSuite {
+
+ override protected val fileFormat: String = "parquet"
+
+ protected val kafkaBootstrapServers: String = "localhost:9092"
+}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 9626987593..a47aab55fe 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -200,6 +200,7 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
ValidationResult.failed("Has complex type.")
}
case JsonReadFormat => ValidationResult.succeeded
+ case KafkaReadFormat => ValidationResult.succeeded
case _ => ValidationResult.failed(s"Unsupported file format $format")
}
}
@@ -221,6 +222,7 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
case "OrcScan" => ReadFileFormat.OrcReadFormat
case "ParquetScan" => ReadFileFormat.ParquetReadFormat
case "ClickHouseScan" => ReadFileFormat.MergeTreeReadFormat
+ case "KafkaScan" => ReadFileFormat.KafkaReadFormat
case _ => ReadFileFormat.UnknownFormat
}
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 0b20e5aeea..b041b1b817 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -243,6 +243,8 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
(filesNode.toProtobuf.toByteArray,
filesNode.getPaths.asScala.toSeq)
case extensionTableNode: ExtensionTableNode =>
(extensionTableNode.toProtobuf.toByteArray,
extensionTableNode.getPartList)
+ case kafkaSourceNode: StreamKafkaSourceNode =>
+ (kafkaSourceNode.toProtobuf.toByteArray, Seq.empty)
}
}.unzip
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
index 1b9389351b..9002b33e38 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
@@ -64,6 +64,7 @@ class BatchScanMetricsUpdater(@transient val metrics:
Map[String, SQLMetric])
object BatchScanMetricsUpdater {
// in mergetree format, the processor name is `MergeTreeSelect(pool: XXX,
algorithm: XXX)`
- val INCLUDING_PROCESSORS = Array("MergeTreeSelect(pool",
"SubstraitFileSource")
- val CH_PLAN_NODE_NAME = Array("MergeTreeSelect(pool", "SubstraitFileSource")
+ val INCLUDING_PROCESSORS =
+ Array("MergeTreeSelect(pool", "SubstraitFileSource", "GlutenKafkaSource")
+ val CH_PLAN_NODE_NAME = Array("MergeTreeSelect(pool", "SubstraitFileSource",
"GlutenKafkaSource")
}
diff --git a/cpp-ch/CMakeLists.txt b/cpp-ch/CMakeLists.txt
index 15be68a2a6..1b23fa38d7 100644
--- a/cpp-ch/CMakeLists.txt
+++ b/cpp-ch/CMakeLists.txt
@@ -106,7 +106,7 @@ else()
-DENABLE_TESTS=OFF -DENABLE_JEMALLOC=ON -DENABLE_MULTITARGET_CODE=ON
-DENABLE_EXTERN_LOCAL_ENGINE=ON -DENABLE_ODBC=OFF -DENABLE_CAPNP=OFF
-DENABLE_GRPC=OFF -DENABLE_RUST=OFF -DENABLE_H3=OFF -DENABLE_AMQPCPP=OFF
- -DENABLE_CASSANDRA=OFF -DENABLE_KAFKA=OFF -DENABLE_NATS=OFF
+ -DENABLE_CASSANDRA=OFF -DENABLE_KAFKA=ON -DENABLE_NATS=OFF
-DENABLE_LIBPQXX=OFF -DENABLE_NURAFT=OFF -DENABLE_DATASKETCHES=OFF
-DENABLE_SQLITE=OFF -DENABLE_S2_GEOMETRY=OFF -DENABLE_ANNOY=OFF
-DENABLE_ULID=OFF -DENABLE_MYSQL=OFF -DENABLE_BCRYPT=OFF
-DENABLE_LDAP=OFF
diff --git a/cpp-ch/local-engine/CMakeLists.txt
b/cpp-ch/local-engine/CMakeLists.txt
index f1819ff217..4be2bdbc38 100644
--- a/cpp-ch/local-engine/CMakeLists.txt
+++ b/cpp-ch/local-engine/CMakeLists.txt
@@ -53,10 +53,12 @@ add_headers_and_sources(parser Parser)
add_headers_and_sources(parser Parser/RelParsers)
add_headers_and_sources(rewriter Rewriter)
add_headers_and_sources(storages Storages)
+add_headers_and_sources(storages Storages/Kafka)
add_headers_and_sources(storages Storages/Output)
add_headers_and_sources(storages Storages/Serializations)
add_headers_and_sources(storages Storages/IO)
add_headers_and_sources(storages Storages/MergeTree)
+add_headers_and_sources(storages Storages/Kafka)
add_headers_and_sources(storages Storages/Cache)
add_headers_and_sources(common Common)
add_headers_and_sources(external External)
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 37d1a1bbd7..5d7a5c803b 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -43,6 +43,7 @@
#include <DataTypes/NestedUtils.h>
#include <Disks/registerDisks.h>
#include <Disks/registerGlutenDisks.h>
+#include <Formats/registerFormats.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/registerFunctions.h>
@@ -880,6 +881,8 @@ void registerGlutenDisks()
void BackendInitializerUtil::registerAllFactories()
{
+ registerFormats();
+
registerGlutenDisks();
registerReadBufferBuilders();
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
index 2e4dd4af1b..f65584eaea 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
@@ -22,6 +22,7 @@
#include <Interpreters/Context.h>
#include <Operator/BlocksBufferPoolTransform.h>
#include <Parser/RelParsers/MergeTreeRelParser.h>
+#include <Parser/RelParsers/StreamKafkaRelParser.h>
#include <Parser/SubstraitParserUtils.h>
#include <Parser/TypeParser.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
@@ -46,12 +47,12 @@ extern const int LOGICAL_ERROR;
namespace local_engine
{
using namespace DB;
-DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr query_plan, const
substrait::Rel & rel, std::list<const substrait::Rel *> &)
+DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr query_plan, const
substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack)
{
if (query_plan)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Source node's
input plan should be null");
const auto & read = rel.read();
- if (read.has_local_files() || (!read.has_extension_table() &&
!isReadFromMergeTree(read)))
+ if (isReadFromDefault(read))
{
assert(read.has_base_schema());
DB::QueryPlanStepPtr read_step;
@@ -70,6 +71,13 @@ DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr
query_plan, const substra
query_plan->addStep(std::move(buffer_step));
}
}
+ else if (isReadFromStreamKafka(read))
+ {
+ StreamKafkaRelParser kafka_parser(parser_context, getContext());
+ kafka_parser.setSplitInfo(split_info);
+ query_plan = kafka_parser.parse(std::make_unique<DB::QueryPlan>(),
rel, rel_stack);
+ steps = kafka_parser.getSteps();
+ }
else
{
substrait::ReadRel::ExtensionTable extension_table;
@@ -87,6 +95,11 @@ DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr
query_plan, const substra
return query_plan;
}
+bool ReadRelParser::isReadFromDefault(const substrait::ReadRel & read)
+{
+ return read.has_local_files() || (!read.has_extension_table() &&
!isReadFromMergeTree(read) && !isReadFromStreamKafka(read));
+}
+
bool ReadRelParser::isReadRelFromJava(const substrait::ReadRel & rel)
{
return rel.has_local_files() && rel.local_files().items().size() == 1
@@ -95,7 +108,9 @@ bool ReadRelParser::isReadRelFromJava(const
substrait::ReadRel & rel)
bool ReadRelParser::isReadFromMergeTree(const substrait::ReadRel & rel)
{
- assert(rel.has_advanced_extension());
+ if (!rel.has_advanced_extension())
+ return false;
+
bool is_read_from_merge_tree;
google::protobuf::StringValue optimization;
optimization.ParseFromString(rel.advanced_extension().optimization().value());
@@ -107,6 +122,12 @@ bool ReadRelParser::isReadFromMergeTree(const
substrait::ReadRel & rel)
return is_read_from_merge_tree;
}
+
+bool ReadRelParser::isReadFromStreamKafka(const substrait::ReadRel & rel)
+{
+ return rel.has_stream_kafka() && rel.stream_kafka();
+}
+
DB::QueryPlanStepPtr ReadRelParser::parseReadRelWithJavaIter(const
substrait::ReadRel & rel)
{
GET_JNIENV(env)
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h
b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h
index 7f84a89ed4..a6ad920947 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h
@@ -34,12 +34,14 @@ public:
return parse(std::move(query_plan), rel, rel_stack_);
}
- DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel &
rel, std::list<const substrait::Rel *> &) override;
+ DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel &
rel, std::list<const substrait::Rel *> & rel_stack) override;
// This is source node, there is no input
std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel
& rel) override { return {}; }
+ bool isReadFromDefault(const substrait::ReadRel & rel);
bool isReadRelFromJava(const substrait::ReadRel & rel);
bool isReadFromMergeTree(const substrait::ReadRel & rel);
+ bool isReadFromStreamKafka(const substrait::ReadRel & rel);
void setInputIter(jobject input_iter_, bool is_materialze)
{
diff --git a/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp
b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp
new file mode 100644
index 0000000000..0771b481d7
--- /dev/null
+++ b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StreamKafkaRelParser.h"
+
+#include <Parser/SubstraitParserUtils.h>
+#include <Parser/TypeParser.h>
+#include <Storages/Kafka/ReadFromGlutenStorageKafka.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+extern const int NO_SUCH_DATA_PART;
+extern const int LOGICAL_ERROR;
+extern const int UNKNOWN_FUNCTION;
+extern const int UNKNOWN_TYPE;
+}
+}
+
+
+namespace local_engine
+{
+
+DB::QueryPlanPtr
+StreamKafkaRelParser::parse(DB::QueryPlanPtr query_plan, const substrait::Rel
& rel, std::list<const substrait::Rel *> & rel_stack_)
+{
+ if (rel.has_read())
+ return parseRelImpl(std::move(query_plan), rel.read());
+
+ throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "StreamKafkaRelParser
can't parse rel:{}", rel.ShortDebugString());
+}
+
+DB::QueryPlanPtr StreamKafkaRelParser::parseRelImpl(DB::QueryPlanPtr
query_plan, const substrait::ReadRel & read_rel)
+{
+    if (!read_rel.has_stream_kafka())
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Can't parse
kafka rel, because the read rel doesn't contain stream kafka");
+
+ auto kafka_task =
BinaryToMessage<substrait::ReadRel::StreamKafka>(split_info);
+ auto topic = kafka_task.topic_partition().topic();
+ auto partition = kafka_task.topic_partition().partition();
+ auto start_offset = kafka_task.start_offset();
+ auto end_offset = kafka_task.end_offset();
+ auto poll_timeout_ms = kafka_task.poll_timeout_ms();
+ String group_id;
+ String brokers;
+
+ for (auto param : kafka_task.params())
+ if (param.first == "poll_timeout_ms")
+ poll_timeout_ms = std::stoi(param.second);
+ else if (param.first == "group.id")
+ group_id = param.second;
+ else if (param.first == "bootstrap.servers")
+ brokers = param.second;
+ else
+ LOG_DEBUG(getLogger("StreamKafkaRelParser"), "Unused kafka
parameter: {}: {}", param.first, param.second);
+
+ LOG_INFO(
+ getLogger("StreamKafkaRelParser"),
+ "Kafka source: topic: {}, partition: {}, start_offset: {}, end_offset:
{}",
+ topic,
+ partition,
+ start_offset,
+ end_offset);
+
+ Names topics;
+ topics.emplace_back(topic);
+
+ auto header =
TypeParser::buildBlockFromNamedStruct(read_rel.base_schema());
+ Names names = header.getNames();
+ auto source = std::make_unique<ReadFromGlutenStorageKafka>(
+ names, header, getContext(), topics, partition, start_offset,
end_offset, poll_timeout_ms, group_id, brokers);
+
+ steps.emplace_back(source.get());
+ query_plan->addStep(std::move(source));
+
+ return query_plan;
+}
+
+
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.h
b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.h
new file mode 100644
index 0000000000..1fd12dea00
--- /dev/null
+++ b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <Parser/RelParsers/RelParser.h>
+
+namespace DB
+{
+namespace ErrorCodes
+{
+extern const int LOGICAL_ERROR;
+}
+}
+
+namespace local_engine
+{
+class StreamKafkaRelParser : public RelParser
+{
+public:
+ explicit StreamKafkaRelParser(ParserContextPtr parser_context_, const
DB::ContextPtr & context_)
+ : RelParser(parser_context_), context(context_)
+ {
+ }
+
+ ~StreamKafkaRelParser() override = default;
+
+ DB::QueryPlanPtr
+ parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel,
std::list<const substrait::Rel *> & rel_stack_) override;
+
+ std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel
&) override
+ {
+ throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR,
"StreamKafkaRelParser can't call getSingleInput().");
+ }
+
+ void setSplitInfo(String split_info_) { split_info = split_info_; }
+
+private:
+ DB::QueryPlanPtr parseRelImpl(DB::QueryPlanPtr query_plan, const
substrait::ReadRel & read_rel);
+
+ DB::ContextPtr context;
+
+ String split_info;
+};
+}
diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp
b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp
new file mode 100644
index 0000000000..471899d933
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "GlutenKafkaSource.h"
+
+#include <Core/Settings.h>
+#include <DataTypes/DataTypeNullable.h>
+#include <Formats/FormatFactory.h>
+#include <Processors/Executors/StreamingFormatExecutor.h>
+#include <Storages/Kafka/GlutenKafkaUtils.h>
+#include <Storages/Kafka/KafkaConfigLoader.h>
+#include <Storages/Kafka/KafkaSettings.h>
+#include <Storages/Kafka/parseSyslogLevel.h>
+#include <boost/algorithm/string/replace.hpp>
+#include <Common/NamedCollections/NamedCollectionsFactory.h>
+
+
+namespace ProfileEvents
+{
+extern const Event KafkaMessagesRead;
+extern const Event KafkaMessagesFailed;
+extern const Event KafkaRowsRead;
+extern const Event KafkaRowsRejected;
+}
+
+namespace DB
+{
+namespace ErrorCodes
+{
+extern const int LOGICAL_ERROR;
+}
+
+namespace Setting
+{
+extern const SettingsUInt64 max_block_size;
+extern const SettingsUInt64 max_insert_block_size;
+extern const SettingsUInt64 output_format_avro_rows_in_file;
+extern const SettingsMilliseconds stream_flush_interval_ms;
+extern const SettingsMilliseconds stream_poll_timeout_ms;
+extern const SettingsBool use_concurrency_control;
+}
+
+namespace KafkaSetting
+{
+extern const KafkaSettingsUInt64 input_format_allow_errors_num;
+extern const KafkaSettingsFloat input_format_allow_errors_ratio;
+extern const KafkaSettingsString kafka_broker_list;
+extern const KafkaSettingsString kafka_client_id;
+extern const KafkaSettingsBool kafka_commit_every_batch;
+extern const KafkaSettingsBool kafka_commit_on_select;
+extern const KafkaSettingsUInt64 kafka_consumers_pool_ttl_ms;
+extern const KafkaSettingsMilliseconds kafka_flush_interval_ms;
+extern const KafkaSettingsString kafka_format;
+extern const KafkaSettingsString kafka_group_name;
+extern const KafkaSettingsStreamingHandleErrorMode kafka_handle_error_mode;
+extern const KafkaSettingsUInt64 kafka_max_block_size;
+extern const KafkaSettingsUInt64 kafka_max_rows_per_message;
+extern const KafkaSettingsUInt64 kafka_num_consumers;
+extern const KafkaSettingsUInt64 kafka_poll_max_batch_size;
+extern const KafkaSettingsMilliseconds kafka_poll_timeout_ms;
+extern const KafkaSettingsString kafka_schema;
+extern const KafkaSettingsBool kafka_thread_per_consumer;
+extern const KafkaSettingsString kafka_topic_list;
+}
+}
+
+
+namespace local_engine
+{
+
+size_t GlutenKafkaSource::getPollMaxBatchSize() const
+{
+ size_t batch_size =
(*kafka_settings)[KafkaSetting::kafka_poll_max_batch_size].changed
+ ? (*kafka_settings)[KafkaSetting::kafka_poll_max_batch_size].value
+ : context->getSettingsRef()[Setting::max_block_size].value;
+
+ return std::min(batch_size, getMaxBlockSize());
+}
+
+size_t GlutenKafkaSource::getMaxBlockSize() const
+{
+ return (*kafka_settings)[KafkaSetting::kafka_max_block_size].changed
+ ? (*kafka_settings)[KafkaSetting::kafka_max_block_size].value
+ : (context->getSettingsRef()[Setting::max_insert_block_size].value /
/*num_consumers*/ 1);
+}
+
+size_t GlutenKafkaSource::getPollTimeoutMillisecond() const
+{
+ return (*kafka_settings)[KafkaSetting::kafka_poll_timeout_ms].changed
+ ?
(*kafka_settings)[KafkaSetting::kafka_poll_timeout_ms].totalMilliseconds()
+ :
context->getSettingsRef()[Setting::stream_poll_timeout_ms].totalMilliseconds();
+}
+
+GlutenKafkaSource::GlutenKafkaSource(
+ const Block & result_header_,
+ const ContextPtr & context_,
+ const Names & topics_,
+ const size_t & partition_,
+ const String & brokers_,
+ const String & group_,
+ const size_t & poll_timeout_ms_,
+ const size_t & start_offset_,
+ const size_t & end_offset_,
+ const std::shared_ptr<KafkaSettings> & kafka_settings_)
+ : ISource(result_header_)
+ , context(context_)
+ , log(getLogger("GlutenKafkaSource"))
+ , kafka_settings(kafka_settings_)
+ , result_header(result_header_)
+ , topics(topics_)
+ , brokers(brokers_)
+ , group(group_)
+ , poll_timeout_ms(poll_timeout_ms_)
+ , start_offset(start_offset_)
+ , end_offset(end_offset_)
+ , partition(partition_)
+{
+ max_block_size = end_offset - start_offset;
+ client_id = topics[0] + "_" + std::to_string(partition);
+
+ for (const auto & columns_with_type_and_name :
result_header.getColumnsWithTypeAndName())
+ {
+ if (columns_with_type_and_name.name == "value")
+ {
+ const auto no_null_datatype =
removeNullable(columns_with_type_and_name.type);
+
non_virtual_header.insert(ColumnWithTypeAndName(no_null_datatype->createColumn(),
no_null_datatype, "value"));
+ continue;
+ }
+
+ virtual_header.insert(columns_with_type_and_name);
+ }
+}
+
+GlutenKafkaSource::~GlutenKafkaSource()
+{
+ std::lock_guard lock(consumer_mutex);
+ auto topic_partition = TopicPartition{topics[0], partition};
+ consumers_in_memory[topic_partition].emplace_back(consumer);
+ LOG_DEBUG(
+ log,
+ "Kafka consumer for topic: {}, partition: {} is returned to pool,
current pool size: {}",
+ topics[0],
+ partition,
+ consumers_in_memory[topic_partition].size());
+}
+
+void GlutenKafkaSource::initConsumer()
+{
+ std::lock_guard lock(consumer_mutex);
+ auto topic_partition = TopicPartition{topics[0], partition};
+ consumers_in_memory.try_emplace(topic_partition,
std::vector<std::shared_ptr<DB::KafkaConsumer>>());
+
+ auto & consumers = consumers_in_memory[topic_partition];
+
+ if (!consumers.empty())
+ {
+ LOG_DEBUG(log, "Reuse Kafka consumer for topic: {}, partition: {}",
topics[0], partition);
+ consumer = consumers.back();
+ consumers.pop_back();
+ }
+
+ if (!consumer)
+ {
+ LOG_DEBUG(log, "Creating new Kafka consumer for topic: {}, partition:
{}", topics[0], partition);
+ String collection_name = "";
+ std::shared_ptr<DB::KafkaConsumer> kafka_consumer_ptr =
std::make_shared<KafkaConsumer>(
+ log,
+ getPollMaxBatchSize(),
+ getPollTimeoutMillisecond(),
+ /*intermediate_commit*/ false,
+ /*stream_cancelled*/ is_stopped,
+ topics);
+
+ KafkaConfigLoader::ConsumerConfigParams params{
+ {context->getConfigRef(), /*collection_name*/ collection_name,
topics, log},
+ brokers,
+ group,
+ false,
+ 1,
+ client_id,
+ getMaxBlockSize()};
+
+
kafka_consumer_ptr->createConsumer(GlutenKafkaUtils::getConsumerConfiguration(params));
+ consumer = kafka_consumer_ptr;
+ LOG_DEBUG(log, "Created new Kafka consumer for topic: {}, partition:
{}", topics[0], partition);
+ }
+
+ consumer->subscribe(topics[0], partition, start_offset);
+}
+
+
+Chunk GlutenKafkaSource::generateImpl()
+{
+ if (!consumer)
+ initConsumer();
+
+ size_t total_rows = 0;
+ MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
+ MutableColumns no_virtual_columns = non_virtual_header.cloneEmptyColumns();
+
+ while (true)
+ {
+ if (auto buf = consumer->consume())
+ {
+ String message;
+ readStringUntilEOF(message, *buf);
+ no_virtual_columns[0]->insert(message);
+
+ // In read_kafka_message(), KafkaConsumer::nextImpl()
+ // will be called, that may make something unusable, i.e. clean
+ // KafkaConsumer::messages, which is accessed from
+ // KafkaConsumer::currentTopic() (and other helpers).
+ if (consumer->isStalled())
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Polled messages
became unusable");
+
+ ProfileEvents::increment(ProfileEvents::KafkaRowsRead, 1);
+ consumer->storeLastReadMessageOffset();
+
+ auto topic = consumer->currentTopic();
+ auto key = consumer->currentKey();
+ auto offset = consumer->currentOffset();
+ auto partition = consumer->currentPartition();
+ auto timestamp_raw = consumer->currentTimestamp();
+ auto header_list = consumer->currentHeaderList();
+
+ virtual_columns[0]->insert(key);
+ virtual_columns[1]->insert(topic);
+ virtual_columns[2]->insert(partition);
+ virtual_columns[3]->insert(offset);
+
+ if (timestamp_raw)
+ {
+ auto ts = timestamp_raw->get_timestamp();
+ virtual_columns[4]->insert(
+
DecimalField<Decimal64>(std::chrono::duration_cast<std::chrono::milliseconds>(ts).count(),
3));
+ }
+ else
+ {
+ virtual_columns[4]->insertDefault();
+ }
+
+ virtual_columns[5]->insertDefault();
+
+ total_rows = total_rows + 1;
+ }
+ else if (consumer->polledDataUnusable())
+ {
+ break;
+ }
+ else
+ {
+ // We came here in case of tombstone (or sometimes zero-length)
messages, and it is not something abnormal
+ // TODO: it seems like in case of put_error_to_stream=true we may
need to process those differently
+ // currently we just skip them with note in logs.
+ consumer->storeLastReadMessageOffset();
+            LOG_DEBUG(
+                log,
+                "Parsing of message (topic: {}, partition: {}, offset: {})
returned no rows.",
+                consumer->currentTopic(),
+                consumer->currentPartition(),
+                consumer->currentOffset());
+ }
+
+ if (!consumer->hasMorePolledMessages() || total_rows >= max_block_size)
+ break;
+ }
+
+ LOG_DEBUG(log, "Read {} rows from Kafka topic: {}, partition: {}",
total_rows, topics[0], partition);
+
+ if (total_rows == 0)
+ return {};
+
+ if (consumer->polledDataUnusable())
+ {
+ // the rows were counted already before by KafkaRowsRead,
+ // so let's count the rows we ignore separately
+ // (they will be retried after the rebalance)
+ ProfileEvents::increment(ProfileEvents::KafkaRowsRejected, total_rows);
+ return {};
+ }
+
+ auto result_block =
non_virtual_header.cloneWithColumns(std::move(no_virtual_columns));
//.cloneWithCutColumns(0, max_block_size);
+
+ auto virtual_block =
virtual_header.cloneWithColumns(std::move(virtual_columns));
//.cloneWithCutColumns(0, max_block_size);
+ for (const auto & column : virtual_block.getColumnsWithTypeAndName())
+ result_block.insert(column);
+
+ progress(total_rows, result_block.bytes());
+
+ auto converting_dag = ActionsDAG::makeConvertingActions(
+ result_block.cloneEmpty().getColumnsWithTypeAndName(),
+ getPort().getHeader().getColumnsWithTypeAndName(),
+ ActionsDAG::MatchColumnsMode::Name);
+
+ auto converting_actions =
std::make_shared<ExpressionActions>(std::move(converting_dag));
+ converting_actions->execute(result_block);
+
+ return Chunk(result_block.getColumns(), result_block.rows());
+}
+
+Chunk GlutenKafkaSource::generate()
+{
+ if (isCancelled() || finished)
+ return {};
+
+ auto chunk = generateImpl();
+ finished = true;
+
+ return chunk;
+}
+}
diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h
b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h
new file mode 100644
index 0000000000..c1aa541579
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+
+#include <Processors/ISource.h>
+#include <Storages/Kafka/KafkaConsumer.h>
+#include <Storages/Kafka/KafkaSettings.h>
+
+namespace local_engine
+{
+
+class GlutenKafkaSource : public DB::ISource
+{
+public:
+ GlutenKafkaSource(
+ const DB::Block & result_header_,
+ const DB::ContextPtr & context_,
+ const DB::Names & topics_,
+ const size_t & partition_,
+ const String & brokers_,
+ const String & group_,
+ const size_t & poll_timeout_ms_,
+ const size_t & start_offset_,
+ const size_t & end_offset_,
+ const std::shared_ptr<DB::KafkaSettings> & kafka_settings_);
+
+ ~GlutenKafkaSource() override;
+
+ struct TopicPartition
+ {
+ String topic;
+ size_t partition;
+
+ bool operator==(const TopicPartition & other) const { return topic ==
other.topic && partition == other.partition; }
+
+ TopicPartition(const String & topic, size_t partition) : topic(topic),
partition(partition) { }
+ };
+
+ String getName() const override { return "GlutenKafkaSource"; }
+
+protected:
+ DB::Chunk generate() override;
+
+private:
+ DB::Chunk generateImpl();
+ void initConsumer();
+
+ size_t getPollMaxBatchSize() const;
+ size_t getMaxBlockSize() const;
+ size_t getPollTimeoutMillisecond() const;
+
+ LoggerPtr log;
+ DB::ContextPtr context;
+ UInt64 max_block_size;
+ std::shared_ptr<DB::KafkaConsumer> consumer;
+
+ DB::Block result_header;
+ DB::Block virtual_header;
+ DB::Block non_virtual_header;
+ std::shared_ptr<DB::KafkaSettings> kafka_settings;
+
+ const DB::Names topics;
+ const size_t partition;
+ const String brokers;
+ const String group;
+ const size_t poll_timeout_ms;
+ const size_t start_offset;
+ const size_t end_offset;
+ String client_id;
+ bool finished = false;
+};
+
+}
+
+namespace std
+{
+template <>
+struct hash<local_engine::GlutenKafkaSource::TopicPartition>
+{
+ std::size_t operator()(const
local_engine::GlutenKafkaSource::TopicPartition & tp) const noexcept
+ {
+ std::size_t h1 = std::hash<std::string>{}(tp.topic);
+ std::size_t h2 = std::hash<size_t>{}(tp.partition);
+ return h1 ^ (h2 << 1); // Combine the two hash values
+ }
+};
+}
+
+namespace local_engine
+{
+static std::mutex consumer_mutex;
+static std::unordered_map<GlutenKafkaSource::TopicPartition,
std::vector<std::shared_ptr<DB::KafkaConsumer>>> consumers_in_memory;
+static const std::atomic<bool> is_stopped{false}; // for Kafka progress; it is
always false
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.cpp
b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.cpp
new file mode 100644
index 0000000000..c5a6838523
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.cpp
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "GlutenKafkaUtils.h"
+
+
+#include <Storages/Kafka/parseSyslogLevel.h>
+#include <boost/algorithm/string/replace.hpp>
+#include <Common/NamedCollections/NamedCollectionsFactory.h>
+#include <Common/config_version.h>
+
+namespace local_engine
+{
+using namespace DB;
+void GlutenKafkaUtils::setKafkaConfigValue(cppkafka::Configuration &
kafka_config, const String & key, const String & value)
+{
+    /// "log_level" has a valid underscore, the remaining librdkafka settings use
dot.separated.format which isn't acceptable for XML.
+ /// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+ const String setting_name_in_kafka_config = (key == "log_level") ? key :
boost::replace_all_copy(key, "_", ".");
+ kafka_config.set(setting_name_in_kafka_config, value);
+}
+
+void GlutenKafkaUtils::loadNamedCollectionConfig(
+ cppkafka::Configuration & kafka_config, const String & collection_name,
const String & config_prefix)
+{
+ const auto & collection =
DB::NamedCollectionFactory::instance().get(collection_name);
+ for (const auto & key : collection->getKeys(-1, config_prefix))
+ {
+ // Cut prefix with '.' before actual config tag.
+ const auto param_name = key.substr(config_prefix.size() + 1);
+ setKafkaConfigValue(kafka_config, param_name,
collection->get<String>(key));
+ }
+}
+
+void GlutenKafkaUtils::loadConfigProperty(
+ cppkafka::Configuration & kafka_config,
+ const Poco::Util::AbstractConfiguration & config,
+ const String & config_prefix,
+ const String & tag)
+{
+ const String property_path = config_prefix + "." + tag;
+ const String property_value = config.getString(property_path);
+
+ setKafkaConfigValue(kafka_config, tag, property_value);
+}
+
+
+void GlutenKafkaUtils::loadTopicConfig(
+ cppkafka::Configuration & kafka_config,
+ const Poco::Util::AbstractConfiguration & config,
+ const String & collection_name,
+ const String & config_prefix,
+ const String & topic)
+{
+ if (!collection_name.empty())
+ {
+ const auto topic_prefix = fmt::format("{}.{}", config_prefix,
KafkaConfigLoader::CONFIG_KAFKA_TOPIC_TAG);
+ const auto & collection =
NamedCollectionFactory::instance().get(collection_name);
+ for (const auto & key : collection->getKeys(1, config_prefix))
+ {
+ /// Only consider key <kafka_topic>. Multiple occurrences given as
"kafka_topic", "kafka_topic[1]", etc.
+ if (!key.starts_with(topic_prefix))
+ continue;
+
+ const String kafka_topic_path = config_prefix + "." + key;
+ const String kafka_topic_name_path = kafka_topic_path + "." +
KafkaConfigLoader::CONFIG_NAME_TAG;
+ if (topic == collection->get<String>(kafka_topic_name_path))
+ /// Found it! Now read the per-topic configuration into
cppkafka.
+ loadNamedCollectionConfig(kafka_config, collection_name,
kafka_topic_path);
+ }
+ }
+ else
+ {
+ /// Read all tags one level below <kafka>
+ Poco::Util::AbstractConfiguration::Keys tags;
+ config.keys(config_prefix, tags);
+
+ for (const auto & tag : tags)
+ {
+ if (tag == KafkaConfigLoader::CONFIG_NAME_TAG)
+ continue; // ignore <name>, it is used to match topic
configurations
+ loadConfigProperty(kafka_config, config, config_prefix, tag);
+ }
+ }
+}
+
+
+void GlutenKafkaUtils::loadFromConfig(
+ cppkafka::Configuration & kafka_config, const
KafkaConfigLoader::LoadConfigParams & params, const String & config_prefix)
+{
+ if (!params.collection_name.empty())
+ {
+ loadNamedCollectionConfig(kafka_config, params.collection_name,
config_prefix);
+ return;
+ }
+
+ /// Read all tags one level below <kafka>
+ Poco::Util::AbstractConfiguration::Keys tags;
+ params.config.keys(config_prefix, tags);
+
+ for (const auto & tag : tags)
+ {
+ if (tag == KafkaConfigLoader::CONFIG_KAFKA_PRODUCER_TAG || tag ==
KafkaConfigLoader::CONFIG_KAFKA_CONSUMER_TAG)
+ /// Do not load consumer/producer properties, since they should be
separated by different configuration objects.
+ continue;
+
+ if (tag.starts_with(
+ KafkaConfigLoader::CONFIG_KAFKA_TOPIC_TAG)) /// multiple
occurrences given as "kafka_topic", "kafka_topic[1]", etc.
+ {
+ // Update consumer topic-specific configuration (new syntax).
Example with topics "football" and "baseball":
+ // <kafka>
+ // <kafka_topic>
+ // <name>football</name>
+ // <retry_backoff_ms>250</retry_backoff_ms>
+ // <fetch_min_bytes>5000</fetch_min_bytes>
+ // </kafka_topic>
+ // <kafka_topic>
+ // <name>baseball</name>
+ // <retry_backoff_ms>300</retry_backoff_ms>
+ // <fetch_min_bytes>2000</fetch_min_bytes>
+ // </kafka_topic>
+ // </kafka>
+ // Advantages: The period restriction no longer applies (e.g.
<name>sports.football</name> will work), everything
+ // Kafka-related is below <kafka>.
+ for (const auto & topic : params.topics)
+ {
+ /// Read topic name between <name>...</name>
+ const String kafka_topic_path = config_prefix + "." + tag;
+ const String kafka_topic_name_path = kafka_topic_path + "." +
KafkaConfigLoader::CONFIG_NAME_TAG;
+ const String topic_name =
params.config.getString(kafka_topic_name_path);
+
+ if (topic_name != topic)
+ continue;
+ loadTopicConfig(kafka_config, params.config,
params.collection_name, kafka_topic_path, topic);
+ }
+ continue;
+ }
+ if (tag.starts_with(KafkaConfigLoader::CONFIG_KAFKA_TAG))
+ /// skip legacy configuration per topic e.g. <kafka_TOPIC_NAME>.
+            /// it will be processed in a separate function
+ continue;
+ // Update configuration from the configuration. Example:
+ // <kafka>
+ // <retry_backoff_ms>250</retry_backoff_ms>
+ // <fetch_min_bytes>100000</fetch_min_bytes>
+ // </kafka>
+ loadConfigProperty(kafka_config, params.config, config_prefix, tag);
+ }
+}
+
+
+void GlutenKafkaUtils::updateGlobalConfiguration(cppkafka::Configuration &
kafka_config, const KafkaConfigLoader::LoadConfigParams & params)
+{
+ loadFromConfig(kafka_config, params, KafkaConfigLoader::CONFIG_KAFKA_TAG);
+
+#if USE_KRB5
+ if (kafka_config.has_property("sasl.kerberos.kinit.cmd"))
+ LOG_WARNING(params.log, "sasl.kerberos.kinit.cmd configuration
parameter is ignored.");
+
+ kafka_config.set("sasl.kerberos.kinit.cmd", "");
+ kafka_config.set("sasl.kerberos.min.time.before.relogin", "0");
+
+ if (kafka_config.has_property("sasl.kerberos.keytab") &&
kafka_config.has_property("sasl.kerberos.principal"))
+ {
+ String keytab = kafka_config.get("sasl.kerberos.keytab");
+ String principal = kafka_config.get("sasl.kerberos.principal");
+ LOG_DEBUG(params.log, "Running KerberosInit");
+ try
+ {
+ kerberosInit(keytab, principal);
+ }
+ catch (const Exception & e)
+ {
+ LOG_ERROR(params.log, "KerberosInit failure: {}",
getExceptionMessage(e, false));
+ }
+ LOG_DEBUG(params.log, "Finished KerberosInit");
+ }
+#else // USE_KRB5
+ if (kafka_config.has_property("sasl.kerberos.keytab") ||
kafka_config.has_property("sasl.kerberos.principal"))
+ LOG_WARNING(params.log, "Ignoring Kerberos-related parameters because
ClickHouse was built without krb5 library support.");
+#endif // USE_KRB5
+ // No need to add any prefix, messages can be distinguished
+ kafka_config.set_log_callback(
+ [log = params.log](cppkafka::KafkaHandleBase & handle, int level,
const std::string & facility, const std::string & message)
+ {
+ auto [poco_level, client_logs_level] = parseSyslogLevel(level);
+ const auto & kafka_object_config = handle.get_configuration();
+ const std::string client_id_key{"client.id"};
+ chassert(kafka_object_config.has_property(client_id_key) && "Kafka
configuration doesn't have expected client.id set");
+ LOG_IMPL(
+ log,
+ client_logs_level,
+ poco_level,
+ "[client.id:{}] [rdk:{}] {}",
+ kafka_object_config.get(client_id_key),
+ facility,
+ message);
+ });
+
+    /// NOTE: statistics should be consumed, otherwise it creates too many
+    /// entries in the queue, which leads to a memory leak and slow shutdown.
+ if (!kafka_config.has_property("statistics.interval.ms"))
+ {
+ // every 3 seconds by default. set to 0 to disable.
+ kafka_config.set("statistics.interval.ms", "3000");
+ }
+}
+
+
+void GlutenKafkaUtils::loadLegacyTopicConfig(
+ cppkafka::Configuration & kafka_config,
+ const Poco::Util::AbstractConfiguration & config,
+ const String & collection_name,
+ const String & config_prefix)
+{
+ if (!collection_name.empty())
+ {
+ loadNamedCollectionConfig(kafka_config, collection_name,
config_prefix);
+ return;
+ }
+
+ Poco::Util::AbstractConfiguration::Keys tags;
+ config.keys(config_prefix, tags);
+
+ for (const auto & tag : tags)
+ loadConfigProperty(kafka_config, config, config_prefix, tag);
+}
+
+void GlutenKafkaUtils::loadLegacyConfigSyntax(
+ cppkafka::Configuration & kafka_config,
+ const Poco::Util::AbstractConfiguration & config,
+ const String & collection_name,
+ const Names & topics)
+{
+ for (const auto & topic : topics)
+ {
+ const String kafka_topic_path = KafkaConfigLoader::CONFIG_KAFKA_TAG +
"." + KafkaConfigLoader::CONFIG_KAFKA_TAG + "_" + topic;
+ loadLegacyTopicConfig(kafka_config, config, collection_name,
kafka_topic_path);
+ }
+}
+
+
+void GlutenKafkaUtils::loadConsumerConfig(cppkafka::Configuration &
kafka_config, const KafkaConfigLoader::LoadConfigParams & params)
+{
+ const String consumer_path = KafkaConfigLoader::CONFIG_KAFKA_TAG + "." +
KafkaConfigLoader::CONFIG_KAFKA_CONSUMER_TAG;
+ loadLegacyConfigSyntax(kafka_config, params.config,
params.collection_name, params.topics);
+ // A new syntax has higher priority
+ loadFromConfig(kafka_config, params, consumer_path);
+}
+
+
+cppkafka::Configuration GlutenKafkaUtils::getConsumerConfiguration(const
KafkaConfigLoader::ConsumerConfigParams & params)
+{
+ cppkafka::Configuration conf;
+
+ conf.set("metadata.broker.list", params.brokers);
+ conf.set("group.id", params.group);
+ if (params.multiple_consumers)
+ conf.set("client.id", fmt::format("{}-{}", params.client_id,
params.consumer_number));
+ else
+ conf.set("client.id", params.client_id);
+ conf.set("client.software.name", VERSION_NAME);
+ conf.set("client.software.version", VERSION_DESCRIBE);
+ conf.set("auto.offset.reset", "earliest"); // If no offset stored for this
group, read all messages from the start
+
+ // that allows to prevent fast draining of the librdkafka queue
+ // during building of single insert block. Improves performance
+ // significantly, but may lead to bigger memory consumption.
+ size_t default_queued_min_messages = 100000; // must be greater than or
equal to default
+ size_t max_allowed_queued_min_messages = 10000000; // must be less than or
equal to max allowed value
+ conf.set(
+ "queued.min.messages", std::min(std::max(params.max_block_size,
default_queued_min_messages), max_allowed_queued_min_messages));
+
+ updateGlobalConfiguration(conf, params);
+ loadConsumerConfig(conf, params);
+
+ // those settings should not be changed by users.
+ conf.set("enable.auto.commit", "false"); // We manually commit offsets
after a stream successfully finished
+ conf.set("enable.auto.offset.store", "false"); // Update offset
automatically - to commit them all at once.
+ conf.set("enable.partition.eof", "false"); // Ignore EOF messages
+
+ for (auto & property : conf.get_all())
+ LOG_TRACE(params.log, "Consumer set property {}:{}", property.first,
property.second);
+
+ return conf;
+}
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.h
b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.h
new file mode 100644
index 0000000000..517341c259
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.h
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <Access/KerberosInit.h>
+#include <Storages/Kafka/KafkaConfigLoader.h>
+#include <cppkafka/configuration.h>
+#include <Poco/Util/AbstractConfiguration.h>
+
+namespace local_engine
+{
+using namespace DB;
+
+class GlutenKafkaUtils
+{
+public:
+ static void setKafkaConfigValue(cppkafka::Configuration & kafka_config,
const String & key, const String & value);
+
+ static void
+ loadNamedCollectionConfig(cppkafka::Configuration & kafka_config, const
String & collection_name, const String & config_prefix);
+
+ static void loadConfigProperty(
+ cppkafka::Configuration & kafka_config,
+ const Poco::Util::AbstractConfiguration & config,
+ const String & config_prefix,
+ const String & tag);
+
+ static void loadTopicConfig(
+ cppkafka::Configuration & kafka_config,
+ const Poco::Util::AbstractConfiguration & config,
+ const String & collection_name,
+ const String & config_prefix,
+ const String & topic);
+
+ static void loadFromConfig(
+ cppkafka::Configuration & kafka_config, const
KafkaConfigLoader::LoadConfigParams & params, const String & config_prefix);
+
+ static void updateGlobalConfiguration(cppkafka::Configuration &
kafka_config, const KafkaConfigLoader::LoadConfigParams & params);
+
+ static void loadLegacyTopicConfig(
+ cppkafka::Configuration & kafka_config,
+ const Poco::Util::AbstractConfiguration & config,
+ const String & collection_name,
+ const String & config_prefix);
+
+ static void loadLegacyConfigSyntax(
+ cppkafka::Configuration & kafka_config,
+ const Poco::Util::AbstractConfiguration & config,
+ const String & collection_name,
+ const Names & topics);
+
+ static void loadConsumerConfig(cppkafka::Configuration & kafka_config,
const KafkaConfigLoader::LoadConfigParams & params);
+
+
+ static cppkafka::Configuration getConsumerConfiguration(const
KafkaConfigLoader::ConsumerConfigParams & params);
+};
+
+
+}
diff --git a/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.cpp
b/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.cpp
new file mode 100644
index 0000000000..7e304a1894
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.cpp
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "GlutenKafkaSource.h"
+
+#include <Core/Settings.h>
+#include <QueryPipeline/QueryPipelineBuilder.h>
+#include <Storages/Kafka/ReadFromGlutenStorageKafka.h>
+
+
+namespace DB
+{
+namespace Setting
+{
+extern const SettingsBool stream_like_engine_allow_direct_select;
+}
+}
+
+
+namespace local_engine
+{
+using namespace DB;
+
+ReadFromGlutenStorageKafka::ReadFromGlutenStorageKafka(
+ const Names & column_names_,
+ Header output_header_,
+ // std::shared_ptr<const StorageLimitsList> storage_limits_,
+ ContextPtr context_,
+ Names & topics,
+ size_t partition,
+ size_t start_offset,
+ size_t end_offset,
+ size_t poll_timeout_ms,
+ String group_id,
+ String brokers)
+ : ISourceStep{output_header_}
+ , WithContext{context_}
+ ,
+ // storage_limits{std::move(storage_limits_)},
+ column_names(column_names_)
+ , output_header(output_header_)
+ , topics(topics)
+ , partition(partition)
+ , start_offset(start_offset)
+ , end_offset(end_offset)
+ , poll_timeout_ms(poll_timeout_ms)
+ , group_id(group_id)
+ , brokers(brokers)
+{
+}
+
+void ReadFromGlutenStorageKafka::initializePipeline(QueryPipelineBuilder &
pipeline, const BuildQueryPipelineSettings &)
+{
+ auto pipe = makePipe();
+
+ /// Add storage limits.
+ // for (const auto & processor : pipe.getProcessors())
+ // processor->setStorageLimits(storage_limits);
+
+ /// Add to processors to get processor info through explain pipeline
statement.
+ for (const auto & processor : pipe.getProcessors())
+ processors.emplace_back(processor);
+
+ pipeline.init(std::move(pipe));
+}
+
+Pipe ReadFromGlutenStorageKafka::makePipe()
+{
+ auto kafka_settings = createKafkaSettings();
+ Pipes pipes;
+ pipes.reserve(1);
+ auto modified_context = Context::createCopy(getContext());
+ //
modified_context->applySettingsChanges(kafka_storage.settings_adjustments);
+ pipes.emplace_back(std::make_shared<GlutenKafkaSource>(
+ output_header, modified_context, topics, partition, brokers, group_id,
poll_timeout_ms, start_offset, end_offset, kafka_settings));
+
+ // LOG_DEBUG(kafka_storage.log, "Starting reading kafka batch stream");
+ return Pipe::unitePipes(std::move(pipes));
+}
+
+std::shared_ptr<KafkaSettings>
ReadFromGlutenStorageKafka::createKafkaSettings()
+{
+ // TODO: add more configuration
+ return std::make_shared<DB::KafkaSettings>();
+}
+
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.h
b/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.h
new file mode 100644
index 0000000000..820189a101
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.h
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <Processors/IProcessor.h>
+#include <Processors/QueryPlan/ISourceStep.h>
+#include <Storages/Kafka/KafkaSettings.h>
+
+namespace local_engine
+{
+using namespace DB;
+class ReadFromGlutenStorageKafka : public ISourceStep, protected WithContext
+{
+public:
+ ReadFromGlutenStorageKafka(
+ const Names & column_names_,
+ Header output_header_,
+ ContextPtr context_,
+ Names & topics,
+ size_t partition,
+ size_t start_offset,
+ size_t end_offset,
+ size_t poll_timeout_ms,
+ String group_id,
+ String brokers);
+
+ String getName() const override { return "ReadFromGlutenStorageKafka"; }
+
+ void initializePipeline(QueryPipelineBuilder & pipeline, const
BuildQueryPipelineSettings & /*settings*/) final;
+
+private:
+ Pipe makePipe();
+ std::shared_ptr<KafkaSettings> createKafkaSettings();
+
+protected:
+ // std::shared_ptr<const StorageLimitsList> storage_limits;
+ const Names & column_names;
+ Header output_header;
+
+ Names topics;
+ size_t partition;
+ size_t start_offset;
+ size_t end_offset;
+ size_t poll_timeout_ms;
+ String group_id;
+ String brokers;
+};
+
+
+}
diff --git a/docs/developers/SubstraitModifications.md
b/docs/developers/SubstraitModifications.md
index 3db2b5869c..8ca2ab4f10 100644
--- a/docs/developers/SubstraitModifications.md
+++ b/docs/developers/SubstraitModifications.md
@@ -29,6 +29,7 @@ changed `Unbounded` in `WindowFunction` into
`Unbounded_Preceding` and `Unbounde
* Added `TopNRel`
([#5409](https://github.com/apache/incubator-gluten/pull/5409)).
* Added `ref` field in window bound `Preceding` and `Following`
([#5626](https://github.com/apache/incubator-gluten/pull/5626)).
* Added `BucketSpec` field in
`WriteRel`([#8386](https://github.com/apache/incubator-gluten/pull/8386))
+* Added `StreamKafka` in
`ReadRel`([#8321](https://github.com/apache/incubator-gluten/pull/8321))
## Modifications to type.proto
diff --git a/gluten-kafka/pom.xml b/gluten-kafka/pom.xml
new file mode 100644
index 0000000000..ff633728a8
--- /dev/null
+++ b/gluten-kafka/pom.xml
@@ -0,0 +1,149 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>gluten-parent</artifactId>
+ <groupId>org.apache.gluten</groupId>
+ <version>1.4.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>gluten-kafka</artifactId>
+ <packaging>jar</packaging>
+ <name>Gluten Kafka</name>
+
+ <properties>
+ <resource.dir>${project.basedir}/src/main/resources</resource.dir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-substrait</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+
<artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- For test -->
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-substrait</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <resources>
+ <resource>
+ <directory>${resource.dir}</directory>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <configuration>
+ <junitxml>.</junitxml>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>prepare-test-jar</id>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-resources-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/gluten-kafka/src-kafka/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceBuilder.java
b/gluten-kafka/src-kafka/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceBuilder.java
new file mode 100644
index 0000000000..c0cfe8866f
--- /dev/null
+++
b/gluten-kafka/src-kafka/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceBuilder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.substrait.rel;
+
+import java.util.Map;
+
+public class StreamKafkaSourceBuilder {
+ public static StreamKafkaSourceNode makeStreamKafkaBatch(
+ String topic,
+ Integer partition,
+ Long startOffset,
+ Long endOffset,
+ Long pollTimeoutMs,
+ Boolean failOnDataLoss,
+ Boolean includeHeaders,
+ Map<String, Object> kafkaParams) {
+ return new StreamKafkaSourceNode(
+ topic,
+ partition,
+ startOffset,
+ endOffset,
+ pollTimeoutMs,
+ failOnDataLoss,
+ includeHeaders,
+ kafkaParams);
+ }
+}
diff --git
a/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
b/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
new file mode 100644
index 0000000000..d3a581ae30
--- /dev/null
+++
b/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+import org.apache.gluten.substrait.rel.{ReadRelNode, SplitInfo}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression}
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.read.{InputPartition,
PartitionReaderFactory, Scan}
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
+import org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec
+import org.apache.spark.sql.kafka010.GlutenStreamKafkaSourceUtil
+import org.apache.spark.sql.types.StructType
+
+/** Physical plan node for scanning a micro-batch of data from a data source.
*/
+case class MicroBatchScanExecTransformer(
+ override val output: Seq[AttributeReference],
+ @transient override val scan: Scan,
+ @transient stream: MicroBatchStream,
+ @transient start: Offset,
+ @transient end: Offset,
+ @transient override val table: Table,
+ override val keyGroupedPartitioning: Option[Seq[Expression]] = None,
+ override val commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None
+) extends BatchScanExecTransformerBase(
+ output = output,
+ scan = scan,
+ runtimeFilters = Seq.empty,
+ table = table,
+ keyGroupedPartitioning = keyGroupedPartitioning,
+ commonPartitionValues = commonPartitionValues
+ ) {
+
+ // TODO: unify the equals/hashCode implementation for all data source v2
query plans.
+ override def equals(other: Any): Boolean = other match {
+ case other: MicroBatchScanExecTransformer => this.stream == other.stream
+ case _ => false
+ }
+
+ override lazy val readerFactory: PartitionReaderFactory =
stream.createReaderFactory()
+
+ @transient override lazy val inputPartitionsShim: Seq[InputPartition] =
+ stream.planInputPartitions(start, end)
+
+ override def filterExprs(): Seq[Expression] = Seq.empty
+
+ override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
+
+ override def outputAttributes(): Seq[Attribute] = output
+
+ override def getPartitions: Seq[InputPartition] = inputPartitionsShim
+
+ /** Returns the actual schema of this data source scan. */
+ override def getDataSchema: StructType = scan.readSchema()
+
+ override def nodeName: String =
s"MicroBatchScanExecTransformer(${scan.description()})"
+
+ override lazy val fileFormat: ReadFileFormat =
GlutenStreamKafkaSourceUtil.getFileFormat(scan)
+
+ protected[this] def supportsBatchScan(scan: Scan): Boolean = {
+ MicroBatchScanExecTransformer.supportsBatchScan(scan)
+ }
+
+ override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]):
Seq[SplitInfo] = {
+ val groupedPartitions = filteredPartitions.flatten
+ groupedPartitions.zipWithIndex.map {
+ case (p, _) => GlutenStreamKafkaSourceUtil.genSplitInfo(p)
+ }
+ }
+
+ override protected def doTransform(context: SubstraitContext):
TransformContext = {
+ val ctx = super.doTransform(context)
+ ctx.root.asInstanceOf[ReadRelNode].setStreamKafka(true);
+ ctx
+ }
+}
+
+object MicroBatchScanExecTransformer {
+ def apply(batch: MicroBatchScanExec): MicroBatchScanExecTransformer = {
+ val output =
batch.output.filter(_.isInstanceOf[AttributeReference]).map(_.asInstanceOf[AttributeReference]).toSeq
+ if (output.size == batch.output.size) {
+ new MicroBatchScanExecTransformer(
+ output,
+ batch.scan,
+ batch.stream,
+ batch.start,
+ batch.end,
+ null,
+ Option.empty,
+ Option.empty)
+ } else {
+ throw new UnsupportedOperationException(
+ s"Unsupported DataSourceV2ScanExecBase:
${batch.output.getClass.getName}")
+ }
+ }
+
+ def supportsBatchScan(scan: Scan): Boolean = {
+ scan.getClass.getName ==
"org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan"
+ }
+}
diff --git
a/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala
b/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala
new file mode 100644
index 0000000000..5d670e0723
--- /dev/null
+++
b/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.extension.columnar.enumerated.RasOffload
+import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
+import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
+import org.apache.gluten.extension.columnar.validator.Validators
+import org.apache.gluten.extension.injector.Injector
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec,
MicroBatchScanExec}
+
+case class OffloadKafkaScan() extends OffloadSingleNode {
+ override def offload(plan: SparkPlan): SparkPlan = plan match {
+ case scan: MicroBatchScanExec if
MicroBatchScanExecTransformer.supportsBatchScan(scan.scan) =>
+ MicroBatchScanExecTransformer(scan)
+ case other => other
+ }
+}
+
+object OffloadKafkaScan {
+ def inject(injector: Injector): Unit = {
+ // Inject legacy rule.
+ injector.gluten.legacy.injectTransform {
+ c =>
+ val offload = Seq(OffloadKafkaScan())
+ HeuristicTransform.Simple(
+ Validators.newValidator(c.glutenConf, offload),
+ offload
+ )
+ }
+
+ // Inject RAS rule.
+ injector.gluten.ras.injectRasRule {
+ c =>
+ RasOffload.Rule(
+ RasOffload.from[BatchScanExec](OffloadKafkaScan()),
+ Validators.newValidator(c.glutenConf),
+ Nil)
+ }
+ }
+}
diff --git
a/gluten-kafka/src-kafka/main/scala/org/apache/spark/sql/kafka010/GlutenStreamKafkaSourceUtil.scala
b/gluten-kafka/src-kafka/main/scala/org/apache/spark/sql/kafka010/GlutenStreamKafkaSourceUtil.scala
new file mode 100644
index 0000000000..d5468f2cde
--- /dev/null
+++
b/gluten-kafka/src-kafka/main/scala/org/apache/spark/sql/kafka010/GlutenStreamKafkaSourceUtil.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.kafka010
+
+import org.apache.gluten.exception.GlutenNotSupportException
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+import org.apache.gluten.substrait.rel.{SplitInfo, StreamKafkaSourceBuilder}
+import org.apache.spark.sql.connector.read.{InputPartition, Scan}
+
+object GlutenStreamKafkaSourceUtil {
+ def genSplitInfo(
+ inputPartition: InputPartition): SplitInfo = inputPartition match {
+ case batch: KafkaBatchInputPartition =>
+ StreamKafkaSourceBuilder.makeStreamKafkaBatch(
+ batch.offsetRange.topicPartition.topic(),
+ batch.offsetRange.topicPartition.partition(),
+ batch.offsetRange.fromOffset,
+ batch.offsetRange.untilOffset,
+ batch.pollTimeoutMs,
+ batch.failOnDataLoss,
+ batch.includeHeaders,
+ batch.executorKafkaParams
+ )
+ case _ =>
+ throw new UnsupportedOperationException("Only support kafka
KafkaBatchInputPartition.")
+ }
+
+ def getFileFormat(scan: Scan): ReadFileFormat = scan.getClass.getSimpleName
match {
+ case "KafkaScan" => ReadFileFormat.KafkaReadFormat
+ case _ =>
+ throw new GlutenNotSupportException("Only support KafkaScan.")
+ }
+
+}
diff --git
a/gluten-kafka/src-kafka/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala
b/gluten-kafka/src-kafka/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala
new file mode 100644
index 0000000000..0855e03192
--- /dev/null
+++
b/gluten-kafka/src-kafka/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.kafka
+
+import org.apache.gluten.execution.{MicroBatchScanExecTransformer,
WholeStageTransformerSuite}
+import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
+import org.apache.spark.sql.functions.{col, split}
+import org.apache.spark.sql.streaming.Trigger
+
+import scala.concurrent.duration.DurationInt
+
+trait GlutenKafkaScanSuite extends WholeStageTransformerSuite {
+ protected val kafkaBootstrapServers: String
+
+ test("test MicroBatchScanExecTransformer not fallback") {
+ withTempDir(
+ dir => {
+ val table_name = "kafka_table"
+ withTable(s"$table_name") {
+ val df = spark.readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", kafkaBootstrapServers)
+ .option("subscribe", table_name)
+ .load()
+ .withColumn("sp", split(col("value").cast("string"), ","))
+ .withColumn("id", col("sp").getItem(0).cast("int"))
+ .withColumn("name", col("sp").getItem(1).cast("string"))
+ .drop(col("sp"))
+ .drop(col("value"))
+ .drop(col("key"))
+ .drop(col("topic"))
+ .drop(col("partition"))
+ .drop(col("offset"))
+ .drop(col("timestamp"))
+ .drop(col("timestampType"))
+
+ val streamQuery = df.writeStream
+ .format("parquet")
+ .option("checkpointLocation", dir.getCanonicalPath +
"/_checkpoint")
+ .trigger(Trigger.ProcessingTime("5 seconds"))
+ .start(dir.getCanonicalPath)
+
+ spark.sql(s"""
+ |CREATE EXTERNAL TABLE $table_name (
+ | id long,
+ | name string
+ |)USING Parquet
+ |LOCATION '${dir.getCanonicalPath}'
+ |""".stripMargin)
+
+ spark
+ .range(0, 20)
+ .selectExpr(
+ "concat(id,',', id) as value"
+ )
+ .write
+ .format("kafka")
+ .option("kafka.bootstrap.servers", kafkaBootstrapServers)
+ .option("topic", table_name)
+ .save()
+
+ eventually(timeout(60.seconds), interval(5.seconds)) {
+ val size = streamQuery
+ .asInstanceOf[StreamingQueryWrapper]
+ .streamingQuery
+ .lastExecution
+ .executedPlan
+ .collect { case p: MicroBatchScanExecTransformer => p }
+ assert(size.size == 1)
+ streamQuery.awaitTermination(1000)
+ }
+ }
+ })
+ }
+}
diff --git
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
index 9638e43cd7..bebc6b1e95 100644
---
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
+++
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
@@ -49,6 +49,7 @@ public class LocalFilesNode implements SplitInfo {
MergeTreeReadFormat(),
TextReadFormat(),
JsonReadFormat(),
+ KafkaReadFormat(),
UnknownFormat()
}
diff --git
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ReadRelNode.java
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ReadRelNode.java
index fba59af57b..b82e05fd36 100644
---
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ReadRelNode.java
+++
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ReadRelNode.java
@@ -37,6 +37,7 @@ public class ReadRelNode implements RelNode, Serializable {
private final List<ColumnTypeNode> columnTypeNodes = new ArrayList<>();
private final ExpressionNode filterNode;
private final AdvancedExtensionNode extensionNode;
+ private boolean streamKafka = false;
ReadRelNode(
List<TypeNode> types,
@@ -51,6 +52,10 @@ public class ReadRelNode implements RelNode, Serializable {
this.extensionNode = extensionNode;
}
+ public void setStreamKafka(boolean streamKafka) {
+ this.streamKafka = streamKafka;
+ }
+
@Override
public Rel toProtobuf() {
RelCommon.Builder relCommonBuilder = RelCommon.newBuilder();
@@ -62,6 +67,7 @@ public class ReadRelNode implements RelNode, Serializable {
ReadRel.Builder readBuilder = ReadRel.newBuilder();
readBuilder.setCommon(relCommonBuilder.build());
readBuilder.setBaseSchema(nStructBuilder.build());
+ readBuilder.setStreamKafka(streamKafka);
if (filterNode != null) {
readBuilder.setFilter(filterNode.toProtobuf());
diff --git
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceNode.java
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceNode.java
new file mode 100644
index 0000000000..5974e3bb0f
--- /dev/null
+++
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceNode.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.substrait.rel;
+
+import io.substrait.proto.ReadRel;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class StreamKafkaSourceNode implements SplitInfo {
+
+ private final String topic;
+ private final Integer partition;
+ private final Long startOffset;
+ private final Long endOffset;
+ private final Long pollTimeoutMs;
+ private final Boolean failOnDataLoss;
+ private final Boolean includeHeaders;
+
+ private final Map<String, Object> kafkaParams;
+
+ public StreamKafkaSourceNode(
+ String topic,
+ Integer partition,
+ Long startOffset,
+ Long endOffset,
+ Long pollTimeoutMs,
+ Boolean failOnDataLoss,
+ Boolean includeHeaders,
+ Map<String, Object> kafkaParams) {
+ this.topic = topic;
+ this.partition = partition;
+ this.startOffset = startOffset;
+ this.endOffset = endOffset;
+ this.pollTimeoutMs = pollTimeoutMs;
+ this.failOnDataLoss = failOnDataLoss;
+ this.includeHeaders = includeHeaders;
+ this.kafkaParams = kafkaParams;
+ }
+
+ @Override
+ public List<String> preferredLocations() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public ReadRel.StreamKafka toProtobuf() {
+ ReadRel.StreamKafka.Builder builder = ReadRel.StreamKafka.newBuilder();
+
+ ReadRel.StreamKafka.TopicPartition.Builder topicPartition =
+ ReadRel.StreamKafka.TopicPartition.newBuilder();
+ topicPartition.setTopic(topic);
+ topicPartition.setPartition(partition);
+
+ builder.setTopicPartition(topicPartition.build());
+ builder.setStartOffset(startOffset);
+ builder.setEndOffset(endOffset);
+ builder.setPollTimeoutMs(pollTimeoutMs);
+ builder.setFailOnDataLoss(failOnDataLoss);
+ builder.setIncludeHeaders(includeHeaders);
+ kafkaParams.forEach((k, v) -> builder.putParams(k, v.toString()));
+
+ return builder.build();
+ }
+}
diff --git
a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
index ca669c7639..d53a9aef9d 100644
---
a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
+++
b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
@@ -69,6 +69,7 @@ message ReadRel {
LocalFiles local_files = 6;
NamedTable named_table = 7;
ExtensionTable extension_table = 8;
+ bool stream_kafka = 9;
}
// A base table. The list of string is used to represent namespacing (e.g.,
mydb.mytable).
@@ -89,6 +90,22 @@ message ReadRel {
google.protobuf.Any detail = 1;
}
+ // Used for KafkaBatch or KafkaContinuous sources
+ message StreamKafka {
+ message TopicPartition {
+ string topic = 1;
+ int32 partition = 2;
+ }
+
+ TopicPartition topic_partition = 1;
+ int64 start_offset = 2;
+ int64 end_offset = 3;
+ map<string, string> params = 4;
+ int64 poll_timeout_ms = 5;
+ bool fail_on_data_loss = 6;
+ bool include_headers = 7;
+ }
+
// Represents a list of files in input of a scan operation
message LocalFiles {
repeated FileOrFiles items = 1;
diff --git a/pom.xml b/pom.xml
index 21c5ac3669..d103d25dc5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -804,6 +804,79 @@
<backend_type>velox</backend_type>
</properties>
</profile>
+ <profile>
+ <id>kafka</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <modules>
+ <module>gluten-kafka</module>
+ </modules>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-kafka-sources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.basedir}/src-kafka/main/scala</source>
+ <source>${project.basedir}/src-kafka/main/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-kafka-resources</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>add-resource</goal>
+ </goals>
+ <configuration>
+ <resources>
+ <resource>
+
<directory>${project.basedir}/src-kafka/main/resources</directory>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-kafka-test-sources</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.basedir}/src-kafka/test/scala</source>
+ <source>${project.basedir}/src-kafka/test/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-kafka-test-resources</id>
+ <phase>generate-test-resources</phase>
+ <goals>
+ <goal>add-test-resource</goal>
+ </goals>
+ <configuration>
+ <resources>
+ <resource>
+
<directory>${project.basedir}/src-kafka/test/resources</directory>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
<profile>
<id>spark-ut</id>
<modules>
diff --git
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
index 6b495105d9..001cd9a582 100644
---
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
+++
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
@@ -46,7 +46,10 @@ abstract class AbstractBatchScanExec(
override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
- @transient override lazy val partitions: Seq[InputPartition] =
batch.planInputPartitions()
+ @transient override lazy val partitions: Seq[InputPartition] =
inputPartitionsShim
+
+ @transient protected lazy val inputPartitionsShim: Seq[InputPartition] =
+ batch.planInputPartitions()
@transient private lazy val filteredPartitions: Seq[InputPartition] = {
val dataSourceFilters = runtimeFilters.flatMap {
diff --git
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
index f0ee5c1ab7..b12a257a0c 100644
---
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
+++
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
@@ -47,7 +47,10 @@ abstract class AbstractBatchScanExec(
override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
- @transient override lazy val inputPartitions: Seq[InputPartition] =
batch.planInputPartitions()
+ @transient override lazy val inputPartitions: Seq[InputPartition] =
inputPartitionsShim
+
+ @transient protected lazy val inputPartitionsShim: Seq[InputPartition] =
+ batch.planInputPartitions()
@transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
val dataSourceFilters = runtimeFilters.flatMap {
diff --git
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
index 3313c3c768..aeaf5ccf7a 100644
---
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
+++
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
@@ -57,7 +57,10 @@ abstract class AbstractBatchScanExec(
override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
- @transient override lazy val inputPartitions: Seq[InputPartition] =
batch.planInputPartitions()
+ @transient override lazy val inputPartitions: Seq[InputPartition] =
inputPartitionsShim
+
+ @transient protected lazy val inputPartitionsShim: Seq[InputPartition] =
+ batch.planInputPartitions()
@transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
val dataSourceFilters = runtimeFilters.flatMap {
diff --git
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
index 8f51ea2c72..3508711124 100644
---
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
+++
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
@@ -52,7 +52,10 @@ abstract class AbstractBatchScanExec(
override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
- @transient override lazy val inputPartitions: Seq[InputPartition] =
batch.planInputPartitions()
+ @transient override lazy val inputPartitions: Seq[InputPartition] =
inputPartitionsShim
+
+ @transient protected lazy val inputPartitionsShim: Seq[InputPartition] =
+ batch.planInputPartitions()
@transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
val dataSourceFilters = runtimeFilters.flatMap {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]