This is an automated email from the ASF dual-hosted git repository.
kunwp1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new ed2f7758f7 fix: preserve ANY type through schema round-trip (#4995)
ed2f7758f7 is described below
commit ed2f7758f73f63c31694f023952f3832b04ce95b
Author: Matthew B. <[email protected]>
AuthorDate: Tue May 12 20:28:05 2026 -0700
fix: preserve ANY type through schema round-trip (#4995)
### What changes were proposed in this PR?
ArrowUtils.fromTexeraSchema now tags ANY attributes with texera_type=ANY
metadata on the Arrow field, and toTexeraSchema reads that tag back.
This mirrors the existing LARGE_BINARY mechanism. Without the tag, an ANY
attribute silently came back as STRING after a round-trip, because both
types share the same Arrow representation (Utf8).
### Any related issues, documentation, or discussions?
Closes: #4762
### How was this PR tested?
Updated ArrowUtilsSpec (in common/workflow-core): replaced the test that
pinned the bug ("lose the ANY distinction") with one that asserts ANY is
preserved through a round-trip, and added a test that the texera_type=ANY
metadata is attached only to ANY fields. Ran the ArrowUtilsSpec suites in
both WorkflowCore (27/27 tests passing) and WorkflowOperator (14/14 tests
passing).
### Was this PR authored or co-authored using generative AI tooling?
Co-Authored with Claude Opus 4.7 in compliance with ASF
---------
Co-authored-by: Kunwoo (Chris) <[email protected]>
---
.../org/apache/texera/amber/util/ArrowUtils.scala | 27 +++++++++++++++-------
.../apache/texera/amber/util/ArrowUtilsSpec.scala | 22 ++++++++++++++----
2 files changed, 36 insertions(+), 13 deletions(-)
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala
index a6f22085b6..c4f649e719 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala
@@ -45,6 +45,7 @@ import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.language.implicitConversions
object ArrowUtils extends LazyLogging {
+ private val TexeraTypeMetadataKey = "texera_type"
// Create a single allocator for the entire utility
private val allocator: BufferAllocator = new RootAllocator()
@@ -94,7 +95,8 @@ object ArrowUtils extends LazyLogging {
/**
* Converts an Arrow Schema into Texera Schema.
- * Checks field metadata to detect LARGE_BINARY types.
+ * Checks field metadata to recover types that share an Arrow representation
+ * (LARGE_BINARY and ANY both ride on Utf8).
*
* @param arrowSchema The Arrow Schema to be converted.
* @return A Texera Schema.
@@ -102,11 +104,14 @@ object ArrowUtils extends LazyLogging {
def toTexeraSchema(arrowSchema: org.apache.arrow.vector.types.pojo.Schema):
Schema =
Schema(
arrowSchema.getFields.asScala.map { field =>
- val isLargeBinary = Option(field.getMetadata)
- .exists(m => m.containsKey("texera_type") && m.get("texera_type") ==
"LARGE_BINARY")
+ val taggedType = Option(field.getMetadata)
+ .flatMap(m => Option(m.get(TexeraTypeMetadataKey)))
+ .collect {
+ case "LARGE_BINARY" => AttributeType.LARGE_BINARY
+ case "ANY" => AttributeType.ANY
+ }
- val attributeType =
- if (isLargeBinary) AttributeType.LARGE_BINARY else
toAttributeType(field.getType)
+ val attributeType =
taggedType.getOrElse(toAttributeType(field.getType))
new Attribute(field.getName, attributeType)
}.toList
)
@@ -232,16 +237,22 @@ object ArrowUtils extends LazyLogging {
/**
* Converts an Amber schema into Arrow schema.
- * Stores AttributeType in field metadata to preserve LARGE_BINARY type
information.
+ * Stores AttributeType in field metadata to preserve LARGE_BINARY and ANY,
+ * which both collapse onto Utf8 in Arrow.
*
* @param schema The Texera Schema.
* @return An Arrow Schema.
*/
def fromTexeraSchema(schema: Schema):
org.apache.arrow.vector.types.pojo.Schema = {
val arrowFields = schema.getAttributes.map { attribute =>
- val metadata = if (attribute.getType == AttributeType.LARGE_BINARY) {
+ val metadataTag = attribute.getType match {
+ case AttributeType.LARGE_BINARY => "LARGE_BINARY"
+ case AttributeType.ANY => "ANY"
+ case _ => null
+ }
+ val metadata = if (metadataTag != null) {
val map = new util.HashMap[String, String]()
- map.put("texera_type", "LARGE_BINARY")
+ map.put(TexeraTypeMetadataKey, metadataTag)
map
} else null
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala
index 69323acbba..212be040b1 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala
@@ -258,14 +258,26 @@ class ArrowUtilsSpec extends AnyFlatSpec with Matchers {
)
}
- it should "lose the ANY distinction (round-trips as STRING)" in {
- // Pin: ANY fromAttributeType produces Utf8 with no metadata.
toAttributeType
- // then can only see Utf8, so the recovered type is STRING. Documenting
this
- // information loss so a future fix that round-trips ANY can break the
spec.
+ it should "preserve ANY through the metadata-based path" in {
val original = Schema(List(new Attribute("v", AttributeType.ANY)))
val recovered =
ArrowUtils.toTexeraSchema(ArrowUtils.fromTexeraSchema(original))
recovered.getAttributes.toList.map(a => (a.getName, a.getType)) shouldBe
List(
- ("v", AttributeType.STRING)
+ ("v", AttributeType.ANY)
)
}
+
+ it should "attach texera_type=ANY metadata to ANY fields and only those" in {
+ val schema = Schema(
+ List(
+ new Attribute("v", AttributeType.ANY),
+ new Attribute("name", AttributeType.STRING)
+ )
+ )
+ val arrow = ArrowUtils.fromTexeraSchema(schema)
+ val fields = arrow.getFields.asScala.toList
+ val any = fields.find(_.getName == "v").get
+ val name = fields.find(_.getName == "name").get
+ any.getMetadata.get("texera_type") shouldBe "ANY"
+
Option(name.getMetadata).map(_.containsKey("texera_type")).getOrElse(false)
shouldBe false
+ }
}