This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 030e69db4c [core] Fix blob file read with partition (#6423)
030e69db4c is described below

commit 030e69db4c0ba7385363339cbfd47c8f36a50365
Author: YeJunHao <[email protected]>
AuthorDate: Fri Oct 17 17:25:13 2025 +0800

    [core] Fix blob file read with partition (#6423)
---
 .../java/org/apache/paimon/schema/TableSchema.java |  8 ++++-
 .../org/apache/paimon/spark/sql/BlobTestBase.scala | 38 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 1 deletion(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
index b4baf20cb1..fb2d16c7ed 100644
--- a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -300,7 +300,13 @@ public class TableSchema implements Serializable {
     private List<DataField> projectedDataFields(List<String> projectedFieldNames) {
         List<String> fieldNames = fieldNames();
         return projectedFieldNames.stream()
-                .map(k -> fields.get(fieldNames.indexOf(k)))
+                .map(
+                        k -> {
+                            return !fieldNames.contains(k)
+                                    ? null
+                                    : fields.get(fieldNames.indexOf(k));
+                        })
+                .filter(Objects::nonNull)
                 .collect(Collectors.toList());
     }
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index 9f7b3a8b5c..4204ab79e8 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -89,6 +89,44 @@ class BlobTestBase extends PaimonSparkTestBase {
     }
   }
 
+  test("Blob: test write blob descriptor with partition") {
+    withTable("t") {
+      val blobData = new Array[Byte](1024 * 1024)
+      RANDOM.nextBytes(blobData)
+      val fileIO = new LocalFileIO
+      val uri = "file://" + tempDBDir.toString + "/external_blob"
+      try {
+        val outputStream = fileIO.newOutputStream(new Path(uri), true)
+        try outputStream.write(blobData)
+        finally if (outputStream != null) outputStream.close()
+      }
+
+      val blobDescriptor = new BlobDescriptor(uri, 0, blobData.length)
+      sql(
+        "CREATE TABLE IF NOT EXISTS t (\n" + "id STRING,\n" + "name STRING,\n" + "file_size STRING,\n" + "crc64 STRING,\n" + "modified_time STRING,\n" + "content BINARY\n" + ") \n" +
+          "PARTITIONED BY (ds STRING, batch STRING) \n" +
+          "TBLPROPERTIES ('comment' = 'blob table','partition.expiration-time' = '365 d','row-tracking.enabled' = 'true','data-evolution.enabled' = 'true','blob-field' = 'content','blob-as-descriptor' = 'true')")
+      sql(
+        "INSERT OVERWRITE TABLE t\nPARTITION(ds= '1017',batch = 'test') VALUES \n('1','paimon','1024','12345678','20241017',X'" + bytesToHex(
+          blobDescriptor.serialize()) + "')")
+      val newDescriptorBytes =
+        sql("SELECT content FROM t WHERE id = '1'").collect()(0).get(0).asInstanceOf[Array[Byte]]
+      val newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes)
+      val options = new Options()
+      options.set("warehouse", tempDBDir.toString)
+      val catalogContext = CatalogContext.create(options)
+      val uriReaderFactory = new UriReaderFactory(catalogContext)
+      val blob = Blob.fromDescriptor(uriReaderFactory.create(newBlobDescriptor.uri), blobDescriptor)
+      assert(util.Arrays.equals(blobData, blob.toData))
+
+      sql("ALTER TABLE t SET TBLPROPERTIES ('blob-as-descriptor'='false')")
+      checkAnswer(
+        sql("SELECT id, name, content, _ROW_ID, _SEQUENCE_NUMBER FROM t WHERE id = 1"),
+        Seq(Row("1", "paimon", blobData, 0, 1))
+      )
+    }
+  }
+
   private val HEX_ARRAY = "0123456789ABCDEF".toCharArray
 
   def bytesToHex(bytes: Array[Byte]): String = {

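For context: the TableSchema change makes projectedDataFields skip any projected field name that is not present in the schema's field list. Before the patch, an absent name (presumably a partition column when reading a blob file) made fieldNames.indexOf(k) return -1, so fields.get(-1) threw an IndexOutOfBoundsException. Below is a minimal standalone sketch (not part of the commit) of that filtering behavior, using plain strings in place of DataField objects and hypothetical field names:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;
    import java.util.stream.Collectors;

    public class ProjectionSketch {
        public static void main(String[] args) {
            // Field names actually present in the (hypothetical) blob file schema.
            List<String> fieldNames = Arrays.asList("id", "content");
            // Projection requested by the reader; "ds" stands in for a partition
            // column that is not part of the blob file schema.
            List<String> projectedFieldNames = Arrays.asList("id", "ds", "content");

            // Before the fix: indexOf("ds") == -1, and get(-1) would throw.
            // After the fix: unknown names map to null and are filtered out.
            List<String> projected =
                    projectedFieldNames.stream()
                            .map(k -> fieldNames.contains(k) ? k : null)
                            .filter(Objects::nonNull)
                            .collect(Collectors.toList());

            System.out.println(projected); // prints [id, content]
        }
    }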