This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new bd68bc0222 [GH-2781] Fix EOFException in OSM PBF reader when file is split across partitions (#2783)
bd68bc0222 is described below

commit bd68bc0222c4586629969a983192d66429a15f50
Author: Jia Yu <[email protected]>
AuthorDate: Thu Mar 26 02:15:00 2026 -0700

    [GH-2781] Fix EOFException in OSM PBF reader when file is split across partitions (#2783)
---
 .../sql/datasources/osm/OsmPartitionReader.scala   | 37 ++++++++++++++++++----
 .../org/apache/sedona/sql/OsmReaderTest.scala      | 13 ++++++++
 2 files changed, 43 insertions(+), 7 deletions(-)

diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala b/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala
index 580a9572e6..37d009d260 100644
--- a/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.sedona.sql.datasources.osmpbf.{HeaderFinder, StartEndStream}
 import org.apache.sedona.sql.datasources.osmpbf.iterators.PbfIterator
 import org.apache.sedona.sql.datasources.osmpbf.model.OSMEntity
-import org.apache.spark.SerializableWritable
+import org.apache.spark.{SerializableWritable, TaskContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -46,16 +46,39 @@ case class OsmPartitionReader(
     val path = new Path(new URI(file.filePath.toString()))
     val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
     val status = fs.getFileStatus(path)
-    val f = fs.open(status.getPath)
 
     val offset = findOffset(fs, status, file.start)
 
-    f.seek(file.start + offset)
+    if (offset < 0) {
+      Iterator.empty
+    } else {
+      val f = fs.open(status.getPath)
+      f.seek(file.start + offset)
 
-    new PbfIterator(new StartEndStream(f, (file.length - offset) + HEADER_SIZE_LENGTH)).map(
-      record => {
-        resolveEntity(record, requiredSchema)
-      })
+      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => f.close()))
+
+      val iter =
+        new PbfIterator(new StartEndStream(f, (file.length - offset) + HEADER_SIZE_LENGTH)).map(
+          record => {
+            resolveEntity(record, requiredSchema)
+          })
+
+      new Iterator[InternalRow] {
+        private var closed = false
+        private def closeIfNeeded(): Unit = {
+          if (!closed) {
+            closed = true
+            f.close()
+          }
+        }
+        override def hasNext: Boolean = {
+          val has = iter.hasNext
+          if (!has) closeIfNeeded()
+          has
+        }
+        override def next(): InternalRow = iter.next()
+      }
+    }
   }
 
   def findOffset(fs: FileSystem, status: FileStatus, start: Long): Long = {
diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala b/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala
index 09c083b345..5c82acf51d 100644
--- a/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala
@@ -339,6 +339,19 @@ class OsmReaderTest extends TestBaseScala with Matchers {
       fieldNames should contain("visible")
     }
 
+    it("should handle file splits where last partition has no block boundary (GH-2781)") {
+      // Force small splits so the last partition starts inside the final PBF block,
+      // where no OSMData header exists. Without the fix, this causes EOFException.
+      withConf(Map("spark.sql.files.maxPartitionBytes" -> "100000")) {
+        val df = sparkSession.read
+          .format("osmpbf")
+          .load(monacoPath)
+
+        assert(df.rdd.getNumPartitions > 1)
+        assert(df.count() > 0)
+      }
+    }
+
     it("should not lose precision due to float to double conversion") {
       // Test for accuracy loss bug in NodeExtractor and DenseNodeExtractor
       val node = sparkSession.read

Reply via email to