This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 400577a14 [spark] Add union read for lake-enabled log tables (#2956)
400577a14 is described below
commit 400577a14d4291864145bffa2581fedc80dcbffb
Author: Anton Borisov <[email protected]>
AuthorDate: Wed Apr 8 12:16:35 2026 +0100
[spark] Add union read for lake-enabled log tables (#2956)
---
.../scala/org/apache/fluss/spark/SparkTable.scala | 15 +-
.../fluss/spark/read/FlussInputPartition.scala | 18 ++
.../apache/fluss/spark/read/FlussLakeBatch.scala | 312 +++++++++++++++++++++
.../spark/read/FlussLakePartitionReader.scala | 83 ++++++
.../read/FlussLakePartitionReaderFactory.scala | 57 ++++
.../fluss/spark/read/FlussLakeSourceUtils.scala | 49 ++++
.../org/apache/fluss/spark/read/FlussScan.scala | 24 ++
.../apache/fluss/spark/read/FlussScanBuilder.scala | 13 +
fluss-spark/fluss-spark-ut/pom.xml | 42 ++-
...apache.fluss.lake.lakestorage.LakeStoragePlugin | 3 +-
...ge.LakeStoragePlugin => log4j2-test.properties} | 11 +-
.../lake/SparkLakeIcebergLogTableReadTest.scala | 45 +++
.../spark/lake/SparkLakeLogTableReadTestBase.scala | 253 +++++++++++++++++
.../lake/SparkLakePaimonLogTableReadTest.scala | 46 +++
fluss-spark/pom.xml | 4 +
15 files changed, 964 insertions(+), 11 deletions(-)
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
index 144db03ae..c2ad05081 100644
---
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
@@ -21,7 +21,7 @@ import org.apache.fluss.client.admin.Admin
import org.apache.fluss.config.{Configuration => FlussConfiguration}
import org.apache.fluss.metadata.{TableInfo, TablePath}
import org.apache.fluss.spark.catalog.{AbstractSparkTable,
SupportsFlussPartitionManagement}
-import org.apache.fluss.spark.read.{FlussAppendScanBuilder,
FlussUpsertScanBuilder}
+import org.apache.fluss.spark.read.{FlussAppendScanBuilder,
FlussLakeAppendScanBuilder, FlussUpsertScanBuilder}
import org.apache.fluss.spark.write.{FlussAppendWriteBuilder,
FlussUpsertWriteBuilder}
import org.apache.spark.sql.catalyst.SQLConfHelper
@@ -61,8 +61,19 @@ class SparkTable(
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
= {
populateSparkConf(flussConfig)
+ val isDataLakeEnabled = tableInfo.getTableConfig.isDataLakeEnabled
+ val startupMode = options
+ .getOrDefault(
+ SparkFlussConf.SCAN_START_UP_MODE.key(),
+ flussConfig.get(SparkFlussConf.SCAN_START_UP_MODE))
+ .toUpperCase
+ val isFullMode = startupMode == SparkFlussConf.StartUpMode.FULL.toString
if (tableInfo.getPrimaryKeys.isEmpty) {
- new FlussAppendScanBuilder(tablePath, tableInfo, options, flussConfig)
+ if (isDataLakeEnabled && isFullMode) {
+ new FlussLakeAppendScanBuilder(tablePath, tableInfo, options,
flussConfig)
+ } else {
+ new FlussAppendScanBuilder(tablePath, tableInfo, options, flussConfig)
+ }
} else {
new FlussUpsertScanBuilder(tablePath, tableInfo, options, flussConfig)
}
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
index 397b62430..ccf89cc87 100644
---
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
@@ -45,6 +45,24 @@ case class FlussAppendInputPartition(tableBucket:
TableBucket, startOffset: Long
}
}
/**
 * Represents an input partition for reading data from a single lake split. Each lake split maps to
 * one Spark task, enabling parallel lake reads across splits.
 *
 * @param tableBucket
 *   the table bucket this split belongs to
 * @param lakeSplitBytes
 *   the lake split, serialized with the lake source's split serializer
 */
case class FlussLakeInputPartition(tableBucket: TableBucket, lakeSplitBytes: Array[Byte])
  extends FlussInputPartition {

  // The case-class-generated equals/hashCode compare Array fields by reference. Compare the
  // serialized split by content so partitions describing the same split are equal and hash alike.
  override def equals(other: Any): Boolean = other match {
    case that: FlussLakeInputPartition =>
      tableBucket == that.tableBucket &&
      java.util.Arrays.equals(lakeSplitBytes, that.lakeSplitBytes)
    case _ => false
  }

  override def hashCode(): Int =
    31 * java.util.Objects.hashCode(tableBucket) + java.util.Arrays.hashCode(lakeSplitBytes)

  override def toString: String = {
    s"FlussLakeInputPartition{tableId=${tableBucket.getTableId}, bucketId=${tableBucket.getBucket}," +
      s" partitionId=${tableBucket.getPartitionId}," +
      s" splitSize=${lakeSplitBytes.length}}"
  }
}
+
/**
* Represents an input partition for reading data from a primary key table
bucket. This partition
* includes snapshot information for hybrid snapshot-log reading.
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala
new file mode 100644
index 000000000..70d5ef45c
--- /dev/null
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl,
OffsetsInitializer}
+import org.apache.fluss.client.table.scanner.log.LogScanner
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.exception.LakeTableSnapshotNotExistException
+import org.apache.fluss.lake.serializer.SimpleVersionedSerializer
+import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
+import org.apache.fluss.metadata.{ResolvedPartitionSpec, TableBucket,
TableInfo, TablePath}
+import org.apache.fluss.utils.ExceptionUtils
+
+import org.apache.spark.sql.connector.read.{InputPartition,
PartitionReaderFactory}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
/**
 * Batch for reading a lake-enabled log table (an append-only table with datalake enabled).
 *
 * When a readable lake snapshot exists, the plan is the union of:
 *   - one [[FlussLakeInputPartition]] per lake split of that snapshot, and
 *   - per Fluss bucket, a [[FlussAppendInputPartition]] for the log tail, starting at the offset
 *     the lake snapshot recorded for the bucket (or at the earliest offset when it recorded none)
 *     and ending at the stopping offset.
 *
 * When no lake snapshot exists yet, planning falls back to plain log reading with the configured
 * startup offsets.
 */
class FlussLakeAppendBatch(
    tablePath: TablePath,
    tableInfo: TableInfo,
    readSchema: StructType,
    options: CaseInsensitiveStringMap,
    flussConfig: Configuration)
  extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {

  // Required by FlussBatch but unused: start offsets come from the lake snapshot, or in the
  // fallback path from FlussOffsetInitializers.startOffsetsInitializer.
  override val startOffsetsInitializer: OffsetsInitializer = OffsetsInitializer.earliest()

  override val stoppingOffsetsInitializer: OffsetsInitializer = {
    FlussOffsetInitializers.stoppingOffsetsInitializer(true, options, flussConfig)
  }

  // Planned once so planInputPartitions and createReaderFactory agree on fallback vs. union read.
  private lazy val (partitions, isFallback) = doPlan()

  override def planInputPartitions(): Array[InputPartition] = partitions

  override def createReaderFactory(): PartitionReaderFactory = {
    if (isFallback) {
      new FlussAppendPartitionReaderFactory(tablePath, projection, options, flussConfig)
    } else {
      new FlussLakeAppendPartitionReaderFactory(
        tableInfo.getProperties.toMap,
        tablePath,
        tableInfo.getRowType,
        projection,
        flussConfig)
    }
  }

  /**
   * Plans input partitions for reading.
   *
   * @return
   *   the planned partitions, and a flag that is true when no lake snapshot exists and the plan
   *   fell back to pure log reading
   */
  private def doPlan(): (Array[InputPartition], Boolean) = {
    // Only a missing lake snapshot triggers the fallback; any other failure propagates unchanged.
    val readableLakeSnapshot =
      try {
        Some(admin.getReadableLakeSnapshot(tablePath).get())
      } catch {
        case e: Exception if isLakeSnapshotMissing(e) => None
      }

    readableLakeSnapshot match {
      case None =>
        (planFallbackPartitions(), true)

      case Some(lakeSnapshot) =>
        val lakeSource =
          FlussLakeSourceUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath)
        lakeSource.withProject(FlussLakeSourceUtils.lakeProjection(projection))

        val lakeSplits: Seq[LakeSplit] = lakeSource
          .createPlanner(new LakeSource.PlannerContext {
            override def snapshotId(): Long = lakeSnapshot.getSnapshotId
          })
          .plan()
          .asScala
          .toSeq

        val splitSerializer = lakeSource.getSplitSerializer
        val tableBucketsOffset = lakeSnapshot.getTableBucketsOffset
        val buckets = (0 until tableInfo.getNumBuckets).toSeq
        val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath)

        val planned = if (tableInfo.isPartitioned) {
          planPartitionedTable(
            lakeSplits,
            splitSerializer,
            tableBucketsOffset,
            buckets,
            bucketOffsetsRetriever)
        } else {
          planNonPartitionedTable(
            lakeSplits,
            splitSerializer,
            tableBucketsOffset,
            buckets,
            bucketOffsetsRetriever)
        }

        (planned, false)
    }
  }

  /**
   * Whether `e` (after ExceptionUtils.stripExecutionException) is a
   * LakeTableSnapshotNotExistException.
   */
  private def isLakeSnapshotMissing(e: Throwable): Boolean =
    ExceptionUtils
      .stripExecutionException(e)
      .isInstanceOf[LakeTableSnapshotNotExistException]

  /** Plans a non-partitioned table: all lake splits, plus the log tail of every bucket. */
  private def planNonPartitionedTable(
      lakeSplits: Seq[LakeSplit],
      splitSerializer: SimpleVersionedSerializer[LakeSplit],
      tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long],
      buckets: Seq[Int],
      bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Array[InputPartition] = {
    val tableId = tableInfo.getTableId

    val lakePartitions =
      createLakePartitions(lakeSplits, splitSerializer, tableId, partitionId = None)

    val stoppingOffsets =
      getBucketOffsets(stoppingOffsetsInitializer, null, buckets, bucketOffsetsRetriever)
    val logPartitions = buckets.flatMap {
      bucketId =>
        val tableBucket = new TableBucket(tableId, bucketId)
        createLogTailPartition(tableBucket, tableBucketsOffset, stoppingOffsets(bucketId))
    }

    (lakePartitions ++ logPartitions).toArray
  }

  /**
   * Plans a partitioned table. Each partition falls into one of three cases:
   *   - in both lake and Fluss: its lake splits plus the log tail of every bucket;
   *   - only in the lake (expired in Fluss): its lake splits only;
   *   - only in Fluss (not yet tiered): the whole log of every bucket.
   */
  private def planPartitionedTable(
      lakeSplits: Seq[LakeSplit],
      splitSerializer: SimpleVersionedSerializer[LakeSplit],
      tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long],
      buckets: Seq[Int],
      bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Array[InputPartition] = {
    val tableId = tableInfo.getTableId

    // Fluss partitions keyed by name, in the order partitionInfos lists them.
    val flussPartitionIdByName = mutable.LinkedHashMap.empty[String, Long]
    partitionInfos.asScala.foreach {
      pi => flussPartitionIdByName(pi.getPartitionName) = pi.getPartitionId
    }

    val lakeSplitsByPartition = groupLakeSplitsByPartition(lakeSplits)

    // Lake-only partitions have no Fluss partition id. Each gets a distinct synthetic negative id
    // (-1, -2, ...) in lake-split grouping order.
    val syntheticPartitionIds: Map[String, Long] = lakeSplitsByPartition.keys
      .filterNot(flussPartitionIdByName.contains)
      .zipWithIndex
      .map { case (partitionName, index) => partitionName -> (-1L - index) }
      .toMap

    val lakeAndLogPartitions = lakeSplitsByPartition.flatMap {
      case (partitionName, splits) =>
        flussPartitionIdByName.get(partitionName) match {
          case Some(partitionId) =>
            // Partition in both lake and Fluss — lake splits + log tail
            val lakePartitions =
              createLakePartitions(splits.toSeq, splitSerializer, tableId, Some(partitionId))

            val stoppingOffsets = getBucketOffsets(
              stoppingOffsetsInitializer,
              partitionName,
              buckets,
              bucketOffsetsRetriever)
            val logPartitions = buckets.flatMap {
              bucketId =>
                val tableBucket = new TableBucket(tableId, partitionId, bucketId)
                createLogTailPartition(tableBucket, tableBucketsOffset, stoppingOffsets(bucketId))
            }

            lakePartitions ++ logPartitions

          case None =>
            // Partition only in lake (expired in Fluss) — lake splits only
            createLakePartitions(
              splits.toSeq,
              splitSerializer,
              tableId,
              Some(syntheticPartitionIds(partitionName)))
        }
    }.toSeq

    // Partitions only in Fluss (not yet tiered) — log from earliest
    val flussOnlyPartitions = flussPartitionIdByName
      .filterNot { case (partitionName, _) => lakeSplitsByPartition.contains(partitionName) }
      .flatMap {
        case (partitionName, partitionId) =>
          val stoppingOffsets = getBucketOffsets(
            stoppingOffsetsInitializer,
            partitionName,
            buckets,
            bucketOffsetsRetriever)
          buckets.flatMap {
            bucketId =>
              val tableBucket = new TableBucket(tableId, partitionId, bucketId)
              createFullLogPartition(tableBucket, stoppingOffsets(bucketId))
          }
      }
      .toSeq

    (lakeAndLogPartitions ++ flussOnlyPartitions).toArray
  }

  /**
   * Groups lake splits by partition name. Partition values are joined with
   * `ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR`. Splits without partition values are grouped
   * under the empty name. Insertion order follows `lakeSplits`.
   */
  private def groupLakeSplitsByPartition(
      lakeSplits: Seq[LakeSplit]): mutable.LinkedHashMap[String, mutable.ArrayBuffer[LakeSplit]] = {
    val grouped = mutable.LinkedHashMap.empty[String, mutable.ArrayBuffer[LakeSplit]]
    lakeSplits.foreach {
      split =>
        val partitionName = if (split.partition() == null || split.partition().isEmpty) {
          ""
        } else {
          split.partition().asScala.mkString(ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR)
        }
        grouped.getOrElseUpdate(partitionName, mutable.ArrayBuffer.empty) += split
    }
    grouped
  }

  /** Serializes each lake split into its own [[FlussLakeInputPartition]]. */
  private def createLakePartitions(
      splits: Seq[LakeSplit],
      splitSerializer: SimpleVersionedSerializer[LakeSplit],
      tableId: Long,
      partitionId: Option[Long]): Seq[InputPartition] = {
    splits.map {
      split =>
        val tableBucket = partitionId match {
          case Some(pid) => new TableBucket(tableId, pid, split.bucket())
          case None => new TableBucket(tableId, split.bucket())
        }
        FlussLakeInputPartition(tableBucket, splitSerializer.serialize(split))
    }
  }

  /**
   * Log partition for the part of a bucket not covered by the lake snapshot.
   *
   * If the snapshot recorded an offset for the bucket, the tail starts there, and there is no
   * partition when that offset is not below `stoppingOffset`. Otherwise the whole log is read.
   */
  private def createLogTailPartition(
      tableBucket: TableBucket,
      tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long],
      stoppingOffset: Long): Option[InputPartition] = {
    Option(tableBucketsOffset.get(tableBucket)).map(_.longValue()) match {
      case Some(snapshotLogOffset) =>
        if (snapshotLogOffset < stoppingOffset) {
          Some(FlussAppendInputPartition(tableBucket, snapshotLogOffset, stoppingOffset))
        } else {
          None
        }
      case None =>
        createFullLogPartition(tableBucket, stoppingOffset)
    }
  }

  /**
   * Log partition reading a bucket from the earliest offset up to `stoppingOffset`. Returns None
   * when `stoppingOffset` is not positive.
   */
  private def createFullLogPartition(
      tableBucket: TableBucket,
      stoppingOffset: Long): Option[InputPartition] = {
    if (stoppingOffset > 0) {
      Some(FlussAppendInputPartition(tableBucket, LogScanner.EARLIEST_OFFSET, stoppingOffset))
    } else {
      None
    }
  }

  /** Resolves `initializer`'s offsets for `buckets` of a partition (null when unpartitioned). */
  private def getBucketOffsets(
      initializer: OffsetsInitializer,
      partitionName: String,
      buckets: Seq[Int],
      bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Map[Int, Long] = {
    initializer
      .getBucketOffsets(partitionName, buckets.map(Integer.valueOf).asJava, bucketOffsetsRetriever)
      .asScala
      .map(e => (e._1.intValue(), Long2long(e._2)))
      .toMap
  }

  /** Plain log plan (no lake snapshot): one partition per bucket, startup to stopping offset. */
  private def planFallbackPartitions(): Array[InputPartition] = {
    val fallbackStartInit = FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig)
    val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath)
    val buckets = (0 until tableInfo.getNumBuckets).toSeq
    val tableId = tableInfo.getTableId

    def createPartitions(
        partitionId: Option[Long],
        partitionName: String): Array[InputPartition] = {
      val startOffsets =
        getBucketOffsets(fallbackStartInit, partitionName, buckets, bucketOffsetsRetriever)
      val stoppingOffsets =
        getBucketOffsets(stoppingOffsetsInitializer, partitionName, buckets, bucketOffsetsRetriever)

      buckets.map {
        bucketId =>
          val tableBucket = partitionId match {
            case Some(pid) => new TableBucket(tableId, pid, bucketId)
            case None => new TableBucket(tableId, bucketId)
          }
          FlussAppendInputPartition(
            tableBucket,
            startOffsets(bucketId),
            stoppingOffsets(bucketId)
          ): InputPartition
      }.toArray
    }

    if (tableInfo.isPartitioned) {
      partitionInfos.asScala.flatMap {
        pi => createPartitions(Some(pi.getPartitionId), pi.getPartitionName)
      }.toArray
    } else {
      createPartitions(None, null)
    }
  }
}
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReader.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReader.scala
new file mode 100644
index 000000000..9c0031409
--- /dev/null
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReader.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
+import org.apache.fluss.metadata.TablePath
+import org.apache.fluss.record.LogRecord
+import org.apache.fluss.spark.row.DataConverter
+import org.apache.fluss.types.RowType
+import org.apache.fluss.utils.CloseableIterator
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.PartitionReader
+
/**
 * Reads every record of a single lake split straight from lake storage, without a Fluss
 * connection.
 *
 * The split arrives serialized inside the [[FlussLakeInputPartition]] and is decoded with the lake
 * source's own split serializer. Each record's row is converted to a Spark row using `rowType`.
 */
class FlussLakePartitionReader(
    tablePath: TablePath,
    rowType: RowType,
    partition: FlussLakeInputPartition,
    lakeSource: LakeSource[LakeSplit])
  extends PartitionReader[InternalRow]
  with Logging {

  private var currentRow: InternalRow = _
  private var closed = false

  // Opened eagerly while the reader is constructed.
  private val recordIterator: CloseableIterator[LogRecord] = openRecordIterator()

  /** Decodes this partition's split and opens a record iterator over it. */
  private def openRecordIterator(): CloseableIterator[LogRecord] = {
    logInfo(s"Reading lake split for table $tablePath bucket=${partition.tableBucket.getBucket}")

    val serializer = lakeSource.getSplitSerializer
    val decodedSplit = serializer.deserialize(serializer.getVersion, partition.lakeSplitBytes)
    val readerContext = new LakeSource.ReaderContext[LakeSplit] {
      override def lakeSplit(): LakeSplit = decodedSplit
    }
    lakeSource.createRecordReader(readerContext).read()
  }

  override def next(): Boolean = {
    val advanced = !closed && recordIterator != null && recordIterator.hasNext
    if (advanced) {
      currentRow = DataConverter.toSparkInternalRow(recordIterator.next().getRow, rowType)
    }
    advanced
  }

  override def get(): InternalRow = currentRow

  /** Closes the underlying record iterator; calling this more than once has no further effect. */
  override def close(): Unit = {
    if (!closed) {
      closed = true
      Option(recordIterator).foreach(_.close())
    }
  }
}
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReaderFactory.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReaderFactory.scala
new file mode 100644
index 000000000..18c592801
--- /dev/null
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReaderFactory.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
+import org.apache.fluss.metadata.TablePath
+import org.apache.fluss.types.RowType
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader,
PartitionReaderFactory}
+
+import java.util
+
/**
 * Reader factory for lake-enabled log tables.
 *
 * Lake-split partitions are read from lake storage by [[FlussLakePartitionReader]]. Log partitions
 * are read by the regular [[FlussAppendPartitionReader]].
 */
class FlussLakeAppendPartitionReaderFactory(
    tableProperties: util.Map[String, String],
    tablePath: TablePath,
    rowType: RowType,
    projection: Array[Int],
    flussConfig: Configuration)
  extends PartitionReaderFactory {

  // @transient + lazy: excluded from serialization and built on first use by a lake-split reader.
  @transient private lazy val lakeSource: LakeSource[LakeSplit] = {
    val projectedSource = FlussLakeSourceUtils.createLakeSource(tableProperties, tablePath)
    projectedSource.withProject(FlussLakeSourceUtils.lakeProjection(projection))
    projectedSource
  }

  // Lake rows come back already projected, so they are converted with the projected row type.
  private lazy val projectedRowType: RowType = rowType.project(projection)

  override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
    partition match {
      case lakePartition: FlussLakeInputPartition =>
        new FlussLakePartitionReader(tablePath, projectedRowType, lakePartition, lakeSource)
      case logPartition: FlussAppendInputPartition =>
        new FlussAppendPartitionReader(tablePath, projection, logPartition, flussConfig)
      case _ =>
        throw new IllegalArgumentException(s"Unexpected partition type: ${partition.getClass}")
    }
}
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeSourceUtils.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeSourceUtils.scala
new file mode 100644
index 000000000..41958c344
--- /dev/null
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeSourceUtils.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.config.{ConfigOptions, Configuration}
+import org.apache.fluss.lake.lakestorage.LakeStoragePluginSetUp
+import org.apache.fluss.lake.source.{LakeSource, LakeSplit}
+import org.apache.fluss.metadata.TablePath
+import org.apache.fluss.utils.PropertiesUtils
+
+import java.util
+
/** Shared utilities for creating lake sources and lake projections. */
object FlussLakeSourceUtils {

  /**
   * Creates the [[LakeSource]] for `tablePath` from the table's properties.
   *
   * The lake format is read from `ConfigOptions.TABLE_DATALAKE_FORMAT`. Every property prefixed
   * with `table.datalake.<format>.` is passed, with that prefix stripped, to the format's lake
   * storage plugin.
   *
   * @param tableProperties
   *   the Fluss table properties
   * @param tablePath
   *   the table to create the lake source for
   * @return
   *   the lake source for the table
   * @throws IllegalArgumentException
   *   if the table properties do not specify a datalake format
   */
  def createLakeSource(
      tableProperties: util.Map[String, String],
      tablePath: TablePath): LakeSource[LakeSplit] = {
    val tableConfig = Configuration.fromMap(tableProperties)
    // Fail with a descriptive error rather than an NPE when no lake format is configured.
    val datalakeFormat = Option(tableConfig.get(ConfigOptions.TABLE_DATALAKE_FORMAT)).getOrElse {
      throw new IllegalArgumentException(
        s"Cannot create a lake source for table $tablePath: " +
          s"'${ConfigOptions.TABLE_DATALAKE_FORMAT.key()}' is not set.")
    }
    val dataLakePrefix = s"table.datalake.$datalakeFormat."

    val catalogProperties = PropertiesUtils.extractAndRemovePrefix(tableProperties, dataLakePrefix)
    val lakeConfig = Configuration.fromMap(catalogProperties)
    // NOTE(review): no plugin manager is passed (null); plugins are presumably discovered via the
    // classpath ServiceLoader registration — confirm.
    val lakeStoragePlugin =
      LakeStoragePluginSetUp.fromDataLakeFormat(datalakeFormat.toString, null)
    val lakeStorage = lakeStoragePlugin.createLakeStorage(lakeConfig)
    lakeStorage.createLakeSource(tablePath).asInstanceOf[LakeSource[LakeSplit]]
  }

  /**
   * Converts top-level field indices into the nested form passed to `LakeSource.withProject`.
   * For example, `[a, b]` becomes `[[a], [b]]`.
   */
  def lakeProjection(projection: Array[Int]): Array[Array[Int]] = {
    projection.map(i => Array(i))
  }
}
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
index a54396127..d4e14bd47 100644
---
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
@@ -61,6 +61,30 @@ case class FlussAppendScan(
}
}
/**
 * Scan over a lake-enabled log table.
 *
 * Batch reads are planned by [[FlussLakeAppendBatch]]. Streaming reads reuse
 * [[FlussAppendMicroBatchStream]].
 */
case class FlussLakeAppendScan(
    tablePath: TablePath,
    tableInfo: TableInfo,
    requiredSchema: Option[StructType],
    options: CaseInsensitiveStringMap,
    flussConfig: Configuration)
  extends FlussScan {

  override def toBatch: Batch =
    new FlussLakeAppendBatch(tablePath, tableInfo, readSchema, options, flussConfig)

  override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
    val stream = new FlussAppendMicroBatchStream(
      tablePath,
      tableInfo,
      readSchema,
      options,
      flussConfig,
      checkpointLocation)
    stream
  }
}
+
/** Fluss Upsert Scan. */
case class FlussUpsertScan(
tablePath: TablePath,
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
index cd3e6768f..9dd49f4df 100644
---
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
@@ -47,6 +47,19 @@ class FlussAppendScanBuilder(
}
}
/** Scan builder for lake-enabled log tables; builds a [[FlussLakeAppendScan]]. */
class FlussLakeAppendScanBuilder(
    tablePath: TablePath,
    tableInfo: TableInfo,
    options: CaseInsensitiveStringMap,
    flussConfig: FlussConfiguration)
  extends FlussScanBuilder {

  override def build(): Scan = {
    val scan = new FlussLakeAppendScan(
      tablePath,
      tableInfo,
      requiredSchema,
      options,
      flussConfig)
    scan
  }
}
+
/** Fluss Upsert Scan Builder. */
class FlussUpsertScanBuilder(
tablePath: TablePath,
diff --git a/fluss-spark/fluss-spark-ut/pom.xml
b/fluss-spark/fluss-spark-ut/pom.xml
index e77338cac..40339a2c4 100644
--- a/fluss-spark/fluss-spark-ut/pom.xml
+++ b/fluss-spark/fluss-spark-ut/pom.xml
@@ -95,6 +95,42 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-lake-iceberg</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Flink tiering job dependencies -->
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-flink-common</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-flink-1.20</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- for curator TestingServer -->
<dependency>
<groupId>org.apache.curator</groupId>
@@ -127,12 +163,6 @@
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>3.8.1</version>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
diff --git
a/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
b/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
index 69bf0f8a4..1c752018e 100644
---
a/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
+++
b/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
@@ -16,4 +16,5 @@
# limitations under the License.
#
-org.apache.fluss.lake.paimon.PaimonLakeStoragePlugin
\ No newline at end of file
+org.apache.fluss.lake.paimon.PaimonLakeStoragePlugin
+org.apache.fluss.lake.iceberg.IcebergLakeStoragePlugin
diff --git
a/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
b/fluss-spark/fluss-spark-ut/src/test/resources/log4j2-test.properties
similarity index 67%
copy from
fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
copy to fluss-spark/fluss-spark-ut/src/test/resources/log4j2-test.properties
index 69bf0f8a4..edfee0abf 100644
---
a/fluss-spark/fluss-spark-ut/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
+++ b/fluss-spark/fluss-spark-ut/src/test/resources/log4j2-test.properties
@@ -15,5 +15,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-org.apache.fluss.lake.paimon.PaimonLakeStoragePlugin
\ No newline at end of file
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref=TestLogger
+appender.testlogger.name=TestLogger
+appender.testlogger.type=CONSOLE
+appender.testlogger.target=SYSTEM_ERR
+appender.testlogger.layout.type=PatternLayout
+appender.testlogger.layout.pattern=%-4r [%t] %-5p %c %x - %m%n
diff --git
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala
new file mode 100644
index 000000000..fad9bcd90
--- /dev/null
+++
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.lake
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.DataLakeFormat
+
+import java.nio.file.Files
+
/**
 * Log-table union-read tests against an Iceberg lake backed by a Hadoop catalog in a temporary
 * warehouse directory.
 */
class SparkLakeIcebergLogTableReadTest extends SparkLakeLogTableReadTestBase {

  // Created once so the Fluss cluster config and the test's lake catalog always point at the same
  // warehouse, no matter how often or in which order flussConf / lakeCatalogConf are called.
  private lazy val icebergWarehousePath: String =
    Files.createTempDirectory("fluss-testing-iceberg-lake-read").resolve("warehouse").toString

  override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.ICEBERG

  override protected def flussConf: Configuration = {
    val conf = super.flussConf
    warehousePath = icebergWarehousePath
    conf.setString("datalake.format", DataLakeFormat.ICEBERG.toString)
    conf.setString("datalake.iceberg.type", "hadoop")
    conf.setString("datalake.iceberg.warehouse", icebergWarehousePath)
    conf
  }

  override protected def lakeCatalogConf: Configuration = {
    warehousePath = icebergWarehousePath
    val conf = new Configuration()
    conf.setString("type", "hadoop")
    conf.setString("warehouse", icebergWarehousePath)
    conf
  }
}
diff --git
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala
new file mode 100644
index 000000000..a23839461
--- /dev/null
+++
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.lake
+
+import org.apache.fluss.config.{ConfigOptions, Configuration}
+import org.apache.fluss.flink.tiering.LakeTieringJobBuilder
+import org.apache.fluss.flink.tiering.source.TieringSourceOptions
+import org.apache.fluss.metadata.{DataLakeFormat, TableBucket}
+import org.apache.fluss.spark.FlussSparkTestBase
+import org.apache.fluss.spark.SparkConnectorOptions.BUCKET_NUMBER
+
+import org.apache.flink.api.common.RuntimeExecutionMode
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.spark.sql.Row
+
+import java.time.Duration
+
+/**
+ * Base class for lake-enabled log table read tests. Subclasses provide the lake format config and
+ * lake catalog configuration.
+ *
+ * Each test creates a datalake-enabled log table, may tier its contents to the lake with
+ * [[tierToLake]], and then checks the results of Spark queries against it.
+ */
+abstract class SparkLakeLogTableReadTestBase extends FlussSparkTestBase {
+
+  import scala.util.control.NonFatal
+
+  /** Lake warehouse location; assigned by subclasses while building the Fluss configuration. */
+  protected var warehousePath: String = _
+
+  /** The lake format used by this test. */
+  protected def dataLakeFormat: DataLakeFormat
+
+  /** Lake catalog configuration specific to the format. */
+  protected def lakeCatalogConf: Configuration
+
+  private val TIERING_PARALLELISM = 2
+  private val CHECKPOINT_INTERVAL_MS = 1000L
+  private val POLL_INTERVAL: Duration = Duration.ofMillis(500L)
+  private val SYNC_TIMEOUT: Duration = Duration.ofMinutes(2)
+  private val SYNC_POLL_INTERVAL_MS = 500L
+
+  /** Spark session conf key that selects the Fluss scan startup mode. */
+  private val STARTUP_MODE_KEY = "spark.sql.fluss.scan.startup.mode"
+
+  /**
+   * Creates a single-bucket log table in DEFAULT_DATABASE with the datalake enabled and a 1s
+   * freshness.
+   *
+   * @param tableName name of the table to create
+   * @param columns column definition list, e.g. "id INT, name STRING"
+   */
+  private def createLakeLogTable(tableName: String, columns: String): Unit = {
+    sql(s"""
+           |CREATE TABLE $DEFAULT_DATABASE.$tableName ($columns)
+           | TBLPROPERTIES (
+           |  '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+           |  '${ConfigOptions.TABLE_DATALAKE_FRESHNESS.key()}' = '1s',
+           |  '${BUCKET_NUMBER.key()}' = 1)
+           |""".stripMargin)
+  }
+
+  /**
+   * Tiers all pending data for the given (non-partitioned) table to the lake.
+   *
+   * Starts a Flink lake tiering job. It then polls the leader replica of every bucket until each
+   * reports a lake snapshot id >= 0, and always cancels the job afterwards. The test fails if any
+   * bucket is still unsynced after SYNC_TIMEOUT; the failure message includes the last polling
+   * error, if there was one.
+   */
+  protected def tierToLake(tableName: String): Unit = {
+    val tableInfo = loadFlussTable(createTablePath(tableName)).getTableInfo
+    val tableId = tableInfo.getTableId
+    val tableBuckets =
+      (0 until tableInfo.getNumBuckets).map(bucket => new TableBucket(tableId, bucket))
+
+    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
+    execEnv.setParallelism(TIERING_PARALLELISM)
+    execEnv.enableCheckpointing(CHECKPOINT_INTERVAL_MS)
+
+    val flussConfig = new Configuration(flussServer.getClientConfig)
+    flussConfig.set(TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL, POLL_INTERVAL)
+
+    val jobClient = LakeTieringJobBuilder
+      .newBuilder(
+        execEnv,
+        flussConfig,
+        lakeCatalogConf,
+        new Configuration(),
+        dataLakeFormat.toString)
+      .build()
+
+    try {
+      val deadline = System.currentTimeMillis() + SYNC_TIMEOUT.toMillis
+      var pending = tableBuckets
+      var lastError: Option[Throwable] = None
+      while (pending.nonEmpty && System.currentTimeMillis() < deadline) {
+        pending = pending.filterNot {
+          bucket =>
+            try {
+              flussServer.waitAndGetLeaderReplica(bucket).getLogTablet.getLakeTableSnapshotId >= 0
+            } catch {
+              // The leader may not be ready yet: remember the cause and retry on the next poll.
+              // NonFatal lets interrupts and VM errors propagate instead of being swallowed.
+              case NonFatal(e) =>
+                lastError = Some(e)
+                false
+            }
+        }
+        if (pending.nonEmpty) Thread.sleep(SYNC_POLL_INTERVAL_MS)
+      }
+      assert(
+        pending.isEmpty,
+        s"Bucket(s) ${pending.mkString(", ")} not synced to lake within $SYNC_TIMEOUT" +
+          lastError.fold("")(e => s"; last error: $e")
+      )
+    } finally {
+      jobClient.cancel().get()
+    }
+  }
+
+  /** Runs `f` and always drops the named tables from DEFAULT_DATABASE afterwards. */
+  override protected def withTable(tableNames: String*)(f: => Unit): Unit = {
+    try {
+      f
+    } finally {
+      tableNames.foreach(t => sql(s"DROP TABLE IF EXISTS $DEFAULT_DATABASE.$t"))
+    }
+  }
+
+  test("Spark Lake Read: log table falls back when no lake snapshot") {
+    withTable("t") {
+      createLakeLogTable("t", "id INT, name STRING")
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(1, "hello"), (2, "world"), (3, "fluss")
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY id"),
+        Row(1, "hello") :: Row(2, "world") :: Row(3, "fluss") :: Nil
+      )
+
+      checkAnswer(
+        sql(s"SELECT name FROM $DEFAULT_DATABASE.t ORDER BY name"),
+        Row("fluss") :: Row("hello") :: Row("world") :: Nil
+      )
+    }
+  }
+
+  test("Spark Lake Read: log table lake-only (all data in lake, no log tail)") {
+    withTable("t_lake_only") {
+      createLakeLogTable("t_lake_only", "id INT, name STRING")
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_lake_only VALUES
+             |(1, "alpha"), (2, "beta"), (3, "gamma")
+             |""".stripMargin)
+
+      tierToLake("t_lake_only")
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t_lake_only ORDER BY id"),
+        Row(1, "alpha") :: Row(2, "beta") :: Row(3, "gamma") :: Nil
+      )
+
+      checkAnswer(
+        sql(s"SELECT name FROM $DEFAULT_DATABASE.t_lake_only ORDER BY name"),
+        Row("alpha") :: Row("beta") :: Row("gamma") :: Nil
+      )
+    }
+  }
+
+  test("Spark Lake Read: log table lake-only projection on timestamp column") {
+    withTable("t_lake_timestamp") {
+      createLakeLogTable("t_lake_timestamp", "id INT, ts TIMESTAMP, name STRING")
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_lake_timestamp VALUES
+             |(1, TIMESTAMP "2026-01-01 12:00:00", "alpha"),
+             |(2, TIMESTAMP "2026-01-02 12:00:00", "beta"),
+             |(3, TIMESTAMP "2026-01-03 12:00:00", "gamma")
+             |""".stripMargin)
+
+      tierToLake("t_lake_timestamp")
+
+      checkAnswer(
+        sql(s"SELECT ts FROM $DEFAULT_DATABASE.t_lake_timestamp ORDER BY ts"),
+        Row(java.sql.Timestamp.valueOf("2026-01-01 12:00:00")) ::
+          Row(java.sql.Timestamp.valueOf("2026-01-02 12:00:00")) ::
+          Row(java.sql.Timestamp.valueOf("2026-01-03 12:00:00")) :: Nil
+      )
+    }
+  }
+
+  test("Spark Lake Read: log table union read (lake + log tail)") {
+    withTable("t_union") {
+      createLakeLogTable("t_union", "id INT, name STRING")
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_union VALUES
+             |(1, "alpha"), (2, "beta"), (3, "gamma")
+             |""".stripMargin)
+
+      tierToLake("t_union")
+
+      // These rows stay in the Fluss log tail; the scan must union them with the lake snapshot.
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_union VALUES
+             |(4, "delta"), (5, "epsilon")
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t_union ORDER BY id"),
+        Row(1, "alpha") :: Row(2, "beta") :: Row(3, "gamma") ::
+          Row(4, "delta") :: Row(5, "epsilon") :: Nil
+      )
+
+      checkAnswer(
+        sql(s"SELECT name FROM $DEFAULT_DATABASE.t_union ORDER BY name"),
+        Row("alpha") :: Row("beta") :: Row("delta") ::
+          Row("epsilon") :: Row("gamma") :: Nil
+      )
+    }
+  }
+
+  test("Spark Lake Read: non-FULL startup mode skips lake path") {
+    withTable("t_earliest") {
+      createLakeLogTable("t_earliest", "id INT, name STRING")
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_earliest VALUES
+             |(1, "alpha"), (2, "beta"), (3, "gamma")
+             |""".stripMargin)
+
+      tierToLake("t_earliest")
+
+      // Capture the current setting so it can be restored exactly, rather than assuming "full".
+      val previousStartupMode = spark.conf.getOption(STARTUP_MODE_KEY)
+      try {
+        spark.conf.set(STARTUP_MODE_KEY, "earliest")
+
+        checkAnswer(
+          sql(s"SELECT * FROM $DEFAULT_DATABASE.t_earliest ORDER BY id"),
+          Row(1, "alpha") :: Row(2, "beta") :: Row(3, "gamma") :: Nil
+        )
+      } finally {
+        previousStartupMode match {
+          case Some(mode) => spark.conf.set(STARTUP_MODE_KEY, mode)
+          case None => spark.conf.unset(STARTUP_MODE_KEY)
+        }
+      }
+    }
+  }
+}
diff --git
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala
new file mode 100644
index 000000000..2e2b941f4
--- /dev/null
+++
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.lake
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.DataLakeFormat
+
+import java.nio.file.Files
+
+/**
+ * Runs the lake-enabled log table read tests against a Paimon lake using a filesystem metastore in
+ * a temporary local warehouse.
+ */
+class SparkLakePaimonLogTableReadTest extends SparkLakeLogTableReadTestBase {
+
+  /**
+   * Warehouse directory for the Paimon filesystem catalog. It is created once per suite instance,
+   * so the Fluss cluster configuration and the tiering job's catalog configuration always refer to
+   * the same location, however often `flussConf` is evaluated.
+   */
+  private lazy val paimonWarehouse: String =
+    Files.createTempDirectory("fluss-testing-lake-read").resolve("warehouse").toString
+
+  override protected def dataLakeFormat: DataLakeFormat = DataLakeFormat.PAIMON
+
+  /** Fluss cluster configuration that enables Paimon as the datalake with a filesystem metastore. */
+  override protected def flussConf: Configuration = {
+    val conf = super.flussConf
+    conf.setString("datalake.format", DataLakeFormat.PAIMON.toString)
+    conf.setString("datalake.paimon.metastore", "filesystem")
+    conf.setString("datalake.paimon.cache-enabled", "false")
+    // Keep the base-class field in sync for any code that still reads it.
+    warehousePath = paimonWarehouse
+    conf.setString("datalake.paimon.warehouse", paimonWarehouse)
+    conf
+  }
+
+  /** Catalog configuration handed to the tiering job; points at the same warehouse as Fluss. */
+  override protected def lakeCatalogConf: Configuration = {
+    val conf = new Configuration()
+    conf.setString("metastore", "filesystem")
+    conf.setString("warehouse", paimonWarehouse)
+    conf
+  }
+}
diff --git a/fluss-spark/pom.xml b/fluss-spark/pom.xml
index 53795dcb5..b64b12ed6 100644
--- a/fluss-spark/pom.xml
+++ b/fluss-spark/pom.xml
@@ -31,6 +31,10 @@
<name>Fluss : Engine Spark :</name>
<packaging>pom</packaging>
+ <properties>
+ <flink.version>1.20.3</flink.version>
+ </properties>
+
<modules>
<module>fluss-spark-common</module>
<module>fluss-spark-ut</module>