This is an automated email from the ASF dual-hosted git repository.

loneylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b29aa3b997 [CORE][CH] Support MicroBatchScanExec with KafkaScan in 
batch mode (#8321)
b29aa3b997 is described below

commit b29aa3b9973d28ed03e9840772734dd1f47b3cdd
Author: Shuai li <[email protected]>
AuthorDate: Mon Jan 20 09:53:38 2025 +0800

    [CORE][CH] Support MicroBatchScanExec with KafkaScan in batch mode (#8321)
    
    * [CH] Support MicroBatchScanExec with KafkaScan in batch mode
---
 backends-clickhouse/pom.xml                        |  26 ++
 .../org.apache.gluten.component.CHKafkaComponent   |   0
 .../CHKafkaComponent.scala                         |  32 ++
 .../kafka/ClickhouseGlutenKafkaScanSuite.scala     |  28 ++
 .../gluten/backendsapi/clickhouse/CHBackend.scala  |   2 +
 .../backendsapi/clickhouse/CHIteratorApi.scala     |   2 +
 .../gluten/metrics/BatchScanMetricsUpdater.scala   |   5 +-
 cpp-ch/CMakeLists.txt                              |   2 +-
 cpp-ch/local-engine/CMakeLists.txt                 |   2 +
 cpp-ch/local-engine/Common/CHUtil.cpp              |   3 +
 .../Parser/RelParsers/ReadRelParser.cpp            |  27 +-
 .../local-engine/Parser/RelParsers/ReadRelParser.h |   4 +-
 .../Parser/RelParsers/StreamKafkaRelParser.cpp     |  96 ++++++
 .../Parser/RelParsers/StreamKafkaRelParser.h       |  58 ++++
 .../Storages/Kafka/GlutenKafkaSource.cpp           | 325 +++++++++++++++++++++
 .../Storages/Kafka/GlutenKafkaSource.h             | 110 +++++++
 .../Storages/Kafka/GlutenKafkaUtils.cpp            | 301 +++++++++++++++++++
 .../local-engine/Storages/Kafka/GlutenKafkaUtils.h |  74 +++++
 .../Storages/Kafka/ReadFromGlutenStorageKafka.cpp  | 100 +++++++
 .../Storages/Kafka/ReadFromGlutenStorageKafka.h    |  65 +++++
 docs/developers/SubstraitModifications.md          |   1 +
 gluten-kafka/pom.xml                               | 149 ++++++++++
 .../substrait/rel/StreamKafkaSourceBuilder.java    |  41 +++
 .../execution/MicroBatchScanExecTransformer.scala  | 116 ++++++++
 .../apache/gluten/execution/OffloadKafkaScan.scala |  56 ++++
 .../sql/kafka010/GlutenStreamKafkaSourceUtil.scala |  48 +++
 .../execution/kafka/GlutenKafkaScanSuite.scala     |  89 ++++++
 .../gluten/substrait/rel/LocalFilesNode.java       |   1 +
 .../apache/gluten/substrait/rel/ReadRelNode.java   |   6 +
 .../substrait/rel/StreamKafkaSourceNode.java       |  80 +++++
 .../substrait/proto/substrait/algebra.proto        |  17 ++
 pom.xml                                            |  73 +++++
 .../datasources/v2/AbstractBatchScanExec.scala     |   5 +-
 .../datasources/v2/AbstractBatchScanExec.scala     |   5 +-
 .../datasources/v2/AbstractBatchScanExec.scala     |   5 +-
 .../datasources/v2/AbstractBatchScanExec.scala     |   5 +-
 36 files changed, 1948 insertions(+), 11 deletions(-)

diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml
index fa6863b877..6ca945d8cc 100644
--- a/backends-clickhouse/pom.xml
+++ b/backends-clickhouse/pom.xml
@@ -68,6 +68,32 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>kafka</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.gluten</groupId>
+          <artifactId>gluten-kafka</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.gluten</groupId>
+          <artifactId>gluten-kafka</artifactId>
+          <version>${project.version}</version>
+          <type>test-jar</type>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
+          <version>${spark.version}</version>
+          <scope>provided</scope>
+        </dependency>
+      </dependencies>
+    </profile>
   </profiles>
 
   <dependencies>
diff --git 
a/backends-clickhouse/src-kafka/main/resources/META-INF/gluten-components/org.apache.gluten.component.CHKafkaComponent
 
b/backends-clickhouse/src-kafka/main/resources/META-INF/gluten-components/org.apache.gluten.component.CHKafkaComponent
new file mode 100644
index 0000000000..e69de29bb2
diff --git 
a/backends-clickhouse/src-kafka/main/scala/org.apache.gluten.component/CHKafkaComponent.scala
 
b/backends-clickhouse/src-kafka/main/scala/org.apache.gluten.component/CHKafkaComponent.scala
new file mode 100644
index 0000000000..1e9f0f9d77
--- /dev/null
+++ 
b/backends-clickhouse/src-kafka/main/scala/org.apache.gluten.component/CHKafkaComponent.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.component
+
+import org.apache.gluten.backendsapi.clickhouse.CHBackend
+import org.apache.gluten.execution.OffloadKafkaScan
+import org.apache.gluten.extension.injector.Injector
+
+class CHKafkaComponent extends Component {
+  override def name(): String = "clickhouse-kafka"
+  override def buildInfo(): Component.BuildInfo =
+    Component.BuildInfo("ClickHouseKafka", "N/A", "N/A", "N/A")
+  override def dependencies(): Seq[Class[_ <: Component]] = classOf[CHBackend] 
:: Nil
+  override def injectRules(injector: Injector): Unit = {
+    OffloadKafkaScan.inject(injector)
+  }
+}
diff --git 
a/backends-clickhouse/src-kafka/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala
 
b/backends-clickhouse/src-kafka/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala
new file mode 100644
index 0000000000..cfb53a30ac
--- /dev/null
+++ 
b/backends-clickhouse/src-kafka/test/scala/org/apache/gluten/execution/kafka/ClickhouseGlutenKafkaScanSuite.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.kafka
+
+import org.apache.gluten.execution.GlutenClickHouseWholeStageTransformerSuite
+
+class ClickhouseGlutenKafkaScanSuite
+  extends GlutenClickHouseWholeStageTransformerSuite
+  with GlutenKafkaScanSuite {
+
+  override protected val fileFormat: String = "parquet"
+
+  protected val kafkaBootstrapServers: String = "localhost:9092"
+}
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 9626987593..a47aab55fe 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -200,6 +200,7 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
           ValidationResult.failed("Has complex type.")
         }
       case JsonReadFormat => ValidationResult.succeeded
+      case KafkaReadFormat => ValidationResult.succeeded
       case _ => ValidationResult.failed(s"Unsupported file format $format")
     }
   }
@@ -221,6 +222,7 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
       case "OrcScan" => ReadFileFormat.OrcReadFormat
       case "ParquetScan" => ReadFileFormat.ParquetReadFormat
       case "ClickHouseScan" => ReadFileFormat.MergeTreeReadFormat
+      case "KafkaScan" => ReadFileFormat.KafkaReadFormat
       case _ => ReadFileFormat.UnknownFormat
     }
   }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 0b20e5aeea..b041b1b817 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -243,6 +243,8 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
                 (filesNode.toProtobuf.toByteArray, 
filesNode.getPaths.asScala.toSeq)
               case extensionTableNode: ExtensionTableNode =>
                 (extensionTableNode.toProtobuf.toByteArray, 
extensionTableNode.getPartList)
+              case kafkaSourceNode: StreamKafkaSourceNode =>
+                (kafkaSourceNode.toProtobuf.toByteArray, Seq.empty)
             }
         }.unzip
 
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
index 1b9389351b..9002b33e38 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/BatchScanMetricsUpdater.scala
@@ -64,6 +64,7 @@ class BatchScanMetricsUpdater(@transient val metrics: 
Map[String, SQLMetric])
 
 object BatchScanMetricsUpdater {
   // in mergetree format, the processor name is `MergeTreeSelect(pool: XXX, 
algorithm: XXX)`
-  val INCLUDING_PROCESSORS = Array("MergeTreeSelect(pool", 
"SubstraitFileSource")
-  val CH_PLAN_NODE_NAME = Array("MergeTreeSelect(pool", "SubstraitFileSource")
+  val INCLUDING_PROCESSORS =
+    Array("MergeTreeSelect(pool", "SubstraitFileSource", "GlutenKafkaSource")
+  val CH_PLAN_NODE_NAME = Array("MergeTreeSelect(pool", "SubstraitFileSource", 
"GlutenKafkaSource")
 }
diff --git a/cpp-ch/CMakeLists.txt b/cpp-ch/CMakeLists.txt
index 15be68a2a6..1b23fa38d7 100644
--- a/cpp-ch/CMakeLists.txt
+++ b/cpp-ch/CMakeLists.txt
@@ -106,7 +106,7 @@ else()
       -DENABLE_TESTS=OFF -DENABLE_JEMALLOC=ON -DENABLE_MULTITARGET_CODE=ON
       -DENABLE_EXTERN_LOCAL_ENGINE=ON -DENABLE_ODBC=OFF -DENABLE_CAPNP=OFF
       -DENABLE_GRPC=OFF -DENABLE_RUST=OFF -DENABLE_H3=OFF -DENABLE_AMQPCPP=OFF
-      -DENABLE_CASSANDRA=OFF -DENABLE_KAFKA=OFF -DENABLE_NATS=OFF
+      -DENABLE_CASSANDRA=OFF -DENABLE_KAFKA=ON -DENABLE_NATS=OFF
       -DENABLE_LIBPQXX=OFF -DENABLE_NURAFT=OFF -DENABLE_DATASKETCHES=OFF
       -DENABLE_SQLITE=OFF -DENABLE_S2_GEOMETRY=OFF -DENABLE_ANNOY=OFF
       -DENABLE_ULID=OFF -DENABLE_MYSQL=OFF -DENABLE_BCRYPT=OFF 
-DENABLE_LDAP=OFF
diff --git a/cpp-ch/local-engine/CMakeLists.txt 
b/cpp-ch/local-engine/CMakeLists.txt
index f1819ff217..4be2bdbc38 100644
--- a/cpp-ch/local-engine/CMakeLists.txt
+++ b/cpp-ch/local-engine/CMakeLists.txt
@@ -53,10 +53,12 @@ add_headers_and_sources(parser Parser)
 add_headers_and_sources(parser Parser/RelParsers)
 add_headers_and_sources(rewriter Rewriter)
 add_headers_and_sources(storages Storages)
+add_headers_and_sources(storages Storages/Kafka)
 add_headers_and_sources(storages Storages/Output)
 add_headers_and_sources(storages Storages/Serializations)
 add_headers_and_sources(storages Storages/IO)
 add_headers_and_sources(storages Storages/MergeTree)
+add_headers_and_sources(storages Storages/Kafka)
 add_headers_and_sources(storages Storages/Cache)
 add_headers_and_sources(common Common)
 add_headers_and_sources(external External)
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp 
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 37d1a1bbd7..5d7a5c803b 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -43,6 +43,7 @@
 #include <DataTypes/NestedUtils.h>
 #include <Disks/registerDisks.h>
 #include <Disks/registerGlutenDisks.h>
+#include <Formats/registerFormats.h>
 #include <Functions/FunctionFactory.h>
 #include <Functions/FunctionHelpers.h>
 #include <Functions/registerFunctions.h>
@@ -880,6 +881,8 @@ void registerGlutenDisks()
 
 void BackendInitializerUtil::registerAllFactories()
 {
+    registerFormats();
+
     registerGlutenDisks();
 
     registerReadBufferBuilders();
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
index 2e4dd4af1b..f65584eaea 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
@@ -22,6 +22,7 @@
 #include <Interpreters/Context.h>
 #include <Operator/BlocksBufferPoolTransform.h>
 #include <Parser/RelParsers/MergeTreeRelParser.h>
+#include <Parser/RelParsers/StreamKafkaRelParser.h>
 #include <Parser/SubstraitParserUtils.h>
 #include <Parser/TypeParser.h>
 #include <Processors/QueryPlan/ReadFromPreparedSource.h>
@@ -46,12 +47,12 @@ extern const int LOGICAL_ERROR;
 namespace local_engine
 {
 using namespace DB;
-DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr query_plan, const 
substrait::Rel & rel, std::list<const substrait::Rel *> &)
+DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr query_plan, const 
substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack)
 {
     if (query_plan)
         throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Source node's 
input plan should be null");
     const auto & read = rel.read();
-    if (read.has_local_files() || (!read.has_extension_table() && 
!isReadFromMergeTree(read)))
+    if (isReadFromDefault(read))
     {
         assert(read.has_base_schema());
         DB::QueryPlanStepPtr read_step;
@@ -70,6 +71,13 @@ DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr 
query_plan, const substra
             query_plan->addStep(std::move(buffer_step));
         }
     }
+    else if (isReadFromStreamKafka(read))
+    {
+        StreamKafkaRelParser kafka_parser(parser_context, getContext());
+        kafka_parser.setSplitInfo(split_info);
+        query_plan = kafka_parser.parse(std::make_unique<DB::QueryPlan>(), 
rel, rel_stack);
+        steps = kafka_parser.getSteps();
+    }
     else
     {
         substrait::ReadRel::ExtensionTable extension_table;
@@ -87,6 +95,11 @@ DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr 
query_plan, const substra
     return query_plan;
 }
 
+bool ReadRelParser::isReadFromDefault(const substrait::ReadRel & read)
+{
+    return read.has_local_files() || (!read.has_extension_table() && 
!isReadFromMergeTree(read) && !isReadFromStreamKafka(read));
+}
+
 bool ReadRelParser::isReadRelFromJava(const substrait::ReadRel & rel)
 {
     return rel.has_local_files() && rel.local_files().items().size() == 1
@@ -95,7 +108,9 @@ bool ReadRelParser::isReadRelFromJava(const 
substrait::ReadRel & rel)
 
 bool ReadRelParser::isReadFromMergeTree(const substrait::ReadRel & rel)
 {
-    assert(rel.has_advanced_extension());
+    if (!rel.has_advanced_extension())
+        return false;
+
     bool is_read_from_merge_tree;
     google::protobuf::StringValue optimization;
     
optimization.ParseFromString(rel.advanced_extension().optimization().value());
@@ -107,6 +122,12 @@ bool ReadRelParser::isReadFromMergeTree(const 
substrait::ReadRel & rel)
     return is_read_from_merge_tree;
 }
 
+
+bool ReadRelParser::isReadFromStreamKafka(const substrait::ReadRel & rel)
+{
+    return rel.has_stream_kafka() && rel.stream_kafka();
+}
+
 DB::QueryPlanStepPtr ReadRelParser::parseReadRelWithJavaIter(const 
substrait::ReadRel & rel)
 {
     GET_JNIENV(env)
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h
index 7f84a89ed4..a6ad920947 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h
@@ -34,12 +34,14 @@ public:
         return parse(std::move(query_plan), rel, rel_stack_);
     }
 
-    DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & 
rel, std::list<const substrait::Rel *> &) override;
+    DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel & 
rel, std::list<const substrait::Rel *> & rel_stack) override;
     // This is source node, there is no input
     std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel 
& rel) override { return {}; }
 
+    bool isReadFromDefault(const substrait::ReadRel & rel);
     bool isReadRelFromJava(const substrait::ReadRel & rel);
     bool isReadFromMergeTree(const substrait::ReadRel & rel);
+    bool isReadFromStreamKafka(const substrait::ReadRel & rel);
 
     void setInputIter(jobject input_iter_, bool is_materialze)
     {
diff --git a/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp
new file mode 100644
index 0000000000..0771b481d7
--- /dev/null
+++ b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.cpp
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StreamKafkaRelParser.h"
+
+#include <Parser/SubstraitParserUtils.h>
+#include <Parser/TypeParser.h>
+#include <Storages/Kafka/ReadFromGlutenStorageKafka.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+extern const int NO_SUCH_DATA_PART;
+extern const int LOGICAL_ERROR;
+extern const int UNKNOWN_FUNCTION;
+extern const int UNKNOWN_TYPE;
+}
+}
+
+
+namespace local_engine
+{
+
+DB::QueryPlanPtr
+StreamKafkaRelParser::parse(DB::QueryPlanPtr query_plan, const substrait::Rel 
& rel, std::list<const substrait::Rel *> & rel_stack_)
+{
+    if (rel.has_read())
+        return parseRelImpl(std::move(query_plan), rel.read());
+
+    throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "StreamKafkaRelParser 
can't parse rel:{}", rel.ShortDebugString());
+}
+
+DB::QueryPlanPtr StreamKafkaRelParser::parseRelImpl(DB::QueryPlanPtr 
query_plan, const substrait::ReadRel & read_rel)
+{
+    if (!read_rel.has_stream_kafka())
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse 
kafka rel because the read rel does not contain stream_kafka");
+
+    auto kafka_task = 
BinaryToMessage<substrait::ReadRel::StreamKafka>(split_info);
+    auto topic = kafka_task.topic_partition().topic();
+    auto partition = kafka_task.topic_partition().partition();
+    auto start_offset = kafka_task.start_offset();
+    auto end_offset = kafka_task.end_offset();
+    auto poll_timeout_ms = kafka_task.poll_timeout_ms();
+    String group_id;
+    String brokers;
+
+    for (auto param : kafka_task.params())
+        if (param.first == "poll_timeout_ms")
+            poll_timeout_ms = std::stoi(param.second);
+        else if (param.first == "group.id")
+            group_id = param.second;
+        else if (param.first == "bootstrap.servers")
+            brokers = param.second;
+        else
+            LOG_DEBUG(getLogger("StreamKafkaRelParser"), "Unused kafka 
parameter: {}: {}", param.first, param.second);
+
+    LOG_INFO(
+        getLogger("StreamKafkaRelParser"),
+        "Kafka source: topic: {}, partition: {}, start_offset: {}, end_offset: 
{}",
+        topic,
+        partition,
+        start_offset,
+        end_offset);
+
+    Names topics;
+    topics.emplace_back(topic);
+
+    auto header = 
TypeParser::buildBlockFromNamedStruct(read_rel.base_schema());
+    Names names = header.getNames();
+    auto source = std::make_unique<ReadFromGlutenStorageKafka>(
+        names, header, getContext(), topics, partition, start_offset, 
end_offset, poll_timeout_ms, group_id, brokers);
+
+    steps.emplace_back(source.get());
+    query_plan->addStep(std::move(source));
+
+    return query_plan;
+}
+
+
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.h
new file mode 100644
index 0000000000..1fd12dea00
--- /dev/null
+++ b/cpp-ch/local-engine/Parser/RelParsers/StreamKafkaRelParser.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <Parser/RelParsers/RelParser.h>
+
+namespace DB
+{
+namespace ErrorCodes
+{
+extern const int LOGICAL_ERROR;
+}
+}
+
+namespace local_engine
+{
+class StreamKafkaRelParser : public RelParser
+{
+public:
+    explicit StreamKafkaRelParser(ParserContextPtr parser_context_, const 
DB::ContextPtr & context_)
+        : RelParser(parser_context_), context(context_)
+    {
+    }
+
+    ~StreamKafkaRelParser() override = default;
+
+    DB::QueryPlanPtr
+    parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, 
std::list<const substrait::Rel *> & rel_stack_) override;
+
+    std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel 
&) override
+    {
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, 
"StreamKafkaRelParser can't call getSingleInput().");
+    }
+
+    void setSplitInfo(String split_info_) { split_info = split_info_; }
+
+private:
+    DB::QueryPlanPtr parseRelImpl(DB::QueryPlanPtr query_plan, const 
substrait::ReadRel & read_rel);
+
+    DB::ContextPtr context;
+
+    String split_info;
+};
+}
diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp 
b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp
new file mode 100644
index 0000000000..471899d933
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.cpp
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "GlutenKafkaSource.h"
+
+#include <Core/Settings.h>
+#include <DataTypes/DataTypeNullable.h>
+#include <Formats/FormatFactory.h>
+#include <Processors/Executors/StreamingFormatExecutor.h>
+#include <Storages/Kafka/GlutenKafkaUtils.h>
+#include <Storages/Kafka/KafkaConfigLoader.h>
+#include <Storages/Kafka/KafkaSettings.h>
+#include <Storages/Kafka/parseSyslogLevel.h>
+#include <boost/algorithm/string/replace.hpp>
+#include <Common/NamedCollections/NamedCollectionsFactory.h>
+
+
+namespace ProfileEvents
+{
+extern const Event KafkaMessagesRead;
+extern const Event KafkaMessagesFailed;
+extern const Event KafkaRowsRead;
+extern const Event KafkaRowsRejected;
+}
+
+namespace DB
+{
+namespace ErrorCodes
+{
+extern const int LOGICAL_ERROR;
+}
+
+namespace Setting
+{
+extern const SettingsUInt64 max_block_size;
+extern const SettingsUInt64 max_insert_block_size;
+extern const SettingsUInt64 output_format_avro_rows_in_file;
+extern const SettingsMilliseconds stream_flush_interval_ms;
+extern const SettingsMilliseconds stream_poll_timeout_ms;
+extern const SettingsBool use_concurrency_control;
+}
+
+namespace KafkaSetting
+{
+extern const KafkaSettingsUInt64 input_format_allow_errors_num;
+extern const KafkaSettingsFloat input_format_allow_errors_ratio;
+extern const KafkaSettingsString kafka_broker_list;
+extern const KafkaSettingsString kafka_client_id;
+extern const KafkaSettingsBool kafka_commit_every_batch;
+extern const KafkaSettingsBool kafka_commit_on_select;
+extern const KafkaSettingsUInt64 kafka_consumers_pool_ttl_ms;
+extern const KafkaSettingsMilliseconds kafka_flush_interval_ms;
+extern const KafkaSettingsString kafka_format;
+extern const KafkaSettingsString kafka_group_name;
+extern const KafkaSettingsStreamingHandleErrorMode kafka_handle_error_mode;
+extern const KafkaSettingsUInt64 kafka_max_block_size;
+extern const KafkaSettingsUInt64 kafka_max_rows_per_message;
+extern const KafkaSettingsUInt64 kafka_num_consumers;
+extern const KafkaSettingsUInt64 kafka_poll_max_batch_size;
+extern const KafkaSettingsMilliseconds kafka_poll_timeout_ms;
+extern const KafkaSettingsString kafka_schema;
+extern const KafkaSettingsBool kafka_thread_per_consumer;
+extern const KafkaSettingsString kafka_topic_list;
+}
+}
+
+
+namespace local_engine
+{
+
+size_t GlutenKafkaSource::getPollMaxBatchSize() const
+{
+    size_t batch_size = 
(*kafka_settings)[KafkaSetting::kafka_poll_max_batch_size].changed
+        ? (*kafka_settings)[KafkaSetting::kafka_poll_max_batch_size].value
+        : context->getSettingsRef()[Setting::max_block_size].value;
+
+    return std::min(batch_size, getMaxBlockSize());
+}
+
+size_t GlutenKafkaSource::getMaxBlockSize() const
+{
+    return (*kafka_settings)[KafkaSetting::kafka_max_block_size].changed
+        ? (*kafka_settings)[KafkaSetting::kafka_max_block_size].value
+        : (context->getSettingsRef()[Setting::max_insert_block_size].value / 
/*num_consumers*/ 1);
+}
+
+size_t GlutenKafkaSource::getPollTimeoutMillisecond() const
+{
+    return (*kafka_settings)[KafkaSetting::kafka_poll_timeout_ms].changed
+        ? 
(*kafka_settings)[KafkaSetting::kafka_poll_timeout_ms].totalMilliseconds()
+        : 
context->getSettingsRef()[Setting::stream_poll_timeout_ms].totalMilliseconds();
+}
+
+GlutenKafkaSource::GlutenKafkaSource(
+    const Block & result_header_,
+    const ContextPtr & context_,
+    const Names & topics_,
+    const size_t & partition_,
+    const String & brokers_,
+    const String & group_,
+    const size_t & poll_timeout_ms_,
+    const size_t & start_offset_,
+    const size_t & end_offset_,
+    const std::shared_ptr<KafkaSettings> & kafka_settings_)
+    : ISource(result_header_)
+    , context(context_)
+    , log(getLogger("GlutenKafkaSource"))
+    , kafka_settings(kafka_settings_)
+    , result_header(result_header_)
+    , topics(topics_)
+    , brokers(brokers_)
+    , group(group_)
+    , poll_timeout_ms(poll_timeout_ms_)
+    , start_offset(start_offset_)
+    , end_offset(end_offset_)
+    , partition(partition_)
+{
+    max_block_size = end_offset - start_offset;
+    client_id = topics[0] + "_" + std::to_string(partition);
+
+    for (const auto & columns_with_type_and_name : 
result_header.getColumnsWithTypeAndName())
+    {
+        if (columns_with_type_and_name.name == "value")
+        {
+            const auto no_null_datatype = 
removeNullable(columns_with_type_and_name.type);
+            
non_virtual_header.insert(ColumnWithTypeAndName(no_null_datatype->createColumn(),
 no_null_datatype, "value"));
+            continue;
+        }
+
+        virtual_header.insert(columns_with_type_and_name);
+    }
+}
+
+GlutenKafkaSource::~GlutenKafkaSource()
+{
+    std::lock_guard lock(consumer_mutex);
+    auto topic_partition = TopicPartition{topics[0], partition};
+    consumers_in_memory[topic_partition].emplace_back(consumer);
+    LOG_DEBUG(
+        log,
+        "Kafka consumer for topic: {}, partition: {} is returned to pool, 
current pool size: {}",
+        topics[0],
+        partition,
+        consumers_in_memory[topic_partition].size());
+}
+
+void GlutenKafkaSource::initConsumer()
+{
+    std::lock_guard lock(consumer_mutex);
+    auto topic_partition = TopicPartition{topics[0], partition};
+    consumers_in_memory.try_emplace(topic_partition, 
std::vector<std::shared_ptr<DB::KafkaConsumer>>());
+
+    auto & consumers = consumers_in_memory[topic_partition];
+
+    if (!consumers.empty())
+    {
+        LOG_DEBUG(log, "Reuse Kafka consumer for topic: {}, partition: {}", 
topics[0], partition);
+        consumer = consumers.back();
+        consumers.pop_back();
+    }
+
+    if (!consumer)
+    {
+        LOG_DEBUG(log, "Creating new Kafka consumer for topic: {}, partition: 
{}", topics[0], partition);
+        String collection_name = "";
+        std::shared_ptr<DB::KafkaConsumer> kafka_consumer_ptr = 
std::make_shared<KafkaConsumer>(
+            log,
+            getPollMaxBatchSize(),
+            getPollTimeoutMillisecond(),
+            /*intermediate_commit*/ false,
+            /*stream_cancelled*/ is_stopped,
+            topics);
+
+        KafkaConfigLoader::ConsumerConfigParams params{
+            {context->getConfigRef(), /*collection_name*/ collection_name, 
topics, log},
+            brokers,
+            group,
+            false,
+            1,
+            client_id,
+            getMaxBlockSize()};
+
+        
kafka_consumer_ptr->createConsumer(GlutenKafkaUtils::getConsumerConfiguration(params));
+        consumer = kafka_consumer_ptr;
+        LOG_DEBUG(log, "Created new Kafka consumer for topic: {}, partition: 
{}", topics[0], partition);
+    }
+
+    consumer->subscribe(topics[0], partition, start_offset);
+}
+
+
+Chunk GlutenKafkaSource::generateImpl()
+{
+    if (!consumer)
+        initConsumer();
+
+    size_t total_rows = 0;
+    MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
+    MutableColumns no_virtual_columns = non_virtual_header.cloneEmptyColumns();
+
+    while (true)
+    {
+        if (auto buf = consumer->consume())
+        {
+            String message;
+            readStringUntilEOF(message, *buf);
+            no_virtual_columns[0]->insert(message);
+
+            // In read_kafka_message(), KafkaConsumer::nextImpl()
+            // will be called, that may make something unusable, i.e. clean
+            // KafkaConsumer::messages, which is accessed from
+            // KafkaConsumer::currentTopic() (and other helpers).
+            if (consumer->isStalled())
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Polled messages 
became unusable");
+
+            ProfileEvents::increment(ProfileEvents::KafkaRowsRead, 1);
+            consumer->storeLastReadMessageOffset();
+
+            auto topic = consumer->currentTopic();
+            auto key = consumer->currentKey();
+            auto offset = consumer->currentOffset();
+            auto partition = consumer->currentPartition();
+            auto timestamp_raw = consumer->currentTimestamp();
+            auto header_list = consumer->currentHeaderList();
+
+            virtual_columns[0]->insert(key);
+            virtual_columns[1]->insert(topic);
+            virtual_columns[2]->insert(partition);
+            virtual_columns[3]->insert(offset);
+
+            if (timestamp_raw)
+            {
+                auto ts = timestamp_raw->get_timestamp();
+                virtual_columns[4]->insert(
+                    
DecimalField<Decimal64>(std::chrono::duration_cast<std::chrono::milliseconds>(ts).count(),
 3));
+            }
+            else
+            {
+                virtual_columns[4]->insertDefault();
+            }
+
+            virtual_columns[5]->insertDefault();
+
+            total_rows = total_rows + 1;
+        }
+        else if (consumer->polledDataUnusable())
+        {
+            break;
+        }
+        else
+        {
+            // We came here in case of tombstone (or sometimes zero-length) 
messages, and it is not something abnormal
+            // TODO: it seems like in case of put_error_to_stream=true we may 
need to process those differently
+            // currently we just skip them with note in logs.
+            consumer->storeLastReadMessageOffset();
+            LOG_DEBUG(
+                log,
+                "Parsing of message (topic: {}, partition: {}, offset: {}) 
returned no rows.",
+                consumer->currentTopic(),
+                consumer->currentPartition(),
+                consumer->currentOffset());
+        }
+
+        if (!consumer->hasMorePolledMessages() || total_rows >= max_block_size)
+            break;
+    }
+
+    LOG_DEBUG(log, "Read {} rows from Kafka topic: {}, partition: {}", 
total_rows, topics[0], partition);
+
+    if (total_rows == 0)
+        return {};
+
+    if (consumer->polledDataUnusable())
+    {
+        // the rows were counted already before by KafkaRowsRead,
+        // so let's count the rows we ignore separately
+        // (they will be retried after the rebalance)
+        ProfileEvents::increment(ProfileEvents::KafkaRowsRejected, total_rows);
+        return {};
+    }
+
+    auto result_block = 
non_virtual_header.cloneWithColumns(std::move(no_virtual_columns)); 
//.cloneWithCutColumns(0, max_block_size);
+
+    auto virtual_block = 
virtual_header.cloneWithColumns(std::move(virtual_columns)); 
//.cloneWithCutColumns(0, max_block_size);
+    for (const auto & column : virtual_block.getColumnsWithTypeAndName())
+        result_block.insert(column);
+
+    progress(total_rows, result_block.bytes());
+
+    auto converting_dag = ActionsDAG::makeConvertingActions(
+        result_block.cloneEmpty().getColumnsWithTypeAndName(),
+        getPort().getHeader().getColumnsWithTypeAndName(),
+        ActionsDAG::MatchColumnsMode::Name);
+
+    auto converting_actions = 
std::make_shared<ExpressionActions>(std::move(converting_dag));
+    converting_actions->execute(result_block);
+
+    return Chunk(result_block.getColumns(), result_block.rows());
+}
+
+Chunk GlutenKafkaSource::generate()
+{
+    if (isCancelled() || finished)
+        return {};
+
+    auto chunk = generateImpl();
+    finished = true;
+
+    return chunk;
+}
+}
diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h 
b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h
new file mode 100644
index 0000000000..c1aa541579
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaSource.h
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+
+#include <Processors/ISource.h>
+#include <Storages/Kafka/KafkaConsumer.h>
+#include <Storages/Kafka/KafkaSettings.h>
+
+namespace local_engine
+{
+
+class GlutenKafkaSource : public DB::ISource
+{
+public:
+    GlutenKafkaSource(
+        const DB::Block & result_header_,
+        const DB::ContextPtr & context_,
+        const DB::Names & topics_,
+        const size_t & partition_,
+        const String & brokers_,
+        const String & group_,
+        const size_t & poll_timeout_ms_,
+        const size_t & start_offset_,
+        const size_t & end_offset_,
+        const std::shared_ptr<DB::KafkaSettings> & kafka_settings_);
+
+    ~GlutenKafkaSource() override;
+
+    struct TopicPartition
+    {
+        String topic;
+        size_t partition;
+
+        bool operator==(const TopicPartition & other) const { return topic == 
other.topic && partition == other.partition; }
+
+        TopicPartition(const String & topic, size_t partition) : topic(topic), 
partition(partition) { }
+    };
+
+    String getName() const override { return "GlutenKafkaSource"; }
+
+protected:
+    DB::Chunk generate() override;
+
+private:
+    DB::Chunk generateImpl();
+    void initConsumer();
+
+    size_t getPollMaxBatchSize() const;
+    size_t getMaxBlockSize() const;
+    size_t getPollTimeoutMillisecond() const;
+
+    LoggerPtr log;
+    DB::ContextPtr context;
+    UInt64 max_block_size;
+    std::shared_ptr<DB::KafkaConsumer> consumer;
+
+    DB::Block result_header;
+    DB::Block virtual_header;
+    DB::Block non_virtual_header;
+    std::shared_ptr<DB::KafkaSettings> kafka_settings;
+
+    const DB::Names topics;
+    const size_t partition;
+    const String brokers;
+    const String group;
+    const size_t poll_timeout_ms;
+    const size_t start_offset;
+    const size_t end_offset;
+    String client_id;
+    bool finished = false;
+};
+
+}
+
+namespace std
+{
+template <>
+struct hash<local_engine::GlutenKafkaSource::TopicPartition>
+{
+    std::size_t operator()(const 
local_engine::GlutenKafkaSource::TopicPartition & tp) const noexcept
+    {
+        std::size_t h1 = std::hash<std::string>{}(tp.topic);
+        std::size_t h2 = std::hash<size_t>{}(tp.partition);
+        return h1 ^ (h2 << 1); // Combine the two hash values
+    }
+};
+}
+
+namespace local_engine
+{
+static std::mutex consumer_mutex;
+static std::unordered_map<GlutenKafkaSource::TopicPartition, 
std::vector<std::shared_ptr<DB::KafkaConsumer>>> consumers_in_memory;
+static const std::atomic<bool> is_stopped{false}; // for Kafka progress; it 
is always false
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.cpp 
b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.cpp
new file mode 100644
index 0000000000..c5a6838523
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.cpp
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "GlutenKafkaUtils.h"
+
+
+#include <Storages/Kafka/parseSyslogLevel.h>
+#include <boost/algorithm/string/replace.hpp>
+#include <Common/NamedCollections/NamedCollectionsFactory.h>
+#include <Common/config_version.h>
+
+namespace local_engine
+{
+using namespace DB;
+void GlutenKafkaUtils::setKafkaConfigValue(cppkafka::Configuration & 
kafka_config, const String & key, const String & value)
+{
+    /// "log_level" has valid underscore, the remaining librdkafka setting use 
dot.separated.format which isn't acceptable for XML.
+    /// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+    const String setting_name_in_kafka_config = (key == "log_level") ? key : 
boost::replace_all_copy(key, "_", ".");
+    kafka_config.set(setting_name_in_kafka_config, value);
+}
+
+void GlutenKafkaUtils::loadNamedCollectionConfig(
+    cppkafka::Configuration & kafka_config, const String & collection_name, 
const String & config_prefix)
+{
+    const auto & collection = 
DB::NamedCollectionFactory::instance().get(collection_name);
+    for (const auto & key : collection->getKeys(-1, config_prefix))
+    {
+        // Cut prefix with '.' before actual config tag.
+        const auto param_name = key.substr(config_prefix.size() + 1);
+        setKafkaConfigValue(kafka_config, param_name, 
collection->get<String>(key));
+    }
+}
+
+void GlutenKafkaUtils::loadConfigProperty(
+    cppkafka::Configuration & kafka_config,
+    const Poco::Util::AbstractConfiguration & config,
+    const String & config_prefix,
+    const String & tag)
+{
+    const String property_path = config_prefix + "." + tag;
+    const String property_value = config.getString(property_path);
+
+    setKafkaConfigValue(kafka_config, tag, property_value);
+}
+
+
+void GlutenKafkaUtils::loadTopicConfig(
+    cppkafka::Configuration & kafka_config,
+    const Poco::Util::AbstractConfiguration & config,
+    const String & collection_name,
+    const String & config_prefix,
+    const String & topic)
+{
+    if (!collection_name.empty())
+    {
+        const auto topic_prefix = fmt::format("{}.{}", config_prefix, 
KafkaConfigLoader::CONFIG_KAFKA_TOPIC_TAG);
+        const auto & collection = 
NamedCollectionFactory::instance().get(collection_name);
+        for (const auto & key : collection->getKeys(1, config_prefix))
+        {
+            /// Only consider key <kafka_topic>. Multiple occurrences given as 
"kafka_topic", "kafka_topic[1]", etc.
+            if (!key.starts_with(topic_prefix))
+                continue;
+
+            const String kafka_topic_path = config_prefix + "." + key;
+            const String kafka_topic_name_path = kafka_topic_path + "." + 
KafkaConfigLoader::CONFIG_NAME_TAG;
+            if (topic == collection->get<String>(kafka_topic_name_path))
+                /// Found it! Now read the per-topic configuration into 
cppkafka.
+                loadNamedCollectionConfig(kafka_config, collection_name, 
kafka_topic_path);
+        }
+    }
+    else
+    {
+        /// Read all tags one level below <kafka>
+        Poco::Util::AbstractConfiguration::Keys tags;
+        config.keys(config_prefix, tags);
+
+        for (const auto & tag : tags)
+        {
+            if (tag == KafkaConfigLoader::CONFIG_NAME_TAG)
+                continue; // ignore <name>, it is used to match topic 
configurations
+            loadConfigProperty(kafka_config, config, config_prefix, tag);
+        }
+    }
+}
+
+
+void GlutenKafkaUtils::loadFromConfig(
+    cppkafka::Configuration & kafka_config, const 
KafkaConfigLoader::LoadConfigParams & params, const String & config_prefix)
+{
+    if (!params.collection_name.empty())
+    {
+        loadNamedCollectionConfig(kafka_config, params.collection_name, 
config_prefix);
+        return;
+    }
+
+    /// Read all tags one level below <kafka>
+    Poco::Util::AbstractConfiguration::Keys tags;
+    params.config.keys(config_prefix, tags);
+
+    for (const auto & tag : tags)
+    {
+        if (tag == KafkaConfigLoader::CONFIG_KAFKA_PRODUCER_TAG || tag == 
KafkaConfigLoader::CONFIG_KAFKA_CONSUMER_TAG)
+            /// Do not load consumer/producer properties, since they should be 
separated by different configuration objects.
+            continue;
+
+        if (tag.starts_with(
+                KafkaConfigLoader::CONFIG_KAFKA_TOPIC_TAG)) /// multiple 
occurrences given as "kafka_topic", "kafka_topic[1]", etc.
+        {
+            // Update consumer topic-specific configuration (new syntax). 
Example with topics "football" and "baseball":
+            //     <kafka>
+            //         <kafka_topic>
+            //             <name>football</name>
+            //             <retry_backoff_ms>250</retry_backoff_ms>
+            //             <fetch_min_bytes>5000</fetch_min_bytes>
+            //         </kafka_topic>
+            //         <kafka_topic>
+            //             <name>baseball</name>
+            //             <retry_backoff_ms>300</retry_backoff_ms>
+            //             <fetch_min_bytes>2000</fetch_min_bytes>
+            //         </kafka_topic>
+            //     </kafka>
+            // Advantages: The period restriction no longer applies (e.g. 
<name>sports.football</name> will work), everything
+            // Kafka-related is below <kafka>.
+            for (const auto & topic : params.topics)
+            {
+                /// Read topic name between <name>...</name>
+                const String kafka_topic_path = config_prefix + "." + tag;
+                const String kafka_topic_name_path = kafka_topic_path + "." + 
KafkaConfigLoader::CONFIG_NAME_TAG;
+                const String topic_name = 
params.config.getString(kafka_topic_name_path);
+
+                if (topic_name != topic)
+                    continue;
+                loadTopicConfig(kafka_config, params.config, 
params.collection_name, kafka_topic_path, topic);
+            }
+            continue;
+        }
+        if (tag.starts_with(KafkaConfigLoader::CONFIG_KAFKA_TAG))
+            /// skip legacy configuration per topic e.g. <kafka_TOPIC_NAME>.
+            /// it will be processed in a separate function
+            continue;
+        // Update configuration from the configuration. Example:
+        //     <kafka>
+        //         <retry_backoff_ms>250</retry_backoff_ms>
+        //         <fetch_min_bytes>100000</fetch_min_bytes>
+        //     </kafka>
+        loadConfigProperty(kafka_config, params.config, config_prefix, tag);
+    }
+}
+
+
+void GlutenKafkaUtils::updateGlobalConfiguration(cppkafka::Configuration & 
kafka_config, const KafkaConfigLoader::LoadConfigParams & params)
+{
+    loadFromConfig(kafka_config, params, KafkaConfigLoader::CONFIG_KAFKA_TAG);
+
+#if USE_KRB5
+    if (kafka_config.has_property("sasl.kerberos.kinit.cmd"))
+        LOG_WARNING(params.log, "sasl.kerberos.kinit.cmd configuration 
parameter is ignored.");
+
+    kafka_config.set("sasl.kerberos.kinit.cmd", "");
+    kafka_config.set("sasl.kerberos.min.time.before.relogin", "0");
+
+    if (kafka_config.has_property("sasl.kerberos.keytab") && 
kafka_config.has_property("sasl.kerberos.principal"))
+    {
+        String keytab = kafka_config.get("sasl.kerberos.keytab");
+        String principal = kafka_config.get("sasl.kerberos.principal");
+        LOG_DEBUG(params.log, "Running KerberosInit");
+        try
+        {
+            kerberosInit(keytab, principal);
+        }
+        catch (const Exception & e)
+        {
+            LOG_ERROR(params.log, "KerberosInit failure: {}", 
getExceptionMessage(e, false));
+        }
+        LOG_DEBUG(params.log, "Finished KerberosInit");
+    }
+#else // USE_KRB5
+    if (kafka_config.has_property("sasl.kerberos.keytab") || 
kafka_config.has_property("sasl.kerberos.principal"))
+        LOG_WARNING(params.log, "Ignoring Kerberos-related parameters because 
ClickHouse was built without krb5 library support.");
+#endif // USE_KRB5
+    // No need to add any prefix, messages can be distinguished
+    kafka_config.set_log_callback(
+        [log = params.log](cppkafka::KafkaHandleBase & handle, int level, 
const std::string & facility, const std::string & message)
+        {
+            auto [poco_level, client_logs_level] = parseSyslogLevel(level);
+            const auto & kafka_object_config = handle.get_configuration();
+            const std::string client_id_key{"client.id"};
+            chassert(kafka_object_config.has_property(client_id_key) && "Kafka 
configuration doesn't have expected client.id set");
+            LOG_IMPL(
+                log,
+                client_logs_level,
+                poco_level,
+                "[client.id:{}] [rdk:{}] {}",
+                kafka_object_config.get(client_id_key),
+                facility,
+                message);
+        });
+
+    /// NOTE: statistics should be consumed, otherwise it creates too much
+    /// entries in the queue, that leads to memory leak and slow shutdown.
+    if (!kafka_config.has_property("statistics.interval.ms"))
+    {
+        // every 3 seconds by default. set to 0 to disable.
+        kafka_config.set("statistics.interval.ms", "3000");
+    }
+}
+
+
+void GlutenKafkaUtils::loadLegacyTopicConfig(
+    cppkafka::Configuration & kafka_config,
+    const Poco::Util::AbstractConfiguration & config,
+    const String & collection_name,
+    const String & config_prefix)
+{
+    if (!collection_name.empty())
+    {
+        loadNamedCollectionConfig(kafka_config, collection_name, 
config_prefix);
+        return;
+    }
+
+    Poco::Util::AbstractConfiguration::Keys tags;
+    config.keys(config_prefix, tags);
+
+    for (const auto & tag : tags)
+        loadConfigProperty(kafka_config, config, config_prefix, tag);
+}
+
+void GlutenKafkaUtils::loadLegacyConfigSyntax(
+    cppkafka::Configuration & kafka_config,
+    const Poco::Util::AbstractConfiguration & config,
+    const String & collection_name,
+    const Names & topics)
+{
+    for (const auto & topic : topics)
+    {
+        const String kafka_topic_path = KafkaConfigLoader::CONFIG_KAFKA_TAG + 
"." + KafkaConfigLoader::CONFIG_KAFKA_TAG + "_" + topic;
+        loadLegacyTopicConfig(kafka_config, config, collection_name, 
kafka_topic_path);
+    }
+}
+
+
+void GlutenKafkaUtils::loadConsumerConfig(cppkafka::Configuration & 
kafka_config, const KafkaConfigLoader::LoadConfigParams & params)
+{
+    const String consumer_path = KafkaConfigLoader::CONFIG_KAFKA_TAG + "." + 
KafkaConfigLoader::CONFIG_KAFKA_CONSUMER_TAG;
+    loadLegacyConfigSyntax(kafka_config, params.config, 
params.collection_name, params.topics);
+    // A new syntax has higher priority
+    loadFromConfig(kafka_config, params, consumer_path);
+}
+
+
+cppkafka::Configuration GlutenKafkaUtils::getConsumerConfiguration(const 
KafkaConfigLoader::ConsumerConfigParams & params)
+{
+    cppkafka::Configuration conf;
+
+    conf.set("metadata.broker.list", params.brokers);
+    conf.set("group.id", params.group);
+    if (params.multiple_consumers)
+        conf.set("client.id", fmt::format("{}-{}", params.client_id, 
params.consumer_number));
+    else
+        conf.set("client.id", params.client_id);
+    conf.set("client.software.name", VERSION_NAME);
+    conf.set("client.software.version", VERSION_DESCRIBE);
+    conf.set("auto.offset.reset", "earliest"); // If no offset stored for this 
group, read all messages from the start
+
+    // that allows to prevent fast draining of the librdkafka queue
+    // during building of single insert block. Improves performance
+    // significantly, but may lead to bigger memory consumption.
+    size_t default_queued_min_messages = 100000; // must be greater than or 
equal to default
+    size_t max_allowed_queued_min_messages = 10000000; // must be less than or 
equal to max allowed value
+    conf.set(
+        "queued.min.messages", std::min(std::max(params.max_block_size, 
default_queued_min_messages), max_allowed_queued_min_messages));
+
+    updateGlobalConfiguration(conf, params);
+    loadConsumerConfig(conf, params);
+
+    // those settings should not be changed by users.
+    conf.set("enable.auto.commit", "false"); // We manually commit offsets 
after a stream successfully finished
+    conf.set("enable.auto.offset.store", "false"); // Update offset 
automatically - to commit them all at once.
+    conf.set("enable.partition.eof", "false"); // Ignore EOF messages
+
+    for (auto & property : conf.get_all())
+        LOG_TRACE(params.log, "Consumer set property {}:{}", property.first, 
property.second);
+
+    return conf;
+}
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.h 
b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.h
new file mode 100644
index 0000000000..517341c259
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Kafka/GlutenKafkaUtils.h
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <Access/KerberosInit.h>
+#include <Storages/Kafka/KafkaConfigLoader.h>
+#include <cppkafka/configuration.h>
+#include <Poco/Util/AbstractConfiguration.h>
+
+namespace local_engine
+{
+using namespace DB;
+
+class GlutenKafkaUtils
+{
+public:
+    static void setKafkaConfigValue(cppkafka::Configuration & kafka_config, 
const String & key, const String & value);
+
+    static void
+    loadNamedCollectionConfig(cppkafka::Configuration & kafka_config, const 
String & collection_name, const String & config_prefix);
+
+    static void loadConfigProperty(
+        cppkafka::Configuration & kafka_config,
+        const Poco::Util::AbstractConfiguration & config,
+        const String & config_prefix,
+        const String & tag);
+
+    static void loadTopicConfig(
+        cppkafka::Configuration & kafka_config,
+        const Poco::Util::AbstractConfiguration & config,
+        const String & collection_name,
+        const String & config_prefix,
+        const String & topic);
+
+    static void loadFromConfig(
+        cppkafka::Configuration & kafka_config, const 
KafkaConfigLoader::LoadConfigParams & params, const String & config_prefix);
+
+    static void updateGlobalConfiguration(cppkafka::Configuration & 
kafka_config, const KafkaConfigLoader::LoadConfigParams & params);
+
+    static void loadLegacyTopicConfig(
+        cppkafka::Configuration & kafka_config,
+        const Poco::Util::AbstractConfiguration & config,
+        const String & collection_name,
+        const String & config_prefix);
+
+    static void loadLegacyConfigSyntax(
+        cppkafka::Configuration & kafka_config,
+        const Poco::Util::AbstractConfiguration & config,
+        const String & collection_name,
+        const Names & topics);
+
+    static void loadConsumerConfig(cppkafka::Configuration & kafka_config, 
const KafkaConfigLoader::LoadConfigParams & params);
+
+
+    static cppkafka::Configuration getConsumerConfiguration(const 
KafkaConfigLoader::ConsumerConfigParams & params);
+};
+
+
+}
diff --git a/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.cpp 
b/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.cpp
new file mode 100644
index 0000000000..7e304a1894
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.cpp
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "GlutenKafkaSource.h"
+
+#include <Core/Settings.h>
+#include <QueryPipeline/QueryPipelineBuilder.h>
+#include <Storages/Kafka/ReadFromGlutenStorageKafka.h>
+
+
+namespace DB
+{
+namespace Setting
+{
+extern const SettingsBool stream_like_engine_allow_direct_select;
+}
+}
+
+
+namespace local_engine
+{
+using namespace DB;
+
+ReadFromGlutenStorageKafka::ReadFromGlutenStorageKafka(
+    const Names & column_names_,
+    Header output_header_,
+    // std::shared_ptr<const StorageLimitsList> storage_limits_,
+    ContextPtr context_,
+    Names & topics,
+    size_t partition,
+    size_t start_offset,
+    size_t end_offset,
+    size_t poll_timeout_ms,
+    String group_id,
+    String brokers)
+    : ISourceStep{output_header_}
+    , WithContext{context_}
+    ,
+    // storage_limits{std::move(storage_limits_)},
+    column_names(column_names_)
+    , output_header(output_header_)
+    , topics(topics)
+    , partition(partition)
+    , start_offset(start_offset)
+    , end_offset(end_offset)
+    , poll_timeout_ms(poll_timeout_ms)
+    , group_id(group_id)
+    , brokers(brokers)
+{
+}
+
+void ReadFromGlutenStorageKafka::initializePipeline(QueryPipelineBuilder & 
pipeline, const BuildQueryPipelineSettings &)
+{
+    auto pipe = makePipe();
+
+    /// Add storage limits.
+    // for (const auto & processor : pipe.getProcessors())
+    //     processor->setStorageLimits(storage_limits);
+
+    /// Add to processors to get processor info through explain pipeline 
statement.
+    for (const auto & processor : pipe.getProcessors())
+        processors.emplace_back(processor);
+
+    pipeline.init(std::move(pipe));
+}
+
+Pipe ReadFromGlutenStorageKafka::makePipe()
+{
+    auto kafka_settings = createKafkaSettings();
+    Pipes pipes;
+    pipes.reserve(1);
+    auto modified_context = Context::createCopy(getContext());
+    // 
modified_context->applySettingsChanges(kafka_storage.settings_adjustments);
+    pipes.emplace_back(std::make_shared<GlutenKafkaSource>(
+        output_header, modified_context, topics, partition, brokers, group_id, 
poll_timeout_ms, start_offset, end_offset, kafka_settings));
+
+    // LOG_DEBUG(kafka_storage.log, "Starting reading kafka batch stream");
+    return Pipe::unitePipes(std::move(pipes));
+}
+
+std::shared_ptr<KafkaSettings> 
ReadFromGlutenStorageKafka::createKafkaSettings()
+{
+    // TODO: add more configuration
+    return std::make_shared<DB::KafkaSettings>();
+}
+
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.h 
b/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.h
new file mode 100644
index 0000000000..820189a101
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Kafka/ReadFromGlutenStorageKafka.h
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <Processors/IProcessor.h>
+#include <Processors/QueryPlan/ISourceStep.h>
+#include <Storages/Kafka/KafkaSettings.h>
+
+namespace local_engine
+{
+using namespace DB;
+class ReadFromGlutenStorageKafka : public ISourceStep, protected WithContext
+{
+public:
+    ReadFromGlutenStorageKafka(
+        const Names & column_names_,
+        Header output_header_,
+        ContextPtr context_,
+        Names & topics,
+        size_t partition,
+        size_t start_offset,
+        size_t end_offset,
+        size_t poll_timeout_ms,
+        String group_id,
+        String brokers);
+
+    String getName() const override { return "ReadFromGlutenStorageKafka"; }
+
+    void initializePipeline(QueryPipelineBuilder & pipeline, const 
BuildQueryPipelineSettings & /*settings*/) final;
+
+private:
+    Pipe makePipe();
+    std::shared_ptr<KafkaSettings> createKafkaSettings();
+
+protected:
+    // std::shared_ptr<const StorageLimitsList> storage_limits;
+    const Names & column_names;
+    Header output_header;
+
+    Names topics;
+    size_t partition;
+    size_t start_offset;
+    size_t end_offset;
+    size_t poll_timeout_ms;
+    String group_id;
+    String brokers;
+};
+
+
+}
diff --git a/docs/developers/SubstraitModifications.md 
b/docs/developers/SubstraitModifications.md
index 3db2b5869c..8ca2ab4f10 100644
--- a/docs/developers/SubstraitModifications.md
+++ b/docs/developers/SubstraitModifications.md
@@ -29,6 +29,7 @@ changed `Unbounded` in `WindowFunction` into 
`Unbounded_Preceding` and `Unbounde
 * Added `TopNRel` 
([#5409](https://github.com/apache/incubator-gluten/pull/5409)).
 * Added `ref` field in window bound `Preceding` and `Following` 
([#5626](https://github.com/apache/incubator-gluten/pull/5626)).
 * Added `BucketSpec` field in 
`WriteRel`([#8386](https://github.com/apache/incubator-gluten/pull/8386))
+* Added `StreamKafka` in 
`ReadRel`([#8321](https://github.com/apache/incubator-gluten/pull/8321))
 
 ## Modifications to type.proto
 
diff --git a/gluten-kafka/pom.xml b/gluten-kafka/pom.xml
new file mode 100644
index 0000000000..ff633728a8
--- /dev/null
+++ b/gluten-kafka/pom.xml
@@ -0,0 +1,149 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>gluten-parent</artifactId>
+        <groupId>org.apache.gluten</groupId>
+        <version>1.4.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>gluten-kafka</artifactId>
+    <packaging>jar</packaging>
+    <name>Gluten Kafka</name>
+
+    <properties>
+        <resource.dir>${project.basedir}/src/main/resources</resource.dir>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.gluten</groupId>
+            <artifactId>gluten-substrait</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            
<artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- For test -->
+        <dependency>
+            <groupId>org.apache.gluten</groupId>
+            <artifactId>gluten-substrait</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+        
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+        <resources>
+            <resource>
+                <directory>${resource.dir}</directory>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.scalastyle</groupId>
+                <artifactId>scalastyle-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>com.diffplug.spotless</groupId>
+                <artifactId>spotless-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <configuration>
+                    <junitxml>.</junitxml>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>prepare-test-jar</id>
+                        <phase>test-compile</phase>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-resources-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/gluten-kafka/src-kafka/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceBuilder.java
 
b/gluten-kafka/src-kafka/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceBuilder.java
new file mode 100644
index 0000000000..c0cfe8866f
--- /dev/null
+++ 
b/gluten-kafka/src-kafka/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceBuilder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.substrait.rel;
+
+import java.util.Map;
+
+public class StreamKafkaSourceBuilder {
+    public static StreamKafkaSourceNode makeStreamKafkaBatch(
+            String topic,
+            Integer partition,
+            Long startOffset,
+            Long endOffset,
+            Long pollTimeoutMs,
+            Boolean failOnDataLoss,
+            Boolean includeHeaders,
+            Map<String, Object> kafkaParams) {
+        return new StreamKafkaSourceNode(
+                topic,
+                partition,
+                startOffset,
+                endOffset,
+                pollTimeoutMs,
+                failOnDataLoss,
+                includeHeaders,
+                kafkaParams);
+    }
+}
diff --git 
a/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
 
b/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
new file mode 100644
index 0000000000..d3a581ae30
--- /dev/null
+++ 
b/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+import org.apache.gluten.substrait.rel.{ReadRelNode, SplitInfo}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression}
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.read.{InputPartition, 
PartitionReaderFactory, Scan}
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}
+import org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec
+import org.apache.spark.sql.kafka010.GlutenStreamKafkaSourceUtil
+import org.apache.spark.sql.types.StructType
+
/** Physical plan node for scanning a micro-batch of data from a data source. */
case class MicroBatchScanExecTransformer(
    override val output: Seq[AttributeReference],
    @transient override val scan: Scan,
    @transient stream: MicroBatchStream,
    @transient start: Offset,
    @transient end: Offset,
    @transient override val table: Table,
    override val keyGroupedPartitioning: Option[Seq[Expression]] = None,
    override val commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None
) extends BatchScanExecTransformerBase(
    output = output,
    scan = scan,
    runtimeFilters = Seq.empty,
    table = table,
    keyGroupedPartitioning = keyGroupedPartitioning,
    commonPartitionValues = commonPartitionValues
  ) {

  // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
  override def equals(other: Any): Boolean = other match {
    case other: MicroBatchScanExecTransformer => this.stream == other.stream
    case _ => false
  }

  // Keep hashCode consistent with equals above, which compares only `stream`
  // (mirrors vanilla Spark's MicroBatchScanExec).
  override def hashCode(): Int = stream.hashCode()

  override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory()

  // Micro-batch partitions are planned from the stream's [start, end) offset range.
  @transient override lazy val inputPartitionsShim: Seq[InputPartition] =
    stream.planInputPartitions(start, end)

  // A Kafka micro-batch scan carries no pushed-down filters or metadata columns.
  override def filterExprs(): Seq[Expression] = Seq.empty

  override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty

  override def outputAttributes(): Seq[Attribute] = output

  override def getPartitions: Seq[InputPartition] = inputPartitionsShim

  /** Returns the actual schema of this data source scan. */
  override def getDataSchema: StructType = scan.readSchema()

  override def nodeName: String = s"MicroBatchScanExecTransformer(${scan.description()})"

  override lazy val fileFormat: ReadFileFormat = GlutenStreamKafkaSourceUtil.getFileFormat(scan)

  protected[this] def supportsBatchScan(scan: Scan): Boolean = {
    MicroBatchScanExecTransformer.supportsBatchScan(scan)
  }

  override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = {
    // NOTE(review): the `partitions` argument is ignored and the splits are derived from
    // `filteredPartitions` instead. With `runtimeFilters = Seq.empty` the two should
    // coincide — confirm against BatchScanExecTransformerBase before relying on it.
    filteredPartitions.flatten.map(GlutenStreamKafkaSourceUtil.genSplitInfo)
  }

  override protected def doTransform(context: SubstraitContext): TransformContext = {
    val ctx = super.doTransform(context)
    // Flag the read rel so the native side parses it as a Kafka streaming source.
    ctx.root.asInstanceOf[ReadRelNode].setStreamKafka(true)
    ctx
  }
}
+
object MicroBatchScanExecTransformer {

  /**
   * Wraps a vanilla `MicroBatchScanExec` into its Gluten transformer. Fails when any
   * output attribute is not an `AttributeReference`, because the transformer's
   * constructor requires exactly that attribute type.
   */
  def apply(batch: MicroBatchScanExec): MicroBatchScanExecTransformer = {
    // `collect` keeps only AttributeReference outputs; a size mismatch below means
    // some other attribute kind was present and the plan cannot be offloaded.
    val output = batch.output.collect { case a: AttributeReference => a }
    if (output.size == batch.output.size) {
      new MicroBatchScanExecTransformer(
        output,
        batch.scan,
        batch.stream,
        batch.start,
        batch.end,
        null, // table is not available on MicroBatchScanExec — TODO confirm downstream tolerates null
        Option.empty,
        Option.empty)
    } else {
      throw new UnsupportedOperationException(
        s"Unsupported DataSourceV2ScanExecBase: ${batch.output.getClass.getName}")
    }
  }

  /** Only Spark's Kafka `KafkaScan` is currently supported for offload. */
  def supportsBatchScan(scan: Scan): Boolean = {
    scan.getClass.getName == "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan"
  }
}
diff --git 
a/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala
 
b/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala
new file mode 100644
index 0000000000..5d670e0723
--- /dev/null
+++ 
b/gluten-kafka/src-kafka/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.extension.columnar.enumerated.RasOffload
+import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
+import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
+import org.apache.gluten.extension.columnar.validator.Validators
+import org.apache.gluten.extension.injector.Injector
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, 
MicroBatchScanExec}
+
/**
 * Offload rule that swaps a vanilla `MicroBatchScanExec` backed by a Kafka scan for its
 * Gluten transformer; any other plan node passes through untouched.
 */
case class OffloadKafkaScan() extends OffloadSingleNode {
  override def offload(plan: SparkPlan): SparkPlan = plan match {
    case microBatch: MicroBatchScanExec
        if MicroBatchScanExecTransformer.supportsBatchScan(microBatch.scan) =>
      MicroBatchScanExecTransformer(microBatch)
    case notOffloadable => notOffloadable
  }
}
+
object OffloadKafkaScan {
  /** Registers the Kafka-scan offload with both the legacy and the RAS planner paths. */
  def inject(injector: Injector): Unit = {
    // Inject legacy rule.
    injector.gluten.legacy.injectTransform {
      c =>
        val offload = Seq(OffloadKafkaScan())
        HeuristicTransform.Simple(
          Validators.newValidator(c.glutenConf, offload),
          offload
        )
    }

    // Inject RAS rule.
    // NOTE(review): this rule is anchored on BatchScanExec while the offload itself only
    // matches MicroBatchScanExec — confirm the anchor actually covers MicroBatchScanExec,
    // otherwise the RAS path may never fire for Kafka scans.
    injector.gluten.ras.injectRasRule {
      c =>
        RasOffload.Rule(
          RasOffload.from[BatchScanExec](OffloadKafkaScan()),
          Validators.newValidator(c.glutenConf),
          Nil)
    }
  }
}
diff --git 
a/gluten-kafka/src-kafka/main/scala/org/apache/spark/sql/kafka010/GlutenStreamKafkaSourceUtil.scala
 
b/gluten-kafka/src-kafka/main/scala/org/apache/spark/sql/kafka010/GlutenStreamKafkaSourceUtil.scala
new file mode 100644
index 0000000000..d5468f2cde
--- /dev/null
+++ 
b/gluten-kafka/src-kafka/main/scala/org/apache/spark/sql/kafka010/GlutenStreamKafkaSourceUtil.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.kafka010
+
+import org.apache.gluten.exception.GlutenNotSupportException
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+import org.apache.gluten.substrait.rel.{SplitInfo, StreamKafkaSourceBuilder}
+import org.apache.spark.sql.connector.read.{InputPartition, Scan}
+
/** Helpers bridging Spark's Kafka batch partitions to Gluten substrait split info. */
object GlutenStreamKafkaSourceUtil {

  /**
   * Converts a planned Kafka batch input partition into the substrait split info sent to
   * the native reader. Only `KafkaBatchInputPartition` is understood.
   */
  def genSplitInfo(inputPartition: InputPartition): SplitInfo = inputPartition match {
    case kafkaPartition: KafkaBatchInputPartition =>
      val range = kafkaPartition.offsetRange
      StreamKafkaSourceBuilder.makeStreamKafkaBatch(
        range.topicPartition.topic(),
        range.topicPartition.partition(),
        range.fromOffset,
        range.untilOffset,
        kafkaPartition.pollTimeoutMs,
        kafkaPartition.failOnDataLoss,
        kafkaPartition.includeHeaders,
        kafkaPartition.executorKafkaParams
      )
    case _ =>
      throw new UnsupportedOperationException("Only support kafka KafkaBatchInputPartition.")
  }

  /** Maps a supported scan class to its Gluten read format; only KafkaScan is known. */
  def getFileFormat(scan: Scan): ReadFileFormat = scan.getClass.getSimpleName match {
    case "KafkaScan" => ReadFileFormat.KafkaReadFormat
    case _ =>
      throw new GlutenNotSupportException("Only support KafkaScan.")
  }

}
diff --git 
a/gluten-kafka/src-kafka/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala
 
b/gluten-kafka/src-kafka/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala
new file mode 100644
index 0000000000..0855e03192
--- /dev/null
+++ 
b/gluten-kafka/src-kafka/test/scala/org/apache/gluten/execution/kafka/GlutenKafkaScanSuite.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.kafka
+
+import org.apache.gluten.execution.{MicroBatchScanExecTransformer, 
WholeStageTransformerSuite}
+import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
+import org.apache.spark.sql.functions.{col, split}
+import org.apache.spark.sql.streaming.Trigger
+
+import scala.concurrent.duration.DurationInt
+
/**
 * End-to-end check that a Kafka micro-batch read is offloaded to
 * MicroBatchScanExecTransformer rather than falling back to vanilla Spark.
 */
trait GlutenKafkaScanSuite extends WholeStageTransformerSuite {
  // Bootstrap servers of the Kafka cluster under test; supplied by the concrete suite.
  protected val kafkaBootstrapServers: String

  test("test MicroBatchScanExecTransformer not fallback") {
    withTempDir(
      dir => {
        val table_name = "kafka_table"
        withTable(s"$table_name") {
          // Streaming read: split the "id,name" CSV payload out of the Kafka record
          // value, then drop every Kafka metadata column so only (id, name) remain.
          val df = spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaBootstrapServers)
            .option("subscribe", table_name)
            .load()
            .withColumn("sp", split(col("value").cast("string"), ","))
            .withColumn("id", col("sp").getItem(0).cast("int"))
            .withColumn("name", col("sp").getItem(1).cast("string"))
            .drop(col("sp"))
            .drop(col("value"))
            .drop(col("key"))
            .drop(col("topic"))
            .drop(col("partition"))
            .drop(col("offset"))
            .drop(col("timestamp"))
            .drop(col("timestampType"))

          // Start the sink before any data is produced; the topic is auto-created when
          // the batch write below publishes to it.
          val streamQuery = df.writeStream
            .format("parquet")
            .option("checkpointLocation", dir.getCanonicalPath + "/_checkpoint")
            .trigger(Trigger.ProcessingTime("5 seconds"))
            .start(dir.getCanonicalPath)

          // NOTE(review): the table declares `id long` while the stream casts to int —
          // confirm the Parquet int32 column reads back correctly as long here.
          spark.sql(s"""
                       |CREATE EXTERNAL TABLE $table_name (
                       |    id long,
                       |    name string
                       |)USING Parquet
                       |LOCATION '${dir.getCanonicalPath}'
                       |""".stripMargin)

          // Produce 20 "id,id" records into the subscribed topic.
          spark
            .range(0, 20)
            .selectExpr(
              "concat(id,',', id) as value"
            )
            .write
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaBootstrapServers)
            .option("topic", table_name)
            .save()

          // Poll until the last executed micro-batch plan contains exactly one
          // transformer node, i.e. the Kafka scan was not a vanilla fallback.
          eventually(timeout(60.seconds), interval(5.seconds)) {
            val size = streamQuery
              .asInstanceOf[StreamingQueryWrapper]
              .streamingQuery
              .lastExecution
              .executedPlan
              .collect { case p: MicroBatchScanExecTransformer => p }
            assert(size.size == 1)
            streamQuery.awaitTermination(1000)
          }
        }
      })
  }
}
diff --git 
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
 
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
index 9638e43cd7..bebc6b1e95 100644
--- 
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
+++ 
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
@@ -49,6 +49,7 @@ public class LocalFilesNode implements SplitInfo {
     MergeTreeReadFormat(),
     TextReadFormat(),
     JsonReadFormat(),
+    KafkaReadFormat(),
     UnknownFormat()
   }
 
diff --git 
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ReadRelNode.java
 
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ReadRelNode.java
index fba59af57b..b82e05fd36 100644
--- 
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ReadRelNode.java
+++ 
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ReadRelNode.java
@@ -37,6 +37,7 @@ public class ReadRelNode implements RelNode, Serializable {
   private final List<ColumnTypeNode> columnTypeNodes = new ArrayList<>();
   private final ExpressionNode filterNode;
   private final AdvancedExtensionNode extensionNode;
+  private boolean streamKafka = false;
 
   ReadRelNode(
       List<TypeNode> types,
@@ -51,6 +52,10 @@ public class ReadRelNode implements RelNode, Serializable {
     this.extensionNode = extensionNode;
   }
 
+  public void setStreamKafka(boolean streamKafka) {
+    this.streamKafka = streamKafka;
+  }
+
   @Override
   public Rel toProtobuf() {
     RelCommon.Builder relCommonBuilder = RelCommon.newBuilder();
@@ -62,6 +67,7 @@ public class ReadRelNode implements RelNode, Serializable {
     ReadRel.Builder readBuilder = ReadRel.newBuilder();
     readBuilder.setCommon(relCommonBuilder.build());
     readBuilder.setBaseSchema(nStructBuilder.build());
+    readBuilder.setStreamKafka(streamKafka);
 
     if (filterNode != null) {
       readBuilder.setFilter(filterNode.toProtobuf());
diff --git 
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceNode.java
 
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceNode.java
new file mode 100644
index 0000000000..5974e3bb0f
--- /dev/null
+++ 
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/StreamKafkaSourceNode.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.substrait.rel;
+
+import io.substrait.proto.ReadRel;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class StreamKafkaSourceNode implements SplitInfo {
+
+  private final String topic;
+  private final Integer partition;
+  private final Long startOffset;
+  private final Long endOffset;
+  private final Long pollTimeoutMs;
+  private final Boolean failOnDataLoss;
+  private final Boolean includeHeaders;
+
+  private final Map<String, Object> kafkaParams;
+
+  public StreamKafkaSourceNode(
+      String topic,
+      Integer partition,
+      Long startOffset,
+      Long endOffset,
+      Long pollTimeoutMs,
+      Boolean failOnDataLoss,
+      Boolean includeHeaders,
+      Map<String, Object> kafkaParams) {
+    this.topic = topic;
+    this.partition = partition;
+    this.startOffset = startOffset;
+    this.endOffset = endOffset;
+    this.pollTimeoutMs = pollTimeoutMs;
+    this.failOnDataLoss = failOnDataLoss;
+    this.includeHeaders = includeHeaders;
+    this.kafkaParams = kafkaParams;
+  }
+
+  @Override
+  public List<String> preferredLocations() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public ReadRel.StreamKafka toProtobuf() {
+    ReadRel.StreamKafka.Builder builder = ReadRel.StreamKafka.newBuilder();
+
+    ReadRel.StreamKafka.TopicPartition.Builder topicPartition =
+        ReadRel.StreamKafka.TopicPartition.newBuilder();
+    topicPartition.setTopic(topic);
+    topicPartition.setPartition(partition);
+
+    builder.setTopicPartition(topicPartition.build());
+    builder.setStartOffset(startOffset);
+    builder.setEndOffset(endOffset);
+    builder.setPollTimeoutMs(pollTimeoutMs);
+    builder.setFailOnDataLoss(failOnDataLoss);
+    builder.setIncludeHeaders(includeHeaders);
+    kafkaParams.forEach((k, v) -> builder.putParams(k, v.toString()));
+
+    return builder.build();
+  }
+}
diff --git 
a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto 
b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
index ca669c7639..d53a9aef9d 100644
--- 
a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
+++ 
b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
@@ -69,6 +69,7 @@ message ReadRel {
     LocalFiles local_files = 6;
     NamedTable named_table = 7;
     ExtensionTable extension_table = 8;
+    bool stream_kafka = 9;
   }
 
   // A base table. The list of string is used to represent namespacing (e.g., 
mydb.mytable).
@@ -89,6 +90,22 @@ message ReadRel {
     google.protobuf.Any detail = 1;
   }
 
+  // Used for the KafkaBatch or KafkaContinuous source
+  message StreamKafka {
+    message TopicPartition {
+      string topic = 1;
+      int32 partition = 2;
+    }
+
+    TopicPartition topic_partition = 1;
+    int64 start_offset = 2;
+    int64 end_offset = 3;
+    map<string, string> params = 4;
+    int64 poll_timeout_ms = 5;
+    bool  fail_on_data_loss = 6;
+    bool include_headers = 7;
+  }
+
   // Represents a list of files in input of a scan operation
   message LocalFiles {
     repeated FileOrFiles items = 1;
diff --git a/pom.xml b/pom.xml
index 21c5ac3669..d103d25dc5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -804,6 +804,79 @@
         <backend_type>velox</backend_type>
       </properties>
     </profile>
+    <profile>
+      <id>kafka</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <modules>
+        <module>gluten-kafka</module>
+      </modules>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-kafka-sources</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>${project.basedir}/src-kafka/main/scala</source>
+                    <source>${project.basedir}/src-kafka/main/java</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-kafka-resources</id>
+                <phase>generate-resources</phase>
+                <goals>
+                  <goal>add-resource</goal>
+                </goals>
+                <configuration>
+                  <resources>
+                    <resource>
+                      
<directory>${project.basedir}/src-kafka/main/resources</directory>
+                    </resource>
+                  </resources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-kafka-test-sources</id>
+                <phase>generate-test-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>${project.basedir}/src-kafka/test/scala</source>
+                    <source>${project.basedir}/src-kafka/test/java</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-kafka-test-resources</id>
+                <phase>generate-test-resources</phase>
+                <goals>
+                  <goal>add-test-resource</goal>
+                </goals>
+                <configuration>
+                  <resources>
+                    <resource>
+                      
<directory>${project.basedir}/src-kafka/test/resources</directory>
+                    </resource>
+                  </resources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
     <profile>
       <id>spark-ut</id>
       <modules>
diff --git 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
index 6b495105d9..001cd9a582 100644
--- 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
+++ 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
@@ -46,7 +46,10 @@ abstract class AbstractBatchScanExec(
 
   override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
 
-  @transient override lazy val partitions: Seq[InputPartition] = 
batch.planInputPartitions()
+  @transient override lazy val partitions: Seq[InputPartition] = 
inputPartitionsShim
+
+  @transient protected lazy val inputPartitionsShim: Seq[InputPartition] =
+    batch.planInputPartitions()
 
   @transient private lazy val filteredPartitions: Seq[InputPartition] = {
     val dataSourceFilters = runtimeFilters.flatMap {
diff --git 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
index f0ee5c1ab7..b12a257a0c 100644
--- 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
+++ 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
@@ -47,7 +47,10 @@ abstract class AbstractBatchScanExec(
 
   override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
 
-  @transient override lazy val inputPartitions: Seq[InputPartition] = 
batch.planInputPartitions()
+  @transient override lazy val inputPartitions: Seq[InputPartition] = 
inputPartitionsShim
+
+  @transient protected lazy val inputPartitionsShim: Seq[InputPartition] =
+    batch.planInputPartitions()
 
   @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
     val dataSourceFilters = runtimeFilters.flatMap {
diff --git 
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
 
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
index 3313c3c768..aeaf5ccf7a 100644
--- 
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
+++ 
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
@@ -57,7 +57,10 @@ abstract class AbstractBatchScanExec(
 
   override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
 
-  @transient override lazy val inputPartitions: Seq[InputPartition] = 
batch.planInputPartitions()
+  @transient override lazy val inputPartitions: Seq[InputPartition] = 
inputPartitionsShim
+
+  @transient protected lazy val inputPartitionsShim: Seq[InputPartition] =
+    batch.planInputPartitions()
 
   @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
     val dataSourceFilters = runtimeFilters.flatMap {
diff --git 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
index 8f51ea2c72..3508711124 100644
--- 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
@@ -52,7 +52,10 @@ abstract class AbstractBatchScanExec(
 
   override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters)
 
-  @transient override lazy val inputPartitions: Seq[InputPartition] = 
batch.planInputPartitions()
+  @transient override lazy val inputPartitions: Seq[InputPartition] = 
inputPartitionsShim
+
+  @transient protected lazy val inputPartitionsShim: Seq[InputPartition] =
+    batch.planInputPartitions()
 
   @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
     val dataSourceFilters = runtimeFilters.flatMap {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to