This is an automated email from the ASF dual-hosted git repository. jiayu pushed a commit to branch feature/osm-pbf-metadata-fields in repository https://gitbox.apache.org/repos/asf/sedona.git
commit dc520de938fb5a1258336fadf28a049ff47bd7c0 Author: Jia Yu <[email protected]> AuthorDate: Sat Mar 21 13:01:36 2026 -0700 [GH-2760] Extend OSM PBF reader to support additional metadata fields Extract changeset, timestamp, uid, user, version, and visible fields from the Info/DenseInfo protobuf messages that were previously ignored by the OSM PBF reader. These fields are part of the standard OSM PBF format and provide useful provenance metadata for each entity. --- .../osmpbf/extractors/DenseNodeExtractor.java | 55 +++++++++++++- .../datasources/osmpbf/features/InfoResolver.java | 45 ++++++++++++ .../datasources/osmpbf/iterators/BlobIterator.java | 8 +-- .../datasources/osmpbf/iterators/NodeIterator.java | 10 ++- .../osmpbf/iterators/RelationIterator.java | 17 ++++- .../datasources/osmpbf/iterators/WayIterator.java | 16 ++++- .../sql/datasources/osmpbf/model/OSMEntity.java | 56 +++++++++++++++ .../sql/datasources/osm/OsmPartitionReader.scala | 10 +++ .../sql/datasources/osm/SchemaProvider.scala | 10 ++- .../org/apache/sedona/sql/OsmReaderTest.scala | 84 ++++++++++++++++++++++ 10 files changed, 296 insertions(+), 15 deletions(-) diff --git a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/DenseNodeExtractor.java b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/DenseNodeExtractor.java index e6ac347762..efd488d366 100644 --- a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/DenseNodeExtractor.java +++ b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/extractors/DenseNodeExtractor.java @@ -26,23 +26,42 @@ public class DenseNodeExtractor implements Extractor { long latOffset; long lonOffset; long granularity; + int dateGranularity; long firstId; long firstLat; long firstLon; Integer keyIndex; + // DenseInfo delta accumulators + boolean hasDenseInfo; + long firstTimestamp; + long firstChangeset; + int firstUid; + int firstUserSid; + Osmformat.DenseNodes nodes; public DenseNodeExtractor( - Osmformat.DenseNodes nodes, long latOffset, long lonOffset, long granularity) { + Osmformat.DenseNodes nodes, + long latOffset, + long lonOffset, + long granularity, + int dateGranularity) { this.firstId = 0; this.firstLat = 0; this.firstLon = 0; this.latOffset = latOffset; this.lonOffset = lonOffset; this.granularity = granularity; + this.dateGranularity = dateGranularity; this.nodes = nodes; this.keyIndex = 0; + + this.hasDenseInfo = nodes.hasDenseinfo() && nodes.getDenseinfo().getVersionCount() > 0; + this.firstTimestamp = 0; + this.firstChangeset = 0; + this.firstUid = 0; + this.firstUserSid = 0; } public OsmNode extract(int idx, Osmformat.StringTable stringTable) { @@ -63,7 +82,39 @@ public class DenseNodeExtractor implements Extractor { HashMap<String, String> tags = parseTags(stringTable); - return new OsmNode(id, lat, lon, tags); + OsmNode node = new OsmNode(id, lat, lon, tags); + + if (hasDenseInfo) { + Osmformat.DenseInfo denseInfo = nodes.getDenseinfo(); + + // version is NOT delta-encoded + node.setVersion(denseInfo.getVersion(idx)); + + // timestamp, changeset, uid, user_sid are delta-encoded + long timestamp = denseInfo.getTimestamp(idx) + firstTimestamp; + long changeset = denseInfo.getChangeset(idx) + firstChangeset; + int uid = denseInfo.getUid(idx) + firstUid; + int userSid = denseInfo.getUserSid(idx) + firstUserSid; + + firstTimestamp = timestamp; + firstChangeset = changeset; + firstUid = uid; + firstUserSid = userSid; + + node.setTimestamp(timestamp * dateGranularity); + node.setChangeset(changeset); + node.setUid(uid); + if (userSid > 0) { + node.setUser(stringTable.getS(userSid).toStringUtf8()); + } + + // visible is NOT delta-encoded, and may not be present + if (denseInfo.getVisibleCount() > idx) { + node.setVisible(denseInfo.getVisible(idx)); + } + } + + return node; } HashMap<String, String> parseTags(Osmformat.StringTable stringTable) { diff --git a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/features/InfoResolver.java b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/features/InfoResolver.java new file mode 100644 index 0000000000..165336e62a --- /dev/null +++ b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/features/InfoResolver.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.sql.datasources.osmpbf.features; + +import org.apache.sedona.sql.datasources.osmpbf.build.Osmformat; +import org.apache.sedona.sql.datasources.osmpbf.model.OSMEntity; + +public class InfoResolver { + + public static void populateInfo( + OSMEntity entity, + Osmformat.Info info, + Osmformat.StringTable stringTable, + int dateGranularity) { + if (info == null) { + return; + } + entity.setVersion(info.getVersion()); + entity.setTimestamp((long) info.getTimestamp() * dateGranularity); + entity.setChangeset(info.getChangeset()); + entity.setUid(info.getUid()); + if (info.getUserSid() > 0) { + entity.setUser(stringTable.getS(info.getUserSid()).toStringUtf8()); + } + if (info.hasVisible()) { + entity.setVisible(info.getVisible()); + } + } +} diff --git a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/BlobIterator.java b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/BlobIterator.java index d3465363e7..3d596a1073 100644 --- a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/BlobIterator.java +++ b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/BlobIterator.java @@ -72,9 +72,8 @@ public class BlobIterator implements Iterator<OSMEntity> { Iterator<OSMEntity> resolveIterator() { return IteratorUtils.chainedIterator( - new WayIterator(currentPrimitiveGroup.getWaysList(), primitiveBlock.getStringtable()), - new RelationIterator( - currentPrimitiveGroup.getRelationsList(), primitiveBlock.getStringtable()), + new WayIterator(currentPrimitiveGroup.getWaysList(), primitiveBlock), + new RelationIterator(currentPrimitiveGroup.getRelationsList(), primitiveBlock), new NodeIterator(currentPrimitiveGroup.getNodesList(), primitiveBlock), currentPrimitiveGroup.getDense() != null ? new DenseNodeIterator( @@ -84,7 +83,8 @@ public class BlobIterator implements Iterator<OSMEntity> { currentPrimitiveGroup.getDense(), primitiveBlock.getLatOffset(), primitiveBlock.getLonOffset(), - primitiveBlock.getGranularity())) + primitiveBlock.getGranularity(), + primitiveBlock.getDateGranularity())) : Collections.emptyIterator()); } } diff --git a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/NodeIterator.java b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/NodeIterator.java index be966f912e..57a5baa469 100644 --- a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/NodeIterator.java +++ b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/NodeIterator.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import org.apache.sedona.sql.datasources.osmpbf.build.Osmformat; +import org.apache.sedona.sql.datasources.osmpbf.features.InfoResolver; import org.apache.sedona.sql.datasources.osmpbf.features.TagsResolver; import org.apache.sedona.sql.datasources.osmpbf.model.OSMEntity; import org.apache.sedona.sql.datasources.osmpbf.model.OsmNode; @@ -85,6 +86,13 @@ public class NodeIterator implements Iterator<OSMEntity> { HashMap<String, String> tags = TagsResolver.resolveTags(node.getKeysCount(), node::getKeys, node::getVals, stringTable); - return new OsmNode(id, lat, lon, tags); + OsmNode osmNode = new OsmNode(id, lat, lon, tags); + + if (node.hasInfo()) { + InfoResolver.populateInfo( + osmNode, node.getInfo(), stringTable, primitiveBlock.getDateGranularity()); + } + + return osmNode; } } diff --git a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/RelationIterator.java b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/RelationIterator.java index b0b3164351..194bd81912 100644 --- a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/RelationIterator.java +++ b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/RelationIterator.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import org.apache.sedona.sql.datasources.osmpbf.build.Osmformat; +import org.apache.sedona.sql.datasources.osmpbf.features.InfoResolver; import org.apache.sedona.sql.datasources.osmpbf.features.TagsResolver; import org.apache.sedona.sql.datasources.osmpbf.model.OSMEntity; import org.apache.sedona.sql.datasources.osmpbf.model.Relation; @@ -32,12 +33,15 @@ public class RelationIterator implements Iterator<OSMEntity> { long relationCount; List<Osmformat.Relation> relations; Osmformat.StringTable stringTable; + Osmformat.PrimitiveBlock primitiveBlock; - public RelationIterator(List<Osmformat.Relation> relations, Osmformat.StringTable stringTable) { + public RelationIterator( + List<Osmformat.Relation> relations, Osmformat.PrimitiveBlock primitiveBlock) { this.idx = 0; this.relationCount = 0; this.relations = relations; - this.stringTable = stringTable; + this.stringTable = primitiveBlock.getStringtable(); + this.primitiveBlock = primitiveBlock; if (relations != null) { this.relationCount = relations.size(); @@ -79,7 +83,14 @@ public class RelationIterator implements Iterator<OSMEntity> { TagsResolver.resolveTags( relation.getKeysCount(), relation::getKeys, relation::getVals, stringTable); - return new Relation(relation.getId(), tags, refs, refTypes, roles); + Relation relationEntity = new Relation(relation.getId(), tags, refs, refTypes, roles); + + if (relation.hasInfo()) { + InfoResolver.populateInfo( + relationEntity, relation.getInfo(), stringTable, primitiveBlock.getDateGranularity()); + } + + return relationEntity; } private String[] resolveRefRoles(Osmformat.Relation relation) { diff --git a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/WayIterator.java b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/WayIterator.java index 85693ec28a..6f2e4281de 100644 --- a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/WayIterator.java +++ b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/iterators/WayIterator.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import org.apache.sedona.sql.datasources.osmpbf.build.Osmformat; +import org.apache.sedona.sql.datasources.osmpbf.features.InfoResolver; import org.apache.sedona.sql.datasources.osmpbf.features.TagsResolver; import org.apache.sedona.sql.datasources.osmpbf.model.OSMEntity; import org.apache.sedona.sql.datasources.osmpbf.model.Way; @@ -31,12 +32,14 @@ public class WayIterator implements Iterator<OSMEntity> { long waysCount; List<Osmformat.Way> ways; Osmformat.StringTable stringTable; + Osmformat.PrimitiveBlock primitiveBlock; - public WayIterator(List<Osmformat.Way> ways, Osmformat.StringTable stringTable) { + public WayIterator(List<Osmformat.Way> ways, Osmformat.PrimitiveBlock primitiveBlock) { this.idx = 0; this.waysCount = 0; this.ways = ways; - this.stringTable = stringTable; + this.stringTable = primitiveBlock.getStringtable(); + this.primitiveBlock = primitiveBlock; if (ways != null) { this.waysCount = ways.size(); @@ -79,6 +82,13 @@ public class WayIterator implements Iterator<OSMEntity> { HashMap<String, String> tags = TagsResolver.resolveTags(way.getKeysCount(), way::getKeys, way::getVals, stringTable); - return new Way(way.getId(), tags, refs); + Way wayEntity = new Way(way.getId(), tags, refs); + + if (way.hasInfo()) { + InfoResolver.populateInfo( + wayEntity, way.getInfo(), stringTable, primitiveBlock.getDateGranularity()); + } + + return wayEntity; } } diff --git a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/model/OSMEntity.java b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/model/OSMEntity.java index 56d910391b..8868ba9e93 100644 --- a/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/model/OSMEntity.java +++ b/spark/common/src/main/java/org/apache/sedona/sql/datasources/osmpbf/model/OSMEntity.java @@ -31,6 +31,14 @@ public class OSMEntity { private String[] refRoles; private String[] refTypes; + // Metadata fields from Info/DenseInfo + private Integer version; + private Long timestamp; // milliseconds since epoch + private Long changeset; + private Integer uid; + private String user; + private Boolean visible; + public OSMEntity( long id, double latitude, double longitude, HashMap<String, String> tags, String kind) { this.id = id; @@ -93,4 +101,52 @@ public class OSMEntity { public long getId() { return id; } + + public Integer getVersion() { + return version; + } + + public void setVersion(Integer version) { + this.version = version; + } + + public Long getTimestamp() { + return timestamp; + } + + public void setTimestamp(Long timestamp) { + this.timestamp = timestamp; + } + + public Long getChangeset() { + return changeset; + } + + public void setChangeset(Long changeset) { + this.changeset = changeset; + } + + public Integer getUid() { + return uid; + } + + public void setUid(Integer uid) { + this.uid = uid; + } + + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + public Boolean getVisible() { + return visible; + } + + public void setVisible(Boolean visible) { + this.visible = visible; + } } diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala b/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala index a41417816b..580a9572e6 100644 --- a/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala +++ b/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala @@ -89,6 +89,16 @@ case class OsmPartitionReader( if (entity.getRefTypes != null) ArrayData.toArrayData(entity.getRefTypes.map(x => UTF8String.fromString(x))) else null + case "changeset" => entity.getChangeset + case "timestamp" => + if (entity.getTimestamp != null) + entity.getTimestamp * 1000L // ms to microseconds for Spark TimestampType + else null + case "uid" => entity.getUid + case "user" => + if (entity.getUser != null) UTF8String.fromString(entity.getUser) else null + case "version" => entity.getVersion + case "visible" => entity.getVisible } })) } diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/SchemaProvider.scala b/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/SchemaProvider.scala index afc8831480..a9b01b0689 100644 --- a/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/SchemaProvider.scala +++ b/spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/SchemaProvider.scala @@ -18,7 +18,7 @@ */ package org.apache.sedona.sql.datasources.osm -import org.apache.spark.sql.types.{ArrayType, DoubleType, LongType, MapType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{ArrayType, BooleanType, DoubleType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampType} trait SchemaProvider { def schema: StructType = @@ -39,5 +39,11 @@ trait SchemaProvider { nullable = true), StructField("refs", ArrayType(LongType), nullable = true), StructField("ref_roles", ArrayType(StringType), nullable = true), - StructField("ref_types", ArrayType(StringType), nullable = true))) + StructField("ref_types", ArrayType(StringType), nullable = true), + StructField("changeset", LongType, nullable = true), + StructField("timestamp", TimestampType, nullable = true), + StructField("uid", IntegerType, nullable = true), + StructField("user", StringType, nullable = true), + StructField("version", IntegerType, nullable = true), + StructField("visible", BooleanType, nullable = true))) } diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala b/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala index 817a5db0b9..3bbe8650c1 100644 --- a/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala +++ b/spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala @@ -232,6 +232,90 @@ class OsmReaderTest extends TestBaseScala with Matchers { relationsList should contain theSameElementsAs expectedRelationsList } + it("should parse metadata fields (changeset, timestamp, uid, user, version)") { + val osmData = sparkSession.read + .format("osmpbf") + .load(monacoPath) + + // All entities should have version, timestamp, changeset populated + val totalCount = osmData.count() + val withMetadata = osmData + .filter("version is not null and timestamp is not null and changeset is not null") + .count() + + withMetadata shouldEqual totalCount + + // Verify timestamp values are reasonable (after year 2000, before year 2100) + val timestamps = osmData + .selectExpr("min(timestamp)", "max(timestamp)") + .collect() + .head + + val minTimestamp = timestamps.getTimestamp(0) + val maxTimestamp = timestamps.getTimestamp(1) + + minTimestamp.after(java.sql.Timestamp.valueOf("2000-01-01 00:00:00")) shouldBe true + maxTimestamp.before(java.sql.Timestamp.valueOf("2100-01-01 00:00:00")) shouldBe true + + // Verify version is positive + val minVersion = osmData + .selectExpr("min(version)") + .collect() + .head + .getInt(0) + + minVersion should be >= 1 + + // Verify changeset is non-negative + val minChangeset = osmData + .selectExpr("min(changeset)") + .collect() + .head + .getLong(0) + + minChangeset should be >= 0L + + // Verify metadata works for each entity kind + for (kind <- Seq("node", "way", "relation")) { + val kindData = osmData.filter(s"kind == '$kind'") + val kindWithMeta = kindData + .filter("version is not null and timestamp is not null") + .count() + + kindWithMeta shouldEqual kindData.count() + } + } + + it("should include metadata fields in schema for dense nodes") { + val denseData = sparkSession.read + .format("osmpbf") + .load(densePath) + + // Verify schema includes the new fields + val fieldNames = denseData.schema.fieldNames + fieldNames should contain("changeset") + fieldNames should contain("timestamp") + fieldNames should contain("uid") + fieldNames should contain("user") + fieldNames should contain("version") + fieldNames should contain("visible") + } + + it("should include metadata fields in schema for normal nodes") { + val nodesData = sparkSession.read + .format("osmpbf") + .load(nodesPath) + + // Verify schema includes the new fields + val fieldNames = nodesData.schema.fieldNames + fieldNames should contain("changeset") + fieldNames should contain("timestamp") + fieldNames should contain("uid") + fieldNames should contain("user") + fieldNames should contain("version") + fieldNames should contain("visible") + } + it("should not lose precision due to float to double conversion") { // Test for accuracy loss bug in NodeExtractor and DenseNodeExtractor val node = sparkSession.read
