This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new bd68bc0222 [GH-2781] Fix EOFException in OSM PBF reader when file is
split across partitions (#2783)
bd68bc0222 is described below
commit bd68bc0222c4586629969a983192d66429a15f50
Author: Jia Yu <[email protected]>
AuthorDate: Thu Mar 26 02:15:00 2026 -0700
[GH-2781] Fix EOFException in OSM PBF reader when file is split across
partitions (#2783)
---
.../sql/datasources/osm/OsmPartitionReader.scala | 37 ++++++++++++++++++----
.../org/apache/sedona/sql/OsmReaderTest.scala | 13 ++++++++
2 files changed, 43 insertions(+), 7 deletions(-)
diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala b/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala
index 580a9572e6..37d009d260 100644
--- a/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.sedona.sql.datasources.osmpbf.{HeaderFinder, StartEndStream}
import org.apache.sedona.sql.datasources.osmpbf.iterators.PbfIterator
import org.apache.sedona.sql.datasources.osmpbf.model.OSMEntity
-import org.apache.spark.SerializableWritable
+import org.apache.spark.{SerializableWritable, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -46,16 +46,39 @@ case class OsmPartitionReader(
val path = new Path(new URI(file.filePath.toString()))
val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
val status = fs.getFileStatus(path)
- val f = fs.open(status.getPath)
val offset = findOffset(fs, status, file.start)
- f.seek(file.start + offset)
+ if (offset < 0) {
+ Iterator.empty
+ } else {
+ val f = fs.open(status.getPath)
+ f.seek(file.start + offset)
- new PbfIterator(new StartEndStream(f, (file.length - offset) + HEADER_SIZE_LENGTH)).map(
- record => {
- resolveEntity(record, requiredSchema)
- })
+ Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => f.close()))
+
+ val iter =
+ new PbfIterator(new StartEndStream(f, (file.length - offset) + HEADER_SIZE_LENGTH)).map(
+ record => {
+ resolveEntity(record, requiredSchema)
+ })
+
+ new Iterator[InternalRow] {
+ private var closed = false
+ private def closeIfNeeded(): Unit = {
+ if (!closed) {
+ closed = true
+ f.close()
+ }
+ }
+ override def hasNext: Boolean = {
+ val has = iter.hasNext
+ if (!has) closeIfNeeded()
+ has
+ }
+ override def next(): InternalRow = iter.next()
+ }
+ }
}
def findOffset(fs: FileSystem, status: FileStatus, start: Long): Long = {
diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala b/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala
index 09c083b345..5c82acf51d 100644
--- a/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala
@@ -339,6 +339,19 @@ class OsmReaderTest extends TestBaseScala with Matchers {
fieldNames should contain("visible")
}
+ it("should handle file splits where last partition has no block boundary (GH-2781)") {
+ // Force small splits so the last partition starts inside the final PBF block,
+ // where no OSMData header exists. Without the fix, this causes EOFException.
+ withConf(Map("spark.sql.files.maxPartitionBytes" -> "100000")) {
+ val df = sparkSession.read
+ .format("osmpbf")
+ .load(monacoPath)
+
+ assert(df.rdd.getNumPartitions > 1)
+ assert(df.count() > 0)
+ }
+ }
+
it("should not lose precision due to float to double conversion") {
// Test for accuracy loss bug in NodeExtractor and DenseNodeExtractor
val node = sparkSession.read