This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new aa14aae011 Spark 4.0: Support recursive delegate unwrapping to find 
ExtendedParser in parser chains (#13625)
aa14aae011 is described below

commit aa14aae0111d757f8cded70b87c1d58da3fe272c
Author: majian <[email protected]>
AuthorDate: Sun Nov 2 07:37:22 2025 +0800

    Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser in 
parser chains (#13625)
    
    * Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser 
in parser chains
    
    * Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser 
in parser chains
    
    * Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser 
in parser chains
    
    * Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser 
in parser chains
    
    * Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser 
in parser chains
    
    * Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser 
in parser chains
    
    * Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser 
in parser chains
    
    * Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser 
in parser chains
    
    * Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser 
in parser chains
    
    * Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser 
in parser chains
    
    * Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser 
in parser chains
    
    * Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser 
in parser chains
    
    * Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser 
in parser chains
    
    * Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser 
in parser chains
---
 .../apache/iceberg/spark/TestExtendedParser.java   | 231 +++++++++++++++++++++
 .../org/apache/iceberg/spark/ExtendedParser.java   |  45 +++-
 2 files changed, 273 insertions(+), 3 deletions(-)

diff --git 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/TestExtendedParser.java
 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/TestExtendedParser.java
new file mode 100644
index 0000000000..bfcb5af235
--- /dev/null
+++ 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/TestExtendedParser.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.expressions.Term;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.parser.AbstractSqlParser;
+import org.apache.spark.sql.catalyst.parser.AstBuilder;
+import org.apache.spark.sql.catalyst.parser.ParserInterface;
+import 
org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestExtendedParser {
+
+  private static final String SQL_PARSER_FIELD = "sqlParser";
+  private static SparkSession spark;
+  private ParserInterface originalParser;
+
+  @BeforeAll
+  public static void before() {
+    spark = SparkSession.builder().master("local").appName("TestExtendedParser").getOrCreate();
+  }
+
+  @AfterAll
+  public static void after() {
+    if (spark != null) {
+      spark.stop();
+    }
+  }
+
+  /**
+   * Captures the session's current SQL parser before each test, because the tests below replace
+   * it via reflection.
+   *
+   * @throws Exception if reflection access fails
+   */
+  @BeforeEach
+  public void saveOriginalParser() throws Exception {
+    Object sessionState = spark.sessionState();
+    originalParser = (ParserInterface) sqlParserField(sessionState).get(sessionState);
+  }
+
+  /** Puts back the parser captured before the test so tests do not leak parser state. */
+  @AfterEach
+  public void restoreOriginalParser() throws Exception {
+    setSessionStateParser(spark.sessionState(), originalParser);
+  }
+
+  /**
+   * Tests that the Iceberg extended SQL parser can correctly parse a sort order string and return
+   * the expected RawOrderField.
+   *
+   * @throws Exception if reflection access fails
+   */
+  @Test
+  public void testParseSortOrderWithRealIcebergExtendedParser() throws Exception {
+    // The session is built without Iceberg extensions, so the captured parser is Spark's own;
+    // wrap it with the real Iceberg extensions parser.
+    assertThat(originalParser).isNotNull();
+    IcebergSparkSqlExtensionsParser icebergParser =
+        new IcebergSparkSqlExtensionsParser(originalParser);
+    setSessionStateParser(spark.sessionState(), icebergParser);
+
+    List<ExtendedParser.RawOrderField> fields =
+        ExtendedParser.parseSortOrder(spark, "id ASC NULLS FIRST");
+
+    assertThat(fields).isNotEmpty();
+    ExtendedParser.RawOrderField first = fields.get(0);
+    assertThat(first.direction()).isEqualTo(SortDirection.ASC);
+    assertThat(first.nullOrder()).isEqualTo(NullOrder.NULLS_FIRST);
+  }
+
+  /**
+   * Tests that parseSortOrder can find and use an ExtendedParser that is wrapped inside another
+   * ParserInterface implementation.
+   *
+   * @throws Exception if reflection access fails
+   */
+  @Test
+  public void testParseSortOrderFindsNestedExtendedParser() throws Exception {
+    ExtendedParser icebergParser = mock(ExtendedParser.class);
+
+    ExtendedParser.RawOrderField field =
+        new ExtendedParser.RawOrderField(
+            mock(Term.class), SortDirection.ASC, NullOrder.NULLS_FIRST);
+    List<ExtendedParser.RawOrderField> expected = Collections.singletonList(field);
+
+    when(icebergParser.parseSortOrder("id ASC NULLS FIRST")).thenReturn(expected);
+
+    ParserInterface wrapper = new WrapperParser(icebergParser);
+
+    setSessionStateParser(spark.sessionState(), wrapper);
+
+    List<ExtendedParser.RawOrderField> result =
+        ExtendedParser.parseSortOrder(spark, "id ASC NULLS FIRST");
+    assertThat(result).isSameAs(expected);
+
+    verify(icebergParser).parseSortOrder("id ASC NULLS FIRST");
+  }
+
+  /**
+   * Tests that parseSortOrder throws an exception if no ExtendedParser instance can be found in the
+   * parser chain.
+   *
+   * @throws Exception if reflection access fails
+   */
+  @Test
+  public void testParseSortOrderThrowsWhenNoExtendedParserFound() throws Exception {
+    ParserInterface dummy = mock(ParserInterface.class);
+    setSessionStateParser(spark.sessionState(), dummy);
+
+    assertThatThrownBy(() -> ExtendedParser.parseSortOrder(spark, "id ASC"))
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageContaining("Iceberg ExtendedParser");
+  }
+
+  /**
+   * Tests that parseSortOrder can find an ExtendedParser in a parent class field of the parser.
+   *
+   * @throws Exception if reflection access fails
+   */
+  @Test
+  public void testParseSortOrderFindsExtendedParserInParentClassField() throws Exception {
+    ExtendedParser icebergParser = mock(ExtendedParser.class);
+    ExtendedParser.RawOrderField field =
+        new ExtendedParser.RawOrderField(
+            mock(Term.class), SortDirection.ASC, NullOrder.NULLS_FIRST);
+    List<ExtendedParser.RawOrderField> expected = Collections.singletonList(field);
+    when(icebergParser.parseSortOrder("id ASC NULLS FIRST")).thenReturn(expected);
+    // the delegate field is declared on WrapperParser, two levels above GrandChildParser
+    ParserInterface parser = new GrandChildParser(icebergParser);
+    setSessionStateParser(spark.sessionState(), parser);
+
+    List<ExtendedParser.RawOrderField> result =
+        ExtendedParser.parseSortOrder(spark, "id ASC NULLS FIRST");
+    assertThat(result).isSameAs(expected);
+    verify(icebergParser).parseSortOrder("id ASC NULLS FIRST");
+  }
+
+  /**
+   * Replaces the SQL parser held by the given session state.
+   *
+   * @param sessionState the Spark session state to modify
+   * @param parser the parser to install
+   * @throws Exception if reflection access fails or no {@code sqlParser} field exists
+   */
+  private static void setSessionStateParser(Object sessionState, ParserInterface parser)
+      throws Exception {
+    sqlParserField(sessionState).set(sessionState, parser);
+  }
+
+  /**
+   * Finds the {@code sqlParser} field on the session state's class or the nearest superclass that
+   * declares it, and makes it accessible.
+   *
+   * @param sessionState the Spark session state to inspect
+   * @return the accessible {@code sqlParser} field
+   * @throws IllegalStateException if no class in the hierarchy declares the field
+   */
+  private static Field sqlParserField(Object sessionState) {
+    for (Class<?> clazz = sessionState.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
+      try {
+        Field field = clazz.getDeclaredField(SQL_PARSER_FIELD);
+        field.setAccessible(true);
+        return field;
+      } catch (NoSuchFieldException ignored) {
+        // not declared on this class; keep walking up the hierarchy
+      }
+    }
+
+    throw new IllegalStateException(
+        "No suitable sqlParser field found in sessionState class hierarchy!");
+  }
+
+  /**
+   * A parser that keeps another parser in a private field, standing in for a parser extension that
+   * wraps the Iceberg parser.
+   */
+  private static class WrapperParser extends AbstractSqlParser {
+    private final ParserInterface delegate;
+    // Not a ParserInterface; presumably here so the reflective delegate lookup meets a field it
+    // must not follow.
+    private final String name;
+
+    WrapperParser(ParserInterface delegate) {
+      this.delegate = delegate;
+      this.name = "delegate";
+    }
+
+    public ParserInterface getDelegate() {
+      return delegate;
+    }
+
+    // These tests never parse through this wrapper, so no AST builder is needed.
+    @Override
+    public AstBuilder astBuilder() {
+      return null;
+    }
+  }
+
+  /** Adds one level of inheritance above the class that declares the delegate field. */
+  private static class ChildParser extends WrapperParser {
+    ChildParser(ParserInterface parent) {
+      super(parent);
+    }
+  }
+
+  /** Adds a second level of inheritance above the class that declares the delegate field. */
+  private static class GrandChildParser extends ChildParser {
+    GrandChildParser(ParserInterface parent) {
+      super(parent);
+    }
+  }
+}
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/ExtendedParser.java 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/ExtendedParser.java
index 19b3dd8f49..d852dc96a3 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/ExtendedParser.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/ExtendedParser.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.IdentityHashMap;
 import java.util.List;
+import java.util.Set;
 import org.apache.iceberg.NullOrder;
 import org.apache.iceberg.SortDirection;
@@ -52,10 +53,10 @@ public interface ExtendedParser extends ParserInterface {
   }
 
   static List<RawOrderField> parseSortOrder(SparkSession spark, String 
orderString) {
-    if (spark.sessionState().sqlParser() instanceof ExtendedParser) {
-      ExtendedParser parser = (ExtendedParser) 
spark.sessionState().sqlParser();
+    ExtendedParser extParser = findParser(spark.sessionState().sqlParser(), 
ExtendedParser.class);
+    if (extParser != null) {
       try {
-        return parser.parseSortOrder(orderString);
+        return extParser.parseSortOrder(orderString);
       } catch (AnalysisException e) {
         throw new IllegalArgumentException(
             String.format("Unable to parse sortOrder: %s", orderString), e);
@@ -66,5 +67,43 @@ public interface ExtendedParser extends ParserInterface {
     }
   }
 
+  /**
+   * Walks the chain of delegate parsers starting at {@code parser} and returns the first one that
+   * is an instance of {@code clazz}.
+   *
+   * <p>Each step follows the delegate returned by {@code getNextDelegateParser}. Visited parsers
+   * are tracked by identity, so a delegate cycle (for example A wraps B and B wraps A) ends the
+   * search instead of looping forever.
+   *
+   * @param parser the parser to start from; may be null
+   * @param clazz the parser type to look for
+   * @return the first matching parser in the chain, or null if none is found
+   */
+  private static <T> T findParser(ParserInterface parser, Class<T> clazz) {
+    // identity rather than equals(): a cycle means reaching the same parser instance again
+    Set<ParserInterface> visited = Collections.newSetFromMap(new IdentityHashMap<>());
+    ParserInterface current = parser;
+    while (current != null && visited.add(current)) {
+      if (clazz.isInstance(current)) {
+        return clazz.cast(current);
+      }
+
+      current = getNextDelegateParser(current);
+    }
+
+    return null;
+  }
+
+  /**
+   * Returns the parser that {@code parser} delegates to, found by reflectively scanning the fields
+   * declared on its class and then on each superclass.
+   *
+   * <p>The first non-static field whose value is a {@link ParserInterface} other than {@code
+   * parser} itself is returned. Static fields are skipped because they hold class-level state, not
+   * this instance's delegate. A field that cannot be made accessible or read is skipped on its own,
+   * so one inaccessible field does not end the whole search.
+   *
+   * @param parser the parser to inspect; must not be null
+   * @return the delegate parser, or null if no suitable field is found
+   */
+  private static ParserInterface getNextDelegateParser(ParserInterface parser) {
+    for (Class<?> clazz = parser.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
+      Field[] fields;
+      try {
+        fields = clazz.getDeclaredFields();
+      } catch (SecurityException e) {
+        // reflection denied: report "no delegate" rather than failing the caller
+        return null;
+      }
+
+      for (Field field : fields) {
+        if (Modifier.isStatic(field.getModifiers())) {
+          continue;
+        }
+
+        Object value = readFieldOrNull(field, parser);
+        if (value instanceof ParserInterface && value != parser) {
+          return (ParserInterface) value;
+        }
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Reads {@code field} on {@code target}, returning null when the field cannot be made accessible
+   * (for example, it belongs to a module that is not open to this one) or cannot be read.
+   */
+  private static Object readFieldOrNull(Field field, Object target) {
+    try {
+      return field.trySetAccessible() ? field.get(target) : null;
+    } catch (IllegalAccessException | RuntimeException e) {
+      // best effort: an unreadable field simply cannot be followed
+      return null;
+    }
+  }
+
+  /**
+   * Parses a sort order string, such as {@code "id ASC NULLS FIRST"}, into raw order fields.
+   *
+   * @param orderString the sort order expression to parse
+   * @return the parsed order fields
+   * @throws AnalysisException if the string cannot be parsed
+   */
   List<RawOrderField> parseSortOrder(String orderString) throws 
AnalysisException;
 }

Reply via email to