Hi, Leonard Xu:
我使用的 sql 如下,
> SELECT TUMBLE_START(rowtime, INTERVAL '30' SECOND) AS ts, fruit,
> COUNT(`fruit`) AS `cnt`
> FROM mysource, UNNEST(mysource.parsedResponse) AS A(fruit)
> GROUP BY TUMBLE(rowtime, INTERVAL '30' SECOND), fruit
从调试日志来看,应该是一开始就挂掉了,我贴一下相关的日志
INFO - Initializing heap keyed state backend with stream factory.
INFO - Source: Custom Source -> Timestamps/Watermarks -> from:
> (parsedResponse, rowtime) -> correlate:
> table(explode($cor0.parsedResponse)), select: parsedResponse, rowtime, f0
> -> select: (rowtime, fruit) -> time attribute: (rowtime) (1/1)
> (d8c5f92b850811595dbdc130c04f9e58) switched from RUNNING to FAILED.
> java.lang.Exception:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> at
> org.apache.flink.streaming.connectors.CommonConsumer.run(CommonConsumer.java:49)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
> Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be
> cast to [Ljava.lang.String;
> at
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.copy(StringArraySerializer.java:35)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
> ... 10 more
>
另外,如果我把string 数组的类型从 BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO
改为 ObjectArrayTypeInfo.getInfoFor(Types.STRING), 即schema 从
> root
> |-- parsedResponse: LEGACY(BasicArrayTypeInfo)
> |-- rowtime: TIMESTAMP(3) *ROWTIME*
>
变为
> root
> |-- parsedResponse: ARRAY
> |-- rowtime: TIMESTAMP(3) *ROWTIME*
>
也仍然会发生相同的错误,但日志执行有些不同
> INFO - Source: Custom Source -> Timestamps/Watermarks -> from:
> (parsedResponse, rowtime) -> correlate:
> table(explode($cor0.parsedResponse)), select: parsedResponse, rowtime, f0
> -> select: (rowtime, fruit) -> time attribute: (rowtime) (1/1)
> (36b79032354b9e9ab70a30d98b1de903) switched from RUNNING to FAILED.
> java.lang.Exception:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
>