This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 030e69db4c [core] Fix blob file read with partition (#6423)
030e69db4c is described below
commit 030e69db4c0ba7385363339cbfd47c8f36a50365
Author: YeJunHao <[email protected]>
AuthorDate: Fri Oct 17 17:25:13 2025 +0800
[core] Fix blob file read with partition (#6423)
---
.../java/org/apache/paimon/schema/TableSchema.java | 8 ++++-
.../org/apache/paimon/spark/sql/BlobTestBase.scala | 38 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 1 deletion(-)
diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
index b4baf20cb1..fb2d16c7ed 100644
--- a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -300,7 +300,13 @@ public class TableSchema implements Serializable {

    private List<DataField> projectedDataFields(List<String> projectedFieldNames) {
        List<String> fieldNames = fieldNames();
        return projectedFieldNames.stream()
-                .map(k -> fields.get(fieldNames.indexOf(k)))
+                .map(
+                        k -> {
+                            return !fieldNames.contains(k)
+                                    ? null
+                                    : fields.get(fieldNames.indexOf(k));
+                        })
+                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }
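Note on the core change above: projectedDataFields previously resolved every projected name with fieldNames.indexOf(k), so a name missing from the field list (for example a partition column that is not stored in the blob file, which appears to be what #6423 addresses) produced index -1 and fields.get(-1) threw IndexOutOfBoundsException. The new mapping returns null for unknown names and filters them out. Below is a minimal, self-contained sketch of that behavior; the class and field names are made up for illustration and are not the actual Paimon types.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;
    import java.util.stream.Collectors;

    public class ProjectionSketch {
        public static void main(String[] args) {
            // Hypothetical file schema: the data file only stores these fields.
            List<String> fieldNames = Arrays.asList("id", "content");
            List<String> fields = Arrays.asList("id-field", "content-field");

            // Projection that also names a partition column ("ds") absent from the file schema.
            List<String> projected = Arrays.asList("id", "ds", "content");

            // Old behavior: fields.get(fieldNames.indexOf("ds")) == fields.get(-1) -> IndexOutOfBoundsException.
            // New behavior: unknown names map to null and are filtered out.
            List<String> result =
                    projected.stream()
                            .map(k -> !fieldNames.contains(k) ? null : fields.get(fieldNames.indexOf(k)))
                            .filter(Objects::nonNull)
                            .collect(Collectors.toList());

            System.out.println(result); // prints [id-field, content-field]
        }
    }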
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index 9f7b3a8b5c..4204ab79e8 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -89,6 +89,44 @@ class BlobTestBase extends PaimonSparkTestBase {
}
}
+ test("Blob: test write blob descriptor with partition") {
+ withTable("t") {
+ val blobData = new Array[Byte](1024 * 1024)
+ RANDOM.nextBytes(blobData)
+ val fileIO = new LocalFileIO
+ val uri = "file://" + tempDBDir.toString + "/external_blob"
+ try {
+ val outputStream = fileIO.newOutputStream(new Path(uri), true)
+ try outputStream.write(blobData)
+ finally if (outputStream != null) outputStream.close()
+ }
+
+ val blobDescriptor = new BlobDescriptor(uri, 0, blobData.length)
+ sql(
+ "CREATE TABLE IF NOT EXISTS t (\n" + "id STRING,\n" + "name STRING,\n"
+ "file_size STRING,\n" + "crc64 STRING,\n" + "modified_time STRING,\n" +
"content BINARY\n" + ") \n" +
+ "PARTITIONED BY (ds STRING, batch STRING) \n" +
+ "TBLPROPERTIES ('comment' = 'blob table','partition.expiration-time'
= '365 d','row-tracking.enabled' = 'true','data-evolution.enabled' =
'true','blob-field' = 'content','blob-as-descriptor' = 'true')")
+ sql(
+ "INSERT OVERWRITE TABLE t\nPARTITION(ds= '1017',batch = 'test') VALUES
\n('1','paimon','1024','12345678','20241017',X'" + bytesToHex(
+ blobDescriptor.serialize()) + "')")
+ val newDescriptorBytes =
+ sql("SELECT content FROM t WHERE id =
'1'").collect()(0).get(0).asInstanceOf[Array[Byte]]
+ val newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes)
+ val options = new Options()
+ options.set("warehouse", tempDBDir.toString)
+ val catalogContext = CatalogContext.create(options)
+ val uriReaderFactory = new UriReaderFactory(catalogContext)
+ val blob =
Blob.fromDescriptor(uriReaderFactory.create(newBlobDescriptor.uri),
blobDescriptor)
+ assert(util.Arrays.equals(blobData, blob.toData))
+
+ sql("ALTER TABLE t SET TBLPROPERTIES ('blob-as-descriptor'='false')")
+ checkAnswer(
+ sql("SELECT id, name, content, _ROW_ID, _SEQUENCE_NUMBER FROM t WHERE
id = 1"),
+ Seq(Row("1", "paimon", blobData, 0, 1))
+ )
+ }
+ }
+
private val HEX_ARRAY = "0123456789ABCDEF".toCharArray
def bytesToHex(bytes: Array[Byte]): String = {