如果单独执行这个function 的话是没有问题的

select Test().a 是没有问题的


Peihui He <peihu...@gmail.com> 于2021年3月12日周五 下午6:30写道:

> hi, all
>
> 定义一个 ScalarFunction
> class Test extends ScalarFunction{
>   @DataTypeHint("ROW<a String, b String, c String>")
>   def eval(): Row ={
>     Row.of("a", "b", "c")
>   }
> }
>
> 当执行下面语句的时候
> select Test().a from taba1
> 会报下面的错误:
>
> java.io.IOException: Fail to run stream sql job at
> org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:172)
> at
> org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:105)
> at
> org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89)
> at
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:494)
> at
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:257)
> at
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:151)
> at
> org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:111)
> at
> org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
> at
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
> at
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
> at
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
> at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at
> org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
> at
> org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748) Caused by: scala.MatchError:
> Test() (of class org.apache.calcite.rex.RexCall) at
> org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273)
> at
> org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:283)
> at
> org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:269)
> at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92) at
> org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:112)
> at
> org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:111)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
> org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$.build(NestedProjectionUtil.scala:111)
> at
> org.apache.flink.table.planner.plan.utils.NestedProjectionUtil.build(NestedProjectionUtil.scala)
> at
> org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.getUsedFieldsInTopLevelProjectAndWatermarkAssigner(ProjectWatermarkAssignerTransposeRule.java:155)
> at
> org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.matches(ProjectWatermarkAssignerTransposeRule.java:65)
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:284)
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411)
> at
> org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:268)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:985)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1245)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
> at
> org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
> at
> org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:486)
> at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:309)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:286)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1321)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1276)
> at
> org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:161)
> ... 16 more
>
> 应该是优化的时候出了问题,是bug不?
>
> 现在改在解决这个问题呢?
>
> Best Regards.
>

回复