flink 1.11 application mode on k8s: how to specify the number of taskmanagers launched

2021-06-04 Posted by Jun Zou
Hi, all:
  I'm submitting a job in application mode with Flink 1.11.2 on Kubernetes, and the number of TaskManagers the job requests does not match what I expect.

The job uses the DataStream API to register a Kafka source and an HDFS sink, with the intermediate operations written in SQL. The SQL logic is map-only, and the Kafka topic has 4 partitions.
First, I submitted the same kind of job on YARN with the following settings:

> taskmanager.numberOfTaskSlots:1
>
 parallelism.default:4

This produced 4 TaskManagers.

On Kubernetes, meanwhile, I configured the following:

> taskmanager.numberOfTaskSlots:1
>
parallelism.default:4

kubernetes.taskmanager.cpu:1

yet only a single TaskManager was requested.

In addition, when I submit the TopSpeedWindowing example as a jar job on Kubernetes, the correct number of TaskManagers is launched.
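For comparison, the TaskManager count in application mode should come out to ceil(parallelism.default / taskmanager.numberOfTaskSlots), i.e. 4 TaskManagers for the settings above. A minimal native-K8s submission sketch (the cluster-id, image name, and jar path are placeholders, not values from this thread):

```shell
./bin/flink run-application \
  --target kubernetes-application \
  -Dkubernetes.cluster-id=my-app-cluster \
  -Dkubernetes.container.image=my-flink:1.11.2 \
  -Dkubernetes.taskmanager.cpu=1 \
  -Dtaskmanager.numberOfTaskSlots=1 \
  -Dparallelism.default=4 \
  local:///opt/flink/usrlib/my-job.jar
```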


createTemporaryView: supporting hyphens (-) in field names when registering a table

2021-05-26 Posted by Jun Zou
Hi, all:
I'm developing jobs on Flink 1.11.2. To stay compatible with internal legacy code, I have to register the source manually as a table, invoked as:

> tableEnv.createTemporaryView(tableSource.getName, source, fields.toArray:
> _*)
>
Here, tableEnv is a StreamTableEnvironment, source is a DataStream[Row] representing the operator produced by the source
connector, and fields holds the Expressions converted from the processed source table's field
names via *ExpressionParser.parseExpression*.

Normally, registration succeeds.
However, when a field name contains a hyphen, e.g. a source field named
"X-Oem", ExpressionParser.parseExpression parses it as "minus(X, Oem)"
rather than the expected "X-Oem", so the registered table's field name no longer matches the one used in the DML statement and an error is raised.

Is there a way to handle this case?
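One possible workaround (an untested sketch, not verified against 1.11.2; `fieldNames` is an assumed collection of the processed field-name strings) is to skip the string parser entirely and build field references with the Expressions API, which takes the name verbatim instead of running it through the expression grammar:

```scala
import org.apache.flink.table.api.Expressions.$

// $(...) creates an unresolved field reference directly, so a name
// like "X-Oem" is kept as-is rather than parsed as minus(X, Oem).
val fields = fieldNames.map(name => $(name))
tableEnv.createTemporaryView(tableSource.getName, source, fields: _*)
```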


Re: Parsing exception for a string array in nested JSON

2020-07-07 Posted by Jun Zou
Hi,
Thanks for your guidance!

Best regards!

Leonard Xu wrote on Tue, Jul 7, 2020 at 9:49 PM:

> Hi,
>
> I took a look at the code, and this is indeed a bug in Flink 1.9 [1]. The
> cause is that the source does not handle the legacy type and the new type
> correctly. The issue was not fixed on the 1.9 branch, so you could try
> upgrading to 1.10.1.
>
> Best,
> Leonard Xu
> [1] https://issues.apache.org/jira/browse/FLINK-16622
>
>


Re: Parsing exception for a string array in nested JSON

2020-07-07 Posted by Jun Zou
Hi, Leonard Xu:

The SQL I'm using is as follows:

> SELECT TUMBLE_START(rowtime, INTERVAL '30' SECOND) AS ts, fruit,
> COUNT(`fruit`) AS `cnt`
> FROM mysource, UNNEST(mysource.parsedResponse) AS A(fruit)
> GROUP BY TUMBLE(rowtime, INTERVAL '30' SECOND), fruit


Judging from the debug logs, it appears to fail right at startup. Here are the relevant logs:

INFO - Initializing heap keyed state backend with stream factory.

INFO - Source: Custom Source -> Timestamps/Watermarks -> from:
> (parsedResponse, rowtime) -> correlate:
> table(explode($cor0.parsedResponse)), select: parsedResponse, rowtime, f0
> -> select: (rowtime, fruit) -> time attribute: (rowtime) (1/1)
> (d8c5f92b850811595dbdc130c04f9e58) switched from RUNNING to FAILED.
> java.lang.Exception:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> at
> org.apache.flink.streaming.connectors.CommonConsumer.run(CommonConsumer.java:49)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
> Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be
> cast to [Ljava.lang.String;
> at
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.copy(StringArraySerializer.java:35)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
> ... 10 more
>

Also, if I change the type of the string array from BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO
to ObjectArrayTypeInfo.getInfoFor(Types.STRING), i.e. the schema changes from
> root
>  |-- parsedResponse: LEGACY(BasicArrayTypeInfo)
>  |-- rowtime: TIMESTAMP(3) *ROWTIME*
>
to

> root
>  |-- parsedResponse: ARRAY
>  |-- rowtime: TIMESTAMP(3) *ROWTIME*
>

the same error still occurs, though the log output differs slightly:

> INFO - Source: Custom Source -> Timestamps/Watermarks -> from:
> (parsedResponse, rowtime) -> correlate:
> table(explode($cor0.parsedResponse)), select: parsedResponse, rowtime, f0
> -> select: (rowtime, fruit) -> time attribute: (rowtime) (1/1)
> (36b79032354b9e9ab70a30d98b1de903) switched from RUNNING to FAILED.
> java.lang.Exception:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> 

Parsing exception for a string array in nested JSON

2020-07-07 Posted by Jun Zou
Hi all:
I'm using Flink 1.9 to process nested JSON containing a string array. The constructed table schema is:
Row(parsedResponse: BasicArrayTypeInfo, timestamp: Long)
After the job runs, it fails with the following cast error between the Object and String array types:
Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast
to [Ljava.lang.String;
at
org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.copy(StringArraySerializer.java:35)
at
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
at
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)

Does anyone know how to fix this?

The structure of my JSON is as follows:
{"parsedResponse":["apple", "banana", "orange"], "timestamp": "1522253345"}
P.S.:
If I change the string array to a long array or a double array and run the corresponding operations, the job runs correctly; so far only the string array is problematic.
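For completeness, here is a sketch of the two array type declarations being compared (a hand-built row type; the package paths are from Flink 1.9, and the surrounding schema is an assumption rather than the actual connector code from this thread):

```scala
import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, TypeInformation, Types}
import org.apache.flink.api.java.typeutils.{ObjectArrayTypeInfo, RowTypeInfo}

// Variant 1: the legacy String[] type info that hits the ClassCastException.
val legacyArray: TypeInformation[_] = BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO

// Variant 2: the Object[]-based equivalent tried as a workaround.
val objectArray: TypeInformation[_] = ObjectArrayTypeInfo.getInfoFor(Types.STRING)

// Row(parsedResponse: <array type>, timestamp: Long), as in the schema above.
val rowType = new RowTypeInfo(
  Array[TypeInformation[_]](legacyArray, Types.LONG),
  Array("parsedResponse", "timestamp"))
```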


flink 1.9: aliasing nested fields when registering with StreamTableEnvironment registerDataStream

2020-07-03 Posted by Jun Zou
Hi,
When registering a table with Flink 1.9's StreamTableEnvironment, I want to specify a column alias for a nested field,
for example:
String fieldExprsStr = "modbus.parsedResponse,timestamp";
tableEnv.registerDataStream(src.getName(), srcStream, fieldExprsStr);
While validating modbus.parsedResponse,
the following error is thrown:
org.apache.flink.table.api.ValidationException: Field reference expression
or alias on field expression expected.
at
org.apache.flink.table.typeutils.FieldInfoUtils$IndexedExprToFieldInfo.defaultMethod(FieldInfoUtils.java:543)
at
org.apache.flink.table.typeutils.FieldInfoUtils$IndexedExprToFieldInfo.defaultMethod(FieldInfoUtils.java:470)
at
org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor.visit(ApiExpressionDefaultVisitor.java:92)
at
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
at
org.apache.flink.table.expressions.LookupCallExpression.accept(LookupCallExpression.java:67)

Is there a way to specify this kind of column alias?
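One workaround sketch (untested; it reuses `src`/`srcStream` from the snippet above): register only the top-level fields, then flatten the nested column under an alias with a SQL projection, where dotted access on nested fields is supported:

```scala
// Register the stream with top-level fields only; nested references in the
// field-expression string are what FieldInfoUtils rejects here.
tableEnv.registerDataStream(src.getName(), srcStream, "modbus, timestamp")

// Pull the nested field out under an alias via SQL instead; `timestamp`
// needs backticks because it is a reserved word in SQL.
val flat = tableEnv.sqlQuery(
  s"SELECT modbus.parsedResponse AS parsedResponse, `timestamp` FROM ${src.getName()}")
```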