Hi Leonard Xu,

The SQL I am using is as follows:

> SELECT TUMBLE_START(rowtime, INTERVAL '30' SECOND) AS ts, fruit,
> COUNT(`fruit`) AS `cnt`
> FROM mysource, UNNEST(mysource.parsedResponse) AS A(fruit)
> GROUP BY TUMBLE(rowtime, INTERVAL '30' SECOND), fruit

Judging from the debug log, the job seems to fail right at startup. Here are the relevant log entries:

> INFO - Initializing heap keyed state backend with stream factory.
> INFO - Source: Custom Source -> Timestamps/Watermarks -> from: (parsedResponse, rowtime) -> correlate: table(explode($cor0.parsedResponse)), select: parsedResponse, rowtime, f0 -> select: (rowtime, fruit) -> time attribute: (rowtime) (1/1) (d8c5f92b850811595dbdc130c04f9e58) switched from RUNNING to FAILED.
> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>     at org.apache.flink.streaming.connectors.CommonConsumer.run(CommonConsumer.java:49)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
> Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Ljava.lang.String;
>     at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.copy(StringArraySerializer.java:35)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
>     ... 10 more

In addition, if I change the type of the string array from BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO to ObjectArrayTypeInfo.getInfoFor(Types.STRING), so that the schema changes from

> root
>  |-- parsedResponse: LEGACY(BasicArrayTypeInfo<String>)
>  |-- rowtime: TIMESTAMP(3) *ROWTIME*

to

> root
>  |-- parsedResponse: ARRAY<STRING>
>  |-- rowtime: TIMESTAMP(3) *ROWTIME*

the same error still occurs, although the stack trace in the log is somewhat different:

> INFO - Source: Custom Source -> Timestamps/Watermarks -> from: (parsedResponse, rowtime) -> correlate: table(explode($cor0.parsedResponse)), select: parsedResponse, rowtime, f0 -> select: (rowtime, fruit) -> time attribute: (rowtime) (1/1) (36b79032354b9e9ab70a30d98b1de903) switched from RUNNING to FAILED.
> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>     at org.apache.flink.streaming.connectors.CommonConsumer.run(CommonConsumer.java:49)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>     at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:67)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>     ... 10 more
> Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Ljava.lang.String;
>     at DataStreamSourceConversion$5.processElement(Unknown Source)
>     at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>     ... 16 more

I also tried converting the schema with JsonRowSchemaConverter, and the resulting schema is the same as the one in my previous email, namely:

> Row(parsedResponse: BasicArrayTypeInfo<String>, timestamp: Timestamp)

So where did I go wrong?

Thank you for your reply!

Best regards

On Tue, Jul 7, 2020 at 5:48 PM, Leonard Xu <xbjt...@gmail.com> wrote:

> Hi,
>
> Could you also paste the SQL? This looks like a bug.
>
> Best,
> Leonard Xu
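
For reference, the innermost ClassCastException in both traces ("[Ljava.lang.Object; cannot be cast to [Ljava.lang.String;") follows a general JVM rule: an array created as Object[] cannot be cast to String[], even when every element is a String. The plain-Java sketch below reproduces only that rule, without Flink. The class and method names (ArrayCastDemo, castFails, toStringArray) are hypothetical, and whether the custom source in this job actually emits an Object[] in the parsedResponse field is an assumption, not something the logs confirm.

```java
import java.util.Arrays;

// Hypothetical demo class; it is not part of Flink or of the job in this thread.
final class ArrayCastDemo {

    // Returns true when casting the given array to String[] fails at runtime.
    static boolean castFails(Object[] values) {
        try {
            String[] ignored = (String[]) values;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Copies the elements into an array whose runtime type really is String[].
    // Throws ArrayStoreException if any element is not a String.
    static String[] toStringArray(Object[] values) {
        return Arrays.copyOf(values, values.length, String[].class);
    }

    public static void main(String[] args) {
        // Runtime type is Object[], e.g. what ArrayList.toArray() returns.
        Object[] asObjects = new Object[] {"apple", "banana"};

        System.out.println(castFails(asObjects));                    // the cast is rejected
        String[] asStrings = toStringArray(asObjects);
        System.out.println(castFails(asStrings));                    // the cast now succeeds
        System.out.println(Arrays.toString(asStrings));
    }
}
```

If the assumption holds, building the field value as a real String[] before putting it into the Row (for example with list.toArray(new String[0])) would be one thing to try. It would not rule out a bug in the planner, as suggested in the quoted reply.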