如果单独执行这个function 的话是没有问题的 select Test().a 是没有问题的
Peihui He <peihu...@gmail.com> 于2021年3月12日周五 下午6:30写道: > hi, all > > 定义一个 ScalarFunction > class Test extends ScalarFunction{ > @DataTypeHint("ROW<a String, b String, c String>") > def eval(): Row ={ > Row.of("a", "b", "c") > } > } > > 当执行下面语句的时候 > select Test().a from taba1 > 会报下面的错误: > > java.io.IOException: Fail to run stream sql job at > org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:172) > at > org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:105) > at > org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:494) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:257) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:151) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:111) > at > org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47) > at > org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110) > at > org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852) > at > org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744) > at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at > org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132) > at > org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) Caused by: scala.MatchError: > Test() (of class org.apache.calcite.rex.RexCall) at > org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273) > at > org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:283) > at > org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:269) > at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92) at > org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:112) > at > org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:111) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at > scala.collection.AbstractIterable.foreach(Iterable.scala:54) at > org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$.build(NestedProjectionUtil.scala:111) > at > org.apache.flink.table.planner.plan.utils.NestedProjectionUtil.build(NestedProjectionUtil.scala) > at > org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.getUsedFieldsInTopLevelProjectAndWatermarkAssigner(ProjectWatermarkAssignerTransposeRule.java:155) > at > org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.matches(ProjectWatermarkAssignerTransposeRule.java:65) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:284) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:268) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:985) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1245) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84) > at > org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:486) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:309) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at > scala.collection.AbstractIterable.foreach(Iterable.scala:54) at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:286) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1321) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1276) > at > org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:161) > ... 16 more > > 应该是优化的时候出了问题,是bug不? > > 现在改在解决这个问题呢? > > Best Regards. >