This is an automated email from the ASF dual-hosted git repository.

kunwp1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new ed2f7758f7 fix: preserve ANY type through schema round-trip (#4995)
ed2f7758f7 is described below

commit ed2f7758f73f63c31694f023952f3832b04ce95b
Author: Matthew B. <[email protected]>
AuthorDate: Tue May 12 20:28:05 2026 -0700

    fix: preserve ANY type through schema round-trip (#4995)
    
    ### What changes were proposed in this PR?
    ArrowUtils.fromTexeraSchema now tags ANY attributes with texera_type=ANY
    metadata on the Arrow field, and toTexeraSchema reads that tag back.
    This mirrors the existing LARGE_BINARY mechanism. Without it, an ANY
    attribute silently became STRING after a round-trip, because ANY and
    STRING share the same Arrow representation (Utf8).
    
    
    ### Any related issues, documentation, or discussions?
    Closes: #4762
    
    
    ### How was this PR tested?
    Updated ArrowUtilsSpec (in common/workflow-core): replaced the test that
    pinned the bug ("lose the ANY distinction") with one that asserts ANY is
    preserved through a round-trip, and added a test that the texera_type=ANY
    metadata is attached only to ANY fields. Ran both the WorkflowCore (27/27)
    and WorkflowOperator (14/14) ArrowUtilsSpec suites — all pass.
    
    
    ### Was this PR authored or co-authored using generative AI tooling?
    Co-authored with Claude Opus 4.7, in compliance with ASF policy.
    
    ---------
    
    Co-authored-by: Kunwoo (Chris) <[email protected]>
---
 .../org/apache/texera/amber/util/ArrowUtils.scala  | 27 +++++++++++++++-------
 .../apache/texera/amber/util/ArrowUtilsSpec.scala  | 22 ++++++++++++++----
 2 files changed, 36 insertions(+), 13 deletions(-)

diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala
index a6f22085b6..c4f649e719 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala
@@ -45,6 +45,7 @@ import scala.jdk.CollectionConverters.CollectionHasAsScala
 import scala.language.implicitConversions
 
 object ArrowUtils extends LazyLogging {
+  private val TexeraTypeMetadataKey = "texera_type"
 
   // Create a single allocator for the entire utility
   private val allocator: BufferAllocator = new RootAllocator()
@@ -94,7 +95,8 @@ object ArrowUtils extends LazyLogging {
 
   /**
     * Converts an Arrow Schema into Texera Schema.
-    * Checks field metadata to detect LARGE_BINARY types.
+    * Checks field metadata to recover types that share an Arrow representation
+    * (LARGE_BINARY and ANY both ride on Utf8).
     *
     * @param arrowSchema The Arrow Schema to be converted.
     * @return A Texera Schema.
@@ -102,11 +104,14 @@ object ArrowUtils extends LazyLogging {
   def toTexeraSchema(arrowSchema: org.apache.arrow.vector.types.pojo.Schema): 
Schema =
     Schema(
       arrowSchema.getFields.asScala.map { field =>
-        val isLargeBinary = Option(field.getMetadata)
-          .exists(m => m.containsKey("texera_type") && m.get("texera_type") == 
"LARGE_BINARY")
+        val taggedType = Option(field.getMetadata)
+          .flatMap(m => Option(m.get(TexeraTypeMetadataKey)))
+          .collect {
+            case "LARGE_BINARY" => AttributeType.LARGE_BINARY
+            case "ANY"          => AttributeType.ANY
+          }
 
-        val attributeType =
-          if (isLargeBinary) AttributeType.LARGE_BINARY else 
toAttributeType(field.getType)
+        val attributeType = 
taggedType.getOrElse(toAttributeType(field.getType))
         new Attribute(field.getName, attributeType)
       }.toList
     )
@@ -232,16 +237,22 @@ object ArrowUtils extends LazyLogging {
 
   /**
     * Converts an Amber schema into Arrow schema.
-    * Stores AttributeType in field metadata to preserve LARGE_BINARY type 
information.
+    * Stores AttributeType in field metadata to preserve LARGE_BINARY and ANY,
+    * which both collapse onto Utf8 in Arrow.
     *
     * @param schema The Texera Schema.
     * @return An Arrow Schema.
     */
   def fromTexeraSchema(schema: Schema): 
org.apache.arrow.vector.types.pojo.Schema = {
     val arrowFields = schema.getAttributes.map { attribute =>
-      val metadata = if (attribute.getType == AttributeType.LARGE_BINARY) {
+      val metadataTag = attribute.getType match {
+        case AttributeType.LARGE_BINARY => "LARGE_BINARY"
+        case AttributeType.ANY          => "ANY"
+        case _                          => null
+      }
+      val metadata = if (metadataTag != null) {
         val map = new util.HashMap[String, String]()
-        map.put("texera_type", "LARGE_BINARY")
+        map.put(TexeraTypeMetadataKey, metadataTag)
         map
       } else null
 
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala
index 69323acbba..212be040b1 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala
@@ -258,14 +258,26 @@ class ArrowUtilsSpec extends AnyFlatSpec with Matchers {
     )
   }
 
-  it should "lose the ANY distinction (round-trips as STRING)" in {
-    // Pin: ANY fromAttributeType produces Utf8 with no metadata. 
toAttributeType
-    // then can only see Utf8, so the recovered type is STRING. Documenting 
this
-    // information loss so a future fix that round-trips ANY can break the 
spec.
+  it should "preserve ANY through the metadata-based path" in {
     val original = Schema(List(new Attribute("v", AttributeType.ANY)))
     val recovered = 
ArrowUtils.toTexeraSchema(ArrowUtils.fromTexeraSchema(original))
     recovered.getAttributes.toList.map(a => (a.getName, a.getType)) shouldBe 
List(
-      ("v", AttributeType.STRING)
+      ("v", AttributeType.ANY)
     )
   }
+
+  it should "attach texera_type=ANY metadata to ANY fields and only those" in {
+    val schema = Schema(
+      List(
+        new Attribute("v", AttributeType.ANY),
+        new Attribute("name", AttributeType.STRING)
+      )
+    )
+    val arrow = ArrowUtils.fromTexeraSchema(schema)
+    val fields = arrow.getFields.asScala.toList
+    val any = fields.find(_.getName == "v").get
+    val name = fields.find(_.getName == "name").get
+    any.getMetadata.get("texera_type") shouldBe "ANY"
+    
Option(name.getMetadata).map(_.containsKey("texera_type")).getOrElse(false) 
shouldBe false
+  }
 }

Reply via email to