This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 400577a14 [spark] Add union read for lake-enabled log tables (#2956)
400577a14 is described below

commit 400577a14d4291864145bffa2581fedc80dcbffb
Author: Anton Borisov <[email protected]>
AuthorDate: Wed Apr 8 12:16:35 2026 +0100

    [spark] Add union read for lake-enabled log tables (#2956)
---
 .../scala/org/apache/fluss/spark/SparkTable.scala  |  15 +-
 .../fluss/spark/read/FlussInputPartition.scala     |  18 ++
 .../apache/fluss/spark/read/FlussLakeBatch.scala   | 312 +++++++++++++++++++++
 .../spark/read/FlussLakePartitionReader.scala      |  83 ++++++
 .../read/FlussLakePartitionReaderFactory.scala     |  57 ++++
 .../fluss/spark/read/FlussLakeSourceUtils.scala    |  49 ++++
 .../org/apache/fluss/spark/read/FlussScan.scala    |  24 ++
 .../apache/fluss/spark/read/FlussScanBuilder.scala |  13 +
 fluss-spark/fluss-spark-ut/pom.xml                 |  42 ++-
 ...apache.fluss.lake.lakestorage.LakeStoragePlugin |   3 +-
 ...ge.LakeStoragePlugin => log4j2-test.properties} |  11 +-
 .../lake/SparkLakeIcebergLogTableReadTest.scala    |  45 +++
 .../spark/lake/SparkLakeLogTableReadTestBase.scala | 253 +++++++++++++++++
 .../lake/SparkLakePaimonLogTableReadTest.scala     |  46 +++
 fluss-spark/pom.xml                                |   4 +
 15 files changed, 964 insertions(+), 11 deletions(-)

diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
index 144db03ae..c2ad05081 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
@@ -21,7 +21,7 @@ import org.apache.fluss.client.admin.Admin
 import org.apache.fluss.config.{Configuration => FlussConfiguration}
 import org.apache.fluss.metadata.{TableInfo, TablePath}
 import org.apache.fluss.spark.catalog.{AbstractSparkTable, 
SupportsFlussPartitionManagement}
-import org.apache.fluss.spark.read.{FlussAppendScanBuilder, 
FlussUpsertScanBuilder}
+import org.apache.fluss.spark.read.{FlussAppendScanBuilder, 
FlussLakeAppendScanBuilder, FlussUpsertScanBuilder}
 import org.apache.fluss.spark.write.{FlussAppendWriteBuilder, 
FlussUpsertWriteBuilder}
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
@@ -61,8 +61,19 @@ class SparkTable(
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder 
= {
     populateSparkConf(flussConfig)
+    val isDataLakeEnabled = tableInfo.getTableConfig.isDataLakeEnabled
+    val startupMode = options
+      .getOrDefault(
+        SparkFlussConf.SCAN_START_UP_MODE.key(),
+        flussConfig.get(SparkFlussConf.SCAN_START_UP_MODE))
+      .toUpperCase
+    val isFullMode = startupMode == SparkFlussConf.StartUpMode.FULL.toString
     if (tableInfo.getPrimaryKeys.isEmpty) {
-      new FlussAppendScanBuilder(tablePath, tableInfo, options, flussConfig)
+      if (isDataLakeEnabled && isFullMode) {
+        new FlussLakeAppendScanBuilder(tablePath, tableInfo, options, 
flussConfig)
+      } else {
+        new FlussAppendScanBuilder(tablePath, tableInfo, options, flussConfig)
+      }
     } else {
       new FlussUpsertScanBuilder(tablePath, tableInfo, options, flussConfig)
     }
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
index 397b62430..ccf89cc87 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
@@ -45,6 +45,24 @@ case class FlussAppendInputPartition(tableBucket: 
TableBucket, startOffset: Long
   }
 }
 
+/**
+ * Represents an input partition for reading data from a single lake split. 
Each lake split maps to
+ * one Spark task, enabling parallel lake reads across splits.
+ *
+ * @param tableBucket
+ *   the table bucket this split belongs to
+ * @param lakeSplitBytes
+ *   serialized lake split data
+ */
+case class FlussLakeInputPartition(tableBucket: TableBucket, lakeSplitBytes: 
Array[Byte])
+  extends FlussInputPartition {
+  override def toString: String = {
+    s"FlussLakeInputPartition{tableId=${tableBucket.getTableId}, 
bucketId=${tableBucket.getBucket}," +
+      s" partitionId=${tableBucket.getPartitionId}," +
+      s" splitSize=${lakeSplitBytes.length}}"
+  }
+}
+
 /**
  * Represents an input partition for reading data from a primary key table 
bucket. This partition
  * includes snapshot information for hybrid snapshot-log reading.
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala
new file mode 100644
index 000000000..70d5ef45c
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, 
OffsetsInitializer}
+import org.apache.fluss.client.table.scanner.log.LogScanner
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.exception.LakeTableSnapshotNotExistException
+import org.apache.fluss.lake.serializer.SimpleVersionedSerializer
+import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
+import org.apache.fluss.metadata.{ResolvedPartitionSpec, TableBucket, 
TableInfo, TablePath}
+import org.apache.fluss.utils.ExceptionUtils
+
+import org.apache.spark.sql.connector.read.{InputPartition, 
PartitionReaderFactory}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/** Batch for reading lake-enabled log table (append-only table with 
datalake). */
+class FlussLakeAppendBatch(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    readSchema: StructType,
+    options: CaseInsensitiveStringMap,
+    flussConfig: Configuration)
+  extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
+
+  // Required by FlussBatch but unused — lake snapshot determines start 
offsets.
+  override val startOffsetsInitializer: OffsetsInitializer = 
OffsetsInitializer.earliest()
+
+  override val stoppingOffsetsInitializer: OffsetsInitializer = {
+    FlussOffsetInitializers.stoppingOffsetsInitializer(true, options, 
flussConfig)
+  }
+
+  private lazy val (partitions, isFallback) = doPlan()
+
+  override def planInputPartitions(): Array[InputPartition] = partitions
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+    if (isFallback) {
+      new FlussAppendPartitionReaderFactory(tablePath, projection, options, 
flussConfig)
+    } else {
+      new FlussLakeAppendPartitionReaderFactory(
+        tableInfo.getProperties.toMap,
+        tablePath,
+        tableInfo.getRowType,
+        projection,
+        flussConfig)
+    }
+  }
+
+  /**
+   * Plans input partitions for reading. The returned isFallback flag is true 
when no lake snapshot
+   * exists and the plan falls back to pure log reading.
+   */
+  private def doPlan(): (Array[InputPartition], Boolean) = {
+    val lakeSnapshot =
+      try {
+        admin.getReadableLakeSnapshot(tablePath).get()
+      } catch {
+        case e: Exception =>
+          if (
+            ExceptionUtils
+              .stripExecutionException(e)
+              .isInstanceOf[LakeTableSnapshotNotExistException]
+          ) {
+            return (planFallbackPartitions(), true)
+          }
+          throw e
+      }
+
+    val lakeSource = 
FlussLakeSourceUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath)
+    lakeSource.withProject(FlussLakeSourceUtils.lakeProjection(projection))
+
+    val lakeSplits = lakeSource
+      .createPlanner(new LakeSource.PlannerContext {
+        override def snapshotId(): Long = lakeSnapshot.getSnapshotId
+      })
+      .plan()
+
+    val splitSerializer = lakeSource.getSplitSerializer
+    val tableBucketsOffset = lakeSnapshot.getTableBucketsOffset
+    val buckets = (0 until tableInfo.getNumBuckets).toSeq
+    val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, 
tablePath)
+
+    val partitions = if (tableInfo.isPartitioned) {
+      planPartitionedTable(
+        lakeSplits.asScala,
+        splitSerializer,
+        tableBucketsOffset,
+        buckets,
+        bucketOffsetsRetriever)
+    } else {
+      planNonPartitionedTable(
+        lakeSplits.asScala,
+        splitSerializer,
+        tableBucketsOffset,
+        buckets,
+        bucketOffsetsRetriever)
+    }
+
+    (partitions, false)
+  }
+
+  private def planNonPartitionedTable(
+      lakeSplits: Seq[LakeSplit],
+      splitSerializer: SimpleVersionedSerializer[LakeSplit],
+      tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long],
+      buckets: Seq[Int],
+      bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): 
Array[InputPartition] = {
+    val tableId = tableInfo.getTableId
+
+    val lakePartitions =
+      createLakePartitions(lakeSplits, splitSerializer, tableId, partitionId = 
None)
+
+    val stoppingOffsets =
+      getBucketOffsets(stoppingOffsetsInitializer, null, buckets, 
bucketOffsetsRetriever)
+    val logPartitions = buckets.flatMap {
+      bucketId =>
+        val tableBucket = new TableBucket(tableId, bucketId)
+        createLogTailPartition(tableBucket, tableBucketsOffset, 
stoppingOffsets(bucketId))
+    }
+
+    (lakePartitions ++ logPartitions).toArray
+  }
+
+  private def planPartitionedTable(
+      lakeSplits: Seq[LakeSplit],
+      splitSerializer: SimpleVersionedSerializer[LakeSplit],
+      tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long],
+      buckets: Seq[Int],
+      bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): 
Array[InputPartition] = {
+    val tableId = tableInfo.getTableId
+
+    val flussPartitionIdByName = mutable.LinkedHashMap.empty[String, Long]
+    partitionInfos.asScala.foreach {
+      pi => flussPartitionIdByName(pi.getPartitionName) = pi.getPartitionId
+    }
+
+    val lakeSplitsByPartition = groupLakeSplitsByPartition(lakeSplits)
+    var lakeSplitPartitionId = -1L
+
+    val lakeAndLogPartitions = lakeSplitsByPartition.flatMap {
+      case (partitionName, splits) =>
+        flussPartitionIdByName.remove(partitionName) match {
+          case Some(partitionId) =>
+            // Partition in both lake and Fluss — lake splits + log tail
+            val lakePartitions =
+              createLakePartitions(splits, splitSerializer, tableId, 
Some(partitionId))
+
+            val stoppingOffsets = getBucketOffsets(
+              stoppingOffsetsInitializer,
+              partitionName,
+              buckets,
+              bucketOffsetsRetriever)
+            val logPartitions = buckets.flatMap {
+              bucketId =>
+                val tableBucket = new TableBucket(tableId, partitionId, 
bucketId)
+                createLogTailPartition(tableBucket, tableBucketsOffset, 
stoppingOffsets(bucketId))
+            }
+
+            lakePartitions ++ logPartitions
+
+          case None =>
+            // Partition only in lake (expired in Fluss) — lake splits only
+            val pid = lakeSplitPartitionId
+            lakeSplitPartitionId -= 1
+            createLakePartitions(splits, splitSerializer, tableId, Some(pid))
+        }
+    }.toSeq
+
+    // Partitions only in Fluss (not yet tiered) — log from earliest
+    val flussOnlyPartitions = flussPartitionIdByName.flatMap {
+      case (partitionName, partitionId) =>
+        val stoppingOffsets = getBucketOffsets(
+          stoppingOffsetsInitializer,
+          partitionName,
+          buckets,
+          bucketOffsetsRetriever)
+        buckets.flatMap {
+          bucketId =>
+            val stoppingOffset = stoppingOffsets(bucketId)
+            if (stoppingOffset > 0) {
+              val tableBucket = new TableBucket(tableId, partitionId, bucketId)
+              Some(
+                FlussAppendInputPartition(
+                  tableBucket,
+                  LogScanner.EARLIEST_OFFSET,
+                  stoppingOffset): InputPartition)
+            } else {
+              None
+            }
+        }
+    }.toSeq
+
+    (lakeAndLogPartitions ++ flussOnlyPartitions).toArray
+  }
+
+  private def groupLakeSplitsByPartition(
+      lakeSplits: Seq[LakeSplit]): mutable.LinkedHashMap[String, 
mutable.ArrayBuffer[LakeSplit]] = {
+    val grouped = mutable.LinkedHashMap.empty[String, 
mutable.ArrayBuffer[LakeSplit]]
+    lakeSplits.foreach {
+      split =>
+        val partitionName = if (split.partition() == null || 
split.partition().isEmpty) {
+          ""
+        } else {
+          
split.partition().asScala.mkString(ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR)
+        }
+        grouped.getOrElseUpdate(partitionName, mutable.ArrayBuffer.empty) += 
split
+    }
+    grouped
+  }
+
+  private def createLakePartitions(
+      splits: Seq[LakeSplit],
+      splitSerializer: SimpleVersionedSerializer[LakeSplit],
+      tableId: Long,
+      partitionId: Option[Long]): Seq[InputPartition] = {
+    splits.map {
+      split =>
+        val tableBucket = partitionId match {
+          case Some(pid) => new TableBucket(tableId, pid, split.bucket())
+          case None => new TableBucket(tableId, split.bucket())
+        }
+        FlussLakeInputPartition(tableBucket, splitSerializer.serialize(split))
+    }
+  }
+
+  private def createLogTailPartition(
+      tableBucket: TableBucket,
+      tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long],
+      stoppingOffset: Long): Option[InputPartition] = {
+    val snapshotLogOffset = tableBucketsOffset.get(tableBucket)
+    if (snapshotLogOffset != null) {
+      if (snapshotLogOffset.longValue() < stoppingOffset) {
+        Some(FlussAppendInputPartition(tableBucket, 
snapshotLogOffset.longValue(), stoppingOffset))
+      } else {
+        None
+      }
+    } else if (stoppingOffset > 0) {
+      Some(FlussAppendInputPartition(tableBucket, LogScanner.EARLIEST_OFFSET, 
stoppingOffset))
+    } else {
+      None
+    }
+  }
+
+  private def getBucketOffsets(
+      initializer: OffsetsInitializer,
+      partitionName: String,
+      buckets: Seq[Int],
+      bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Map[Int, Long] = {
+    initializer
+      .getBucketOffsets(partitionName, buckets.map(Integer.valueOf).asJava, 
bucketOffsetsRetriever)
+      .asScala
+      .map(e => (e._1.intValue(), Long2long(e._2)))
+      .toMap
+  }
+
+  private def planFallbackPartitions(): Array[InputPartition] = {
+    val fallbackStartInit = 
FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig)
+    val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, 
tablePath)
+    val buckets = (0 until tableInfo.getNumBuckets).toSeq
+    val tableId = tableInfo.getTableId
+
+    def createPartitions(
+        partitionId: Option[Long],
+        partitionName: String): Array[InputPartition] = {
+      val startOffsets =
+        getBucketOffsets(fallbackStartInit, partitionName, buckets, 
bucketOffsetsRetriever)
+      val stoppingOffsets =
+        getBucketOffsets(stoppingOffsetsInitializer, partitionName, buckets, 
bucketOffsetsRetriever)
+
+      buckets.map {
+        bucketId =>
+          val tableBucket = partitionId match {
+            case Some(pid) => new TableBucket(tableId, pid, bucketId)
+            case None => new TableBucket(tableId, bucketId)
+          }
+          FlussAppendInputPartition(
+            tableBucket,
+            startOffsets(bucketId),
+            stoppingOffsets(bucketId)
+          ): InputPartition
+      }.toArray
+    }
+
+    if (tableInfo.isPartitioned) {
+      partitionInfos.asScala.flatMap {
+        pi => createPartitions(Some(pi.getPartitionId), pi.getPartitionName)
+      }.toArray
+    } else {
+      createPartitions(None, null)
+    }
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReader.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReader.scala
new file mode 100644
index 000000000..9c0031409
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReader.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
+import org.apache.fluss.metadata.TablePath
+import org.apache.fluss.record.LogRecord
+import org.apache.fluss.spark.row.DataConverter
+import org.apache.fluss.types.RowType
+import org.apache.fluss.utils.CloseableIterator
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.PartitionReader
+
+/** Partition reader that reads data from a single lake split via lake storage 
(no Fluss connection). */
+class FlussLakePartitionReader(
+    tablePath: TablePath,
+    rowType: RowType,
+    partition: FlussLakeInputPartition,
+    lakeSource: LakeSource[LakeSplit])
+  extends PartitionReader[InternalRow]
+  with Logging {
+
+  private var currentRow: InternalRow = _
+  private var closed = false
+  private var recordIterator: CloseableIterator[LogRecord] = _
+
+  initialize()
+
+  private def initialize(): Unit = {
+    logInfo(s"Reading lake split for table $tablePath 
bucket=${partition.tableBucket.getBucket}")
+
+    val splitSerializer = lakeSource.getSplitSerializer
+    val split = splitSerializer.deserialize(splitSerializer.getVersion, 
partition.lakeSplitBytes)
+
+    recordIterator = lakeSource
+      .createRecordReader(new LakeSource.ReaderContext[LakeSplit] {
+        override def lakeSplit(): LakeSplit = split
+      })
+      .read()
+  }
+
+  override def next(): Boolean = {
+    if (closed || recordIterator == null) {
+      return false
+    }
+
+    if (recordIterator.hasNext) {
+      val logRecord = recordIterator.next()
+      currentRow = DataConverter.toSparkInternalRow(logRecord.getRow, rowType)
+      true
+    } else {
+      false
+    }
+  }
+
+  override def get(): InternalRow = currentRow
+
+  override def close(): Unit = {
+    if (!closed) {
+      closed = true
+      if (recordIterator != null) {
+        recordIterator.close()
+      }
+    }
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReaderFactory.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReaderFactory.scala
new file mode 100644
index 000000000..18c592801
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReaderFactory.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
+import org.apache.fluss.metadata.TablePath
+import org.apache.fluss.types.RowType
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, 
PartitionReaderFactory}
+
+import java.util
+
+/** Factory for lake-enabled log table reads. Dispatches to lake or log reader 
per partition type. */
+class FlussLakeAppendPartitionReaderFactory(
+    tableProperties: util.Map[String, String],
+    tablePath: TablePath,
+    rowType: RowType,
+    projection: Array[Int],
+    flussConfig: Configuration)
+  extends PartitionReaderFactory {
+
+  @transient private lazy val lakeSource: LakeSource[LakeSplit] = {
+    val source = FlussLakeSourceUtils.createLakeSource(tableProperties, 
tablePath)
+    source.withProject(FlussLakeSourceUtils.lakeProjection(projection))
+    source
+  }
+
+  private lazy val projectedRowType: RowType = rowType.project(projection)
+
+  override def createReader(partition: InputPartition): 
PartitionReader[InternalRow] = {
+    partition match {
+      case lake: FlussLakeInputPartition =>
+        new FlussLakePartitionReader(tablePath, projectedRowType, lake, 
lakeSource)
+      case log: FlussAppendInputPartition =>
+        new FlussAppendPartitionReader(tablePath, projection, log, flussConfig)
+      case _ =>
+        throw new IllegalArgumentException(s"Unexpected partition type: 
${partition.getClass}")
+    }
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeSourceUtils.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeSourceUtils.scala
new file mode 100644
index 000000000..41958c344
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeSourceUtils.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.config.{ConfigOptions, Configuration}
+import org.apache.fluss.lake.lakestorage.LakeStoragePluginSetUp
+import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
+import org.apache.fluss.metadata.TablePath
+import org.apache.fluss.utils.PropertiesUtils
+
+import java.util
+
+/** Shared utilities for creating lake sources and projections. */
+object FlussLakeSourceUtils {
+
+  def createLakeSource(
+      tableProperties: util.Map[String, String],
+      tablePath: TablePath): LakeSource[LakeSplit] = {
+    val tableConfig = Configuration.fromMap(tableProperties)
+    val datalakeFormat = tableConfig.get(ConfigOptions.TABLE_DATALAKE_FORMAT)
+    val dataLakePrefix = "table.datalake." + datalakeFormat + "."
+
+    val catalogProperties = 
PropertiesUtils.extractAndRemovePrefix(tableProperties, dataLakePrefix)
+    val lakeConfig = Configuration.fromMap(catalogProperties)
+    val lakeStoragePlugin =
+      LakeStoragePluginSetUp.fromDataLakeFormat(datalakeFormat.toString, null)
+    val lakeStorage = lakeStoragePlugin.createLakeStorage(lakeConfig)
+    lakeStorage.createLakeSource(tablePath).asInstanceOf[LakeSource[LakeSplit]]
+  }
+
+  def lakeProjection(projection: Array[Int]): Array[Array[Int]] = {
+    projection.map(i => Array(i))
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
index a54396127..d4e14bd47 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
@@ -61,6 +61,30 @@ case class FlussAppendScan(
   }
 }
 
+/** Fluss Lake Append Scan. */
+case class FlussLakeAppendScan(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    requiredSchema: Option[StructType],
+    options: CaseInsensitiveStringMap,
+    flussConfig: Configuration)
+  extends FlussScan {
+
+  override def toBatch: Batch = {
+    new FlussLakeAppendBatch(tablePath, tableInfo, readSchema, options, 
flussConfig)
+  }
+
+  override def toMicroBatchStream(checkpointLocation: String): 
MicroBatchStream = {
+    new FlussAppendMicroBatchStream(
+      tablePath,
+      tableInfo,
+      readSchema,
+      options,
+      flussConfig,
+      checkpointLocation)
+  }
+}
+
 /** Fluss Upsert Scan. */
 case class FlussUpsertScan(
     tablePath: TablePath,
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
index cd3e6768f..9dd49f4df 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
@@ -47,6 +47,19 @@ class FlussAppendScanBuilder(
   }
 }
 
+/** Fluss Lake Append Scan Builder. */
+class FlussLakeAppendScanBuilder(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    options: CaseInsensitiveStringMap,
+    flussConfig: FlussConfiguration)
+  extends FlussScanBuilder {
+
+  override def build(): Scan = {
+    FlussLakeAppendScan(tablePath, tableInfo, requiredSchema, options, 
flussConfig)
+  }
+}
+
 /** Fluss Upsert Scan Builder. */
 class FlussUpsertScanBuilder(
     tablePath: TablePath,
diff --git a/fluss-spark/fluss-spark-ut/pom.xml 
b/fluss-spark/fluss-spark-ut/pom.xml
index e77338cac..40339a2c4 100644
--- a/fluss-spark/fluss-spark-ut/pom.xml
+++ b/fluss-spark/fluss-spark-ut/pom.xml
@@ -95,6 +95,42 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-lake-iceberg</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Flink tiering job dependencies -->
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-flink-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-flink-1.20</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <!-- for curator TestingServer  -->
         <dependency>
             <groupId>org.apache.curator</groupId>
@@ -127,12 +163,6 @@
             <scope>test</scope>
         </dependency>
 
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>3.8.1</version>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
 
b/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
index 69bf0f8a4..1c752018e 100644
--- 
a/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
+++ 
b/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
@@ -16,4 +16,5 @@
 # limitations under the License.
 #
 
-org.apache.fluss.lake.paimon.PaimonLakeStoragePlugin
\ No newline at end of file
+org.apache.fluss.lake.paimon.PaimonLakeStoragePlugin
+org.apache.fluss.lake.iceberg.IcebergLakeStoragePlugin
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
 b/fluss-spark/fluss-spark-ut/src/test/resources/log4j2-test.properties
similarity index 67%
copy from 
fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
copy to fluss-spark/fluss-spark-ut/src/test/resources/log4j2-test.properties
index 69bf0f8a4..edfee0abf 100644
--- 
a/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
+++ b/fluss-spark/fluss-spark-ut/src/test/resources/log4j2-test.properties
@@ -15,5 +15,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-org.apache.fluss.lake.paimon.PaimonLakeStoragePlugin
\ No newline at end of file
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref=TestLogger
+appender.testlogger.name=TestLogger
+appender.testlogger.type=CONSOLE
+appender.testlogger.target=SYSTEM_ERR
+appender.testlogger.layout.type=PatternLayout
+appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala
new file mode 100644
index 000000000..fad9bcd90
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.lake
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.DataLakeFormat
+
+import java.nio.file.Files
+
+class SparkLakeIcebergLogTableReadTest extends SparkLakeLogTableReadTestBase {
+
+  override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.ICEBERG
+
+  override protected def flussConf: Configuration = {
+    val conf = super.flussConf
+    conf.setString("datalake.format", DataLakeFormat.ICEBERG.toString)
+    conf.setString("datalake.iceberg.type", "hadoop")
+    warehousePath =
+      Files.createTempDirectory("fluss-testing-iceberg-lake-read").resolve("warehouse").toString
+    conf.setString("datalake.iceberg.warehouse", warehousePath)
+    conf
+  }
+
+  override protected def lakeCatalogConf: Configuration = {
+    val conf = new Configuration()
+    conf.setString("type", "hadoop")
+    conf.setString("warehouse", warehousePath)
+    conf
+  }
+}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala
new file mode 100644
index 000000000..a23839461
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.lake
+
+import org.apache.fluss.config.{ConfigOptions, Configuration}
+import org.apache.fluss.flink.tiering.LakeTieringJobBuilder
+import org.apache.fluss.flink.tiering.source.TieringSourceOptions
+import org.apache.fluss.metadata.{DataLakeFormat, TableBucket}
+import org.apache.fluss.spark.FlussSparkTestBase
+import org.apache.fluss.spark.SparkConnectorOptions.BUCKET_NUMBER
+
+import org.apache.flink.api.common.RuntimeExecutionMode
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.spark.sql.Row
+
+import java.time.Duration
+
+/**
+ * Base class for lake-enabled log table read tests. Subclasses provide the lake format config and
+ * lake catalog configuration.
+ */
+abstract class SparkLakeLogTableReadTestBase extends FlussSparkTestBase {
+
+  protected var warehousePath: String = _
+
+  /** The lake format used by this test. */
+  protected def dataLakeFormat: DataLakeFormat
+
+  /** Lake catalog configuration specific to the format. */
+  protected def lakeCatalogConf: Configuration
+
+  private val TIERING_PARALLELISM = 2
+  private val CHECKPOINT_INTERVAL_MS = 1000L
+  private val POLL_INTERVAL: Duration = Duration.ofMillis(500L)
+  private val SYNC_TIMEOUT: Duration = Duration.ofMinutes(2)
+  private val SYNC_POLL_INTERVAL_MS = 500L
+
+  /** Tier all pending data for the given table to the lake. */
+  protected def tierToLake(tableName: String): Unit = {
+    val tableId = loadFlussTable(createTablePath(tableName)).getTableInfo.getTableId
+
+    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
+    execEnv.setParallelism(TIERING_PARALLELISM)
+    execEnv.enableCheckpointing(CHECKPOINT_INTERVAL_MS)
+
+    val flussConfig = new Configuration(flussServer.getClientConfig)
+    flussConfig.set(TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL, POLL_INTERVAL)
+
+    val jobClient = LakeTieringJobBuilder
+      .newBuilder(
+        execEnv,
+        flussConfig,
+        lakeCatalogConf,
+        new Configuration(),
+        dataLakeFormat.toString)
+      .build()
+
+    try {
+      val tableBucket = new TableBucket(tableId, 0)
+      val deadline = System.currentTimeMillis() + SYNC_TIMEOUT.toMillis
+      var synced = false
+      while (!synced && System.currentTimeMillis() < deadline) {
+        try {
+          val replica = flussServer.waitAndGetLeaderReplica(tableBucket)
+          synced = replica.getLogTablet.getLakeTableSnapshotId >= 0
+        } catch {
+          case _: Exception =>
+        }
+        if (!synced) Thread.sleep(SYNC_POLL_INTERVAL_MS)
+      }
+      assert(synced, s"Bucket $tableBucket not synced to lake within $SYNC_TIMEOUT")
+    } finally {
+      jobClient.cancel().get()
+    }
+  }
+
+  override protected def withTable(tableNames: String*)(f: => Unit): Unit = {
+    try {
+      f
+    } finally {
+      tableNames.foreach(t => sql(s"DROP TABLE IF EXISTS $DEFAULT_DATABASE.$t"))
+    }
+  }
+
+  test("Spark Lake Read: log table falls back when no lake snapshot") {
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t (id INT, name STRING)
+             | TBLPROPERTIES (
+             |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             |  '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+             |  '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(1, "hello"), (2, "world"), (3, "fluss")
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY id"),
+        Row(1, "hello") :: Row(2, "world") :: Row(3, "fluss") :: Nil
+      )
+
+      checkAnswer(
+        sql(s"SELECT name FROM $DEFAULT_DATABASE.t ORDER BY name"),
+        Row("fluss") :: Row("hello") :: Row("world") :: Nil
+      )
+    }
+  }
+
+  test("Spark Lake Read: log table lake-only (all data in lake, no log tail)") {
+    withTable("t_lake_only") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t_lake_only (id INT, name STRING)
+             | TBLPROPERTIES (
+             |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             |  '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+             |  '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_lake_only VALUES
+             |(1, "alpha"), (2, "beta"), (3, "gamma")
+             |""".stripMargin)
+
+      tierToLake("t_lake_only")
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t_lake_only ORDER BY id"),
+        Row(1, "alpha") :: Row(2, "beta") :: Row(3, "gamma") :: Nil
+      )
+
+      checkAnswer(
+        sql(s"SELECT name FROM $DEFAULT_DATABASE.t_lake_only ORDER BY name"),
+        Row("alpha") :: Row("beta") :: Row("gamma") :: Nil
+      )
+    }
+  }
+
+  test("Spark Lake Read: log table lake-only projection on timestamp column") {
+    withTable("t_lake_timestamp") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t_lake_timestamp (
+             |  id INT,
+             |  ts TIMESTAMP,
+             |  name STRING)
+             | TBLPROPERTIES (
+             |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             |  '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+             |  '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_lake_timestamp VALUES
+             |(1, TIMESTAMP "2026-01-01 12:00:00", "alpha"),
+             |(2, TIMESTAMP "2026-01-02 12:00:00", "beta"),
+             |(3, TIMESTAMP "2026-01-03 12:00:00", "gamma")
+             |""".stripMargin)
+
+      tierToLake("t_lake_timestamp")
+
+      checkAnswer(
+        sql(s"SELECT ts FROM $DEFAULT_DATABASE.t_lake_timestamp ORDER BY ts"),
+        Row(java.sql.Timestamp.valueOf("2026-01-01 12:00:00")) ::
+          Row(java.sql.Timestamp.valueOf("2026-01-02 12:00:00")) ::
+          Row(java.sql.Timestamp.valueOf("2026-01-03 12:00:00")) :: Nil
+      )
+    }
+  }
+
+  test("Spark Lake Read: log table union read (lake + log tail)") {
+    withTable("t_union") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t_union (id INT, name STRING)
+             | TBLPROPERTIES (
+             |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             |  '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+             |  '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_union VALUES
+             |(1, "alpha"), (2, "beta"), (3, "gamma")
+             |""".stripMargin)
+
+      tierToLake("t_union")
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_union VALUES
+             |(4, "delta"), (5, "epsilon")
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t_union ORDER BY id"),
+        Row(1, "alpha") :: Row(2, "beta") :: Row(3, "gamma") ::
+          Row(4, "delta") :: Row(5, "epsilon") :: Nil
+      )
+
+      checkAnswer(
+        sql(s"SELECT name FROM $DEFAULT_DATABASE.t_union ORDER BY name"),
+        Row("alpha") :: Row("beta") :: Row("delta") ::
+          Row("epsilon") :: Row("gamma") :: Nil
+      )
+    }
+  }
+
+  test("Spark Lake Read: non-FULL startup mode skips lake path") {
+    withTable("t_earliest") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t_earliest (id INT, name STRING)
+             | TBLPROPERTIES (
+             |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             |  '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+             |  '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_earliest VALUES
+             |(1, "alpha"), (2, "beta"), (3, "gamma")
+             |""".stripMargin)
+
+      tierToLake("t_earliest")
+
+      try {
+        spark.conf.set("spark.sql.fluss.scan.startup.mode", "earliest")
+
+        checkAnswer(
+          sql(s"SELECT * FROM $DEFAULT_DATABASE.t_earliest ORDER BY id"),
+          Row(1, "alpha") :: Row(2, "beta") :: Row(3, "gamma") :: Nil
+        )
+      } finally {
+        spark.conf.set("spark.sql.fluss.scan.startup.mode", "full")
+      }
+    }
+  }
+}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala
new file mode 100644
index 000000000..2e2b941f4
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.lake
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.DataLakeFormat
+
+import java.nio.file.Files
+
+class SparkLakePaimonLogTableReadTest extends SparkLakeLogTableReadTestBase {
+
+  override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.PAIMON
+
+  override protected def flussConf: Configuration = {
+    val conf = super.flussConf
+    conf.setString("datalake.format", DataLakeFormat.PAIMON.toString)
+    conf.setString("datalake.paimon.metastore", "filesystem")
+    conf.setString("datalake.paimon.cache-enabled", "false")
+    warehousePath =
+      Files.createTempDirectory("fluss-testing-lake-read").resolve("warehouse").toString
+    conf.setString("datalake.paimon.warehouse", warehousePath)
+    conf
+  }
+
+  override protected def lakeCatalogConf: Configuration = {
+    val conf = new Configuration()
+    conf.setString("metastore", "filesystem")
+    conf.setString("warehouse", warehousePath)
+    conf
+  }
+}
diff --git a/fluss-spark/pom.xml b/fluss-spark/pom.xml
index 53795dcb5..b64b12ed6 100644
--- a/fluss-spark/pom.xml
+++ b/fluss-spark/pom.xml
@@ -31,6 +31,10 @@
     <name>Fluss : Engine Spark :</name>
     <packaging>pom</packaging>
 
+    <properties>
+        <flink.version>1.20.3</flink.version>
+    </properties>
+
     <modules>
         <module>fluss-spark-common</module>
         <module>fluss-spark-ut</module>


Reply via email to