Dian Fu created FLINK-31905: ------------------------------- Summary: Exception thrown when accessing a nested field of the result of a Python UDF with a complex type Key: FLINK-31905 URL: https://issues.apache.org/jira/browse/FLINK-31905 Project: Flink Issue Type: Bug Components: API / Python Reporter: Dian Fu
For the following job: {code} import logging, sys from pyflink.common import Row from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import Schema, DataTypes, TableDescriptor, StreamTableEnvironment from pyflink.table.expressions import col, row from pyflink.table.udf import ACC, T, udaf, AggregateFunction, udf logging.basicConfig(stream=sys.stdout, level=logging.ERROR, format="%(message)s") class EmitLastState(AggregateFunction): """ Aggregator that emits the latest state for the purpose of enabling parallelism on CDC tables. """ def create_accumulator(self) -> ACC: return Row(None, None) def accumulate(self, accumulator: ACC, *args): key, obj = args if (accumulator[0] is None) or (key > accumulator[0]): accumulator[0] = key accumulator[1] = obj def retract(self, accumulator: ACC, *args): pass def get_value(self, accumulator: ACC) -> T: return accumulator[1] some_complex_inner_type = DataTypes.ROW( [ DataTypes.FIELD("f0", DataTypes.STRING()), DataTypes.FIELD("f1", DataTypes.STRING()) ] ) some_complex_type = DataTypes.ROW( [ DataTypes.FIELD(k, DataTypes.ARRAY(some_complex_inner_type)) for k in ("f0", "f1", "f2") ] + [ DataTypes.FIELD("f3", DataTypes.DATE()), DataTypes.FIELD("f4", DataTypes.VARCHAR(32)), DataTypes.FIELD("f5", DataTypes.VARCHAR(2)), ] ) @udf(input_types=DataTypes.STRING(), result_type=some_complex_type) def complex_udf(s): return Row(f0=None, f1=None, f2=None, f3=None, f4=None, f5=None) if __name__ == "__main__": env = StreamExecutionEnvironment.get_execution_environment() table_env = StreamTableEnvironment.create(env) table_env.get_config().set('pipeline.classpaths', 'file:///Users/dianfu/code/src/workspace/pyflink-examples/flink-sql-connector-postgres-cdc-2.1.1.jar') # Create schema _schema = { "p_key": DataTypes.INT(False), "modified_id": DataTypes.INT(False), "content": DataTypes.STRING() } schema = Schema.new_builder().from_fields( *zip(*[(k, v) for k, v in _schema.items()]) ).\ primary_key("p_key").\ build() # Create 
table descriptor descriptor = TableDescriptor.for_connector("postgres-cdc").\ option("hostname", "host.docker.internal").\ option("port", "5432").\ option("database-name", "flink_issue").\ option("username", "root").\ option("password", "root").\ option("debezium.plugin.name", "pgoutput").\ option("schema-name", "flink_schema").\ option("table-name", "flink_table").\ option("slot.name", "flink_slot").\ schema(schema).\ build() table_env.create_temporary_table("flink_table", descriptor) # Create changelog stream stream = table_env.from_path("flink_table")\ # Define UDAF accumulator_type = DataTypes.ROW( [ DataTypes.FIELD("f0", DataTypes.INT(False)), DataTypes.FIELD("f1", DataTypes.ROW([DataTypes.FIELD(k, v) for k, v in _schema.items()])), ] ) result_type = DataTypes.ROW([DataTypes.FIELD(k, v) for k, v in _schema.items()]) emit_last = udaf(EmitLastState(), accumulator_type=accumulator_type, result_type=result_type) # Emit last state based on modified_id to enable parallel processing stream = stream.\ group_by(col("p_key")).\ select( col("p_key"), emit_last(col("modified_id"),row(*(col(k) for k in _schema.keys())).cast(result_type)).alias("tmp_obj") ) # Select the elements of the objects stream = stream.select(*(col("tmp_obj").get(k).alias(k) for k in _schema.keys())) # We apply a UDF which parses the xml and returns a complex nested structure stream = stream.select(col("p_key"), complex_udf(col("content")).alias("nested_obj")) # We select an element from the nested structure in order to flatten it # The next line is the line causing issues, commenting the next line will make the pipeline work stream = stream.select(col("p_key"), col("nested_obj").get("f0")) # Interestingly, the below part does work... # stream = stream.select(col("nested_obj").get("f0")) table_env.to_changelog_stream(stream).print() # Execute env.execute_async() {code} {code} py4j.protocol.Py4JJavaError: An error occurred while calling o8.toChangelogStream. 
: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1 at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source) at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source) at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source) at java.base/java.util.Objects.checkIndex(Unknown Source) at java.base/java.util.ArrayList.get(Unknown Source) at org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:975) at org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:924) at org.apache.calcite.rex.RexLocalRef.accept(RexLocalRef.java:75) at org.apache.calcite.rex.RexShuttle.visitFieldAccess(RexShuttle.java:198) at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:904) at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:887) at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92) at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158) at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110) at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889) at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887) at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158) at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110) at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889) at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887) at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) at org.apache.calcite.rex.RexProgramBuilder.registerInput(RexProgramBuilder.java:295) at 
org.apache.calcite.rex.RexProgramBuilder.addProject(RexProgramBuilder.java:206) at org.apache.calcite.rex.RexProgram.create(RexProgram.java:224) at org.apache.calcite.rex.RexProgram.create(RexProgram.java:193) at org.apache.flink.table.planner.plan.rules.logical.PythonCalcSplitRuleBase.onMatch(PythonCalcSplitRule.scala:98) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407) at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202) at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at 
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.immutable.Range.foreach(Range.scala:155) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195) at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224) at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.java:263) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.base/java.lang.Thread.run(Unknown Source) {code} -- This message was sent by Atlassian Jira 
(v8.20.10#820010)