Re: Re: Pandas UDF处理过的数据sink问题

2020-12-13 文章 guoliubi...@foxmail.com
Hi xingbo,
文档中给的例子udtf需要和join一起使用,但是我现在不需要join,只是单纯的转换结果
如果直接调用了udtf后sink,会提示
Cause: Different number of columns.
Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]

Sink schema:  [buyQtl: BIGINT, aveBuy: INT]


guoliubi...@foxmail.com
 
发件人: Xingbo Huang
发送时间: 2020-12-14 11:38
收件人: user-zh
主题: Re: Re: Pandas UDF处理过的数据sink问题
Hi,
你想要一列变多列的话,你需要使用UDTF了,具体使用方式,你可以参考文档[1]
 
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#table-functions
Best,
Xingbo
 
guoliubi...@foxmail.com  于2020年12月14日周一 上午11:00写道:
 
> 多谢你的回复。这个问题已处理好了,确实如你所说需要将@udf换成@udaf。
> 但现在有另一个问题,根据文档
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
> Vectorized Python aggregate functions takes one or more pandas.Series as
> the inputs and return one scalar value as output.
> Note The return type does not support RowType and MapType for the time
> being.
> udaf仅允许返回单个值,所以我再udaf里把所有值用‘,’连接后用STRING方式返回了,将这个STRING直接sink掉是没问题的。
> 现在是后面用另一个udf把这个string再做拆分,代码大概如下:
> @udf(result_type=DataTypes.ROW(
> [DataTypes.FIELD('value1', DataTypes.BIGINT()),
>  DataTypes.FIELD('value2', DataTypes.INT())]))
> def flattenStr(inputStr):
> ret_array = [int(x) for x in inputStr.split(',')]
> return Row(ret_array[0], ret_array[1])
> t_env.create_temporary_function("flattenStr", flattenStr)aggregate_table =
> order_table.window(tumble_window) \
> .group_by("w") \
> .select("**调用udaf** as aggValue")
> result_table = aggregate_table.select("flattenStr(aggValue) as retValue")
>
> result_table.select(result_table.retValue.flatten).execute_insert("csvSink")上传到flink编译没有问题,但运行是报错了,不太明白报错的含义,不知道是否是udf返回的类型不正确引起的
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
> at
> org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.getPythonEnv(AbstractPythonScalarFunctionOperator.java:99)
> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.createPythonEnvironmentManager(AbstractPythonFunctionOperator.java:306)
> at
> org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.createPythonFunctionRunner(AbstractStatelessFunctionOperator.java:151)
> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:122)
>
>
> guoliubi...@foxmail.com
>
> 发件人: Wei Zhong
> 发送时间: 2020-12-14 10:38
> 收件人: user-zh
> 主题: Re: Pandas UDF处理过的数据sink问题
> Hi Lucas,
>
> 是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。
>
> 你可以尝试将sql语句改成以下形式:
>
> select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1)
> from `some_source`
> group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
>
> 此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf”
>
> Best,
> Wei
>
> > 在 2020年12月13日,13:13,Lucas  写道:
> >
> > 使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下
> >
> > @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
> > result_type=DataTypes.ROW(
> > [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
> >  DataTypes.FIELD('aveBuy', DataTypes.INT())),
> > func_type='pandas')
> > def orderCalc(code, amount):
> >
> >df = pd.DataFrame({'code': code, 'amount': amount})
> > # pandas 数据处理后输入另一个dataframe output
> > return (output['buyQtl'], output['aveBuy'])
> >
> >
> > 定义了csv的sink如下
> >
> > create table csvSink (
> >buyQtl BIGINT,
> >aveBuy INT
> > ) with (
> >'connector.type' = 'filesystem',
> >'format.type' = 'csv',
> >'connector.path' = 'e:/output'
> > )
> >
> >
> >
> > 然后进行如下的操作:
> >
> > result_table = t_env.sql_query("""
> > select orderCalc(code, amount)
> > from `some_source`
> > group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
> > """)
> > result_table.execute_insert("csvSink")
> >
> >
> >
> > 在执行程序的时候提示没法入库
> >
> > py4j.protocol.Py4JJavaError: An error occurred while calling
> > o98.executeInsert.
> >
> > : org.apache.flink.table.api.ValidationException: Column types of query
> > result and sink for registered table
> > 'default_catalog.default_database.csvSink' do not match.
> >
> > Cause: Different number of columns.
> >
> >
> >
> > Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]
> >
> > Sink schema:  [buyQtl: BIGINT, aveBuy: INT]
> >
> >at
> >
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx
> > ception(DynamicSinkUtils.java:304)
> >
> >at
> >
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply
> > ImplicitCast(DynamicSinkUtils.java:134)
> >
> >
> >
> > 是UDF的输出结构不对吗,还是需要调整sink table的结构?
> >
>
>
>


Re: kafka的多分区watermark

2020-12-13 文章 张锴
谢谢你,我想明白了

Shuai Xia  于2020年12月14日周一 下午2:08写道:

>
> Hi,没有太理解你的意思,这个MyType只是说你可以把Kafka的数据反序列化后使用,像SimpleStringSchema默认是String,你可以对他进行解析
>
>
> --
> 发件人:张锴 
> 发送时间:2020年12月14日(星期一) 13:51
> 收件人:user-zh 
> 主 题:kafka的多分区watermark
>
> 在官网看到对于Kafka分区的时间戳定义描述,给出了示例,如下图:
>
> FlinkKafkaConsumer09 kafkaSource = new
> FlinkKafkaConsumer09<>("myTopic", schema,
> props);kafkaSource.assignTimestampsAndWatermarks(new
> AscendingTimestampExtractor() {
>
> @Override
> public long extractAscendingTimestamp(MyType element) {
> return element.eventTimestamp();
> }});
> DataStream stream = env.addSource(kafkaSource);
>
> *不太理解这个里面泛型传的是用户定义的case class,还是传*ConsumerRecord,从他里面提取时间戳
>


回复:kafka的多分区watermark

2020-12-13 文章 Shuai Xia
Hi,没有太理解你的意思,这个MyType只是说你可以把Kafka的数据反序列化后使用,像SimpleStringSchema默认是String,你可以对他进行解析


--
发件人:张锴 
发送时间:2020年12月14日(星期一) 13:51
收件人:user-zh 
主 题:kafka的多分区watermark

在官网看到对于Kafka分区的时间戳定义描述,给出了示例,如下图:

FlinkKafkaConsumer09 kafkaSource = new
FlinkKafkaConsumer09<>("myTopic", schema,
props);kafkaSource.assignTimestampsAndWatermarks(new
AscendingTimestampExtractor() {

@Override
public long extractAscendingTimestamp(MyType element) {
return element.eventTimestamp();
}});
DataStream stream = env.addSource(kafkaSource);

*不太理解这个里面泛型传的是用户定义的case class,还是传*ConsumerRecord,从他里面提取时间戳


kafka的多分区watermark

2020-12-13 文章 张锴
在官网看到对于Kafka分区的时间戳定义描述,给出了示例,如下图:

FlinkKafkaConsumer09 kafkaSource = new
FlinkKafkaConsumer09<>("myTopic", schema,
props);kafkaSource.assignTimestampsAndWatermarks(new
AscendingTimestampExtractor() {

@Override
public long extractAscendingTimestamp(MyType element) {
return element.eventTimestamp();
}});
DataStream stream = env.addSource(kafkaSource);

*不太理解这个里面泛型传的是用户定义的case class,还是传*ConsumerRecord,从他里面提取时间戳


回复: Re: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.

2020-12-13 文章 guoliubi...@foxmail.com
不好意思我没说清楚。
我这边用的是这样的SQL可以运作,你可以参考下。
CREATE TABLE `someTable` (
  eventTime TIMESTAMP(3),
  WATERMARK FOR eventTime AS eventTime
)
eventTime是java的Long类型,包含毫秒,SQL里可以直接转成TIMESTAMP
select someFunc(field)
from `someTable`
group by TUMBLE(eventTime, INTERVAL '1' SECOND)



guoliubi...@foxmail.com
 
发件人: kandy.wang
发送时间: 2020-12-14 11:23
收件人: user-zh
主题: Re:回复: Window aggregate can only be defined over a time attribute column, 
but TIMESTAMP(3) encountered.
hi guoliubin85:
一样的报错:
 
Flink SQL> select mid,code,floor_id,TUMBLE_START(time_local/1000, INTERVAL '1' 
MINUTE) as log_minute,count(1) pv
 
> from lightart_expose
 
> where code is not null and floor_id is not null
 
> group by mid,code,floor_id,TUMBLE(time_local/1000, INTERVAL '1' 
> MINUTE);[ERROR] Could not execute SQL statement. Reason:
 
org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$TUMBLE' 
to arguments of type '$TUMBLE(, )'. Supported form(s): 
'$TUMBLE(, )'
 
'$TUMBLE(, , )'
 
 
 
> group by mid,code,floor_id,TUMBLE(time_local/1000, INTERVAL '1' 
> MINUTE);[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$TUMBLE' 
to arguments of type '$TUMBLE(, )'. Supported form(s): 
'$TUMBLE(, )'
'$TUMBLE(, , )'
 
在 2020-12-14 10:41:12,"guoliubi...@foxmail.com"  写道:
>TUMBLE第一个参数需要的就是bigint,你这边time_local 直接用就好,不用转另外TIMESTAMP
>
>
>
>guoliubi...@foxmail.com
> 
>发件人: kandy.wang
>发送时间: 2020-12-14 10:28
>收件人: user-zh
>主题: Window aggregate can only be defined over a time attribute column, but 
>TIMESTAMP(3) encountered.
>[ERROR] Could not execute SQL statement. 
>Reason:org.apache.flink.table.api.TableException: Window aggregate can only be 
>defined over a time attribute column, but TIMESTAMP(3) encountered.
> 
> 
>SQL 如下:
>create temporary view expose as
> 
>select  
> 
>mid
> 
>,time_local
> 
>,TO_TIMESTAMP(FROM_UNIXTIME(time_local / 1000, '-MM-dd HH:mm:ss')) as 
>log_ts
> 
>,proctime
> 
>from hive.temp.kafka_table
> 
>;
>time_local 是bigint
> 
> 
> 
>select TUMBLE_START(log_ts, INTERVAL '1' MINUTE) as log_minute,count(1) pv
> 
>from expose
> 
>group by TUMBLE(log_ts, INTERVAL '1' MINUTE);
> 
> 
>window agg的字段报错,如何解决。


Re: flink-shaded-hadoop-2-uber*-* 版本确定问题

2020-12-13 文章 silence
flink已经不建议将hadoop的jar放到lib里了

可以通过
export HADOOP_CLASSPATH=`hadoop classpath` 
加载hadoop的依赖
参考链接:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html#providing-hadoop-classes



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于flink-sql 元数据问题

2020-12-13 文章 占英华



> 在 2020年12月14日,11:43,Rui Li  写道:
> 
> Hi,
> 
> 调用tableEnv.executeSql("create table
> .")以后表就已经创建了,不需要再调用tableEnv.execute。execute方法已经deprecate,建议统一使用executeSql哈
> 
>> On Fri, Dec 11, 2020 at 7:23 PM JasonLee <17610775...@163.com> wrote:
>> 
>> hi
>> Flink SQL 建的表支持用 hive 的 catalog 来管理元数据,是否可以满足你的需求 ?
>> 
>> 
>> 
>> -
>> Best Wishes
>> JasonLee
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>> 
> 
> 
> -- 
> Best regards!
> Rui Li




Re: 关于flink-sql 元数据问题

2020-12-13 文章 Rui Li
Hi,

调用tableEnv.executeSql("create table
.")以后表就已经创建了,不需要再调用tableEnv.execute。execute方法已经deprecate,建议统一使用executeSql哈

On Fri, Dec 11, 2020 at 7:23 PM JasonLee <17610775...@163.com> wrote:

> hi
> Flink SQL 建的表支持用 hive 的 catalog 来管理元数据,是否可以满足你的需求 ?
>
>
>
> -
> Best Wishes
> JasonLee
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best regards!
Rui Li


Re: Re: Flink SQL 怎么为每一个任务分配不同的内存配置

2020-12-13 文章 magichuang

各位好,最近也在思考这个问题,我是采用的flink  on  yarn集群部署的,每次通过Per-job模式提交任务时虽然指定了  -ytm  
2048,但是当运行起来之后去看yarn资源会发现可用内存并不是减少2g  而是减少了3g,也就是实际占用会比我给定的内存多1g。
如果每个sql任务单独运行的话,这样会不会造成资源浪费呀?




再用sql语句编写程序时,能不能在一个任务中,在用多个source、transformation、sink情况下,为每个sql单独指定solt数?




祝好~

> -- 原始邮件 --
> 发 件 人:"Kyle Zhang" 
> 发送时间:2020-12-14 09:25:59
> 收 件 人:user-zh@flink.apache.org
> 抄 送:
> 主 题:Re: Flink SQL 怎么为每一个任务分配不同的内存配置
>
> 一个集群跑一个SQL任务怎么样
>
> On Mon, Dec 14, 2020 at 8:42 AM yinghua...@163.com
> wrote:
>
> > Flink 作业在提交时可以通过参数指定JobManager
> > 和TaskManager的内存配置,但是SQL执行时怎么为每一个任务指定其内存配置,是不是都是读同一个flink-conf.yaml中的配置?
> >
> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/config.html#memory-configuration
> > 中内存的配置都是基于flink-conf.yaml文件来操作的,是全局的配置,没有找到基于SQL任务独立配合内存的?
> >
> >
> >
> > yinghua...@163.com
> >






Re: Re: Pandas UDF处理过的数据sink问题

2020-12-13 文章 Xingbo Huang
Hi,
你想要一列变多列的话,你需要使用UDTF了,具体使用方式,你可以参考文档[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#table-functions
Best,
Xingbo

guoliubi...@foxmail.com  于2020年12月14日周一 上午11:00写道:

> 多谢你的回复。这个问题已处理好了,确实如你所说需要将@udf换成@udaf。
> 但现在有另一个问题,根据文档
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
> Vectorized Python aggregate functions takes one or more pandas.Series as
> the inputs and return one scalar value as output.
> Note The return type does not support RowType and MapType for the time
> being.
> udaf仅允许返回单个值,所以我再udaf里把所有值用‘,’连接后用STRING方式返回了,将这个STRING直接sink掉是没问题的。
> 现在是后面用另一个udf把这个string再做拆分,代码大概如下:
> @udf(result_type=DataTypes.ROW(
> [DataTypes.FIELD('value1', DataTypes.BIGINT()),
>  DataTypes.FIELD('value2', DataTypes.INT())]))
> def flattenStr(inputStr):
> ret_array = [int(x) for x in inputStr.split(',')]
> return Row(ret_array[0], ret_array[1])
> t_env.create_temporary_function("flattenStr", flattenStr)aggregate_table =
> order_table.window(tumble_window) \
> .group_by("w") \
> .select("**调用udaf** as aggValue")
> result_table = aggregate_table.select("flattenStr(aggValue) as retValue")
>
> result_table.select(result_table.retValue.flatten).execute_insert("csvSink")上传到flink编译没有问题,但运行是报错了,不太明白报错的含义,不知道是否是udf返回的类型不正确引起的
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
> at
> org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.getPythonEnv(AbstractPythonScalarFunctionOperator.java:99)
> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.createPythonEnvironmentManager(AbstractPythonFunctionOperator.java:306)
> at
> org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.createPythonFunctionRunner(AbstractStatelessFunctionOperator.java:151)
> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:122)
>
>
> guoliubi...@foxmail.com
>
> 发件人: Wei Zhong
> 发送时间: 2020-12-14 10:38
> 收件人: user-zh
> 主题: Re: Pandas UDF处理过的数据sink问题
> Hi Lucas,
>
> 是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。
>
> 你可以尝试将sql语句改成以下形式:
>
> select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1)
> from `some_source`
> group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
>
> 此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf”
>
> Best,
> Wei
>
> > 在 2020年12月13日,13:13,Lucas  写道:
> >
> > 使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下
> >
> > @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
> > result_type=DataTypes.ROW(
> > [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
> >  DataTypes.FIELD('aveBuy', DataTypes.INT())),
> > func_type='pandas')
> > def orderCalc(code, amount):
> >
> >df = pd.DataFrame({'code': code, 'amount': amount})
> > # pandas 数据处理后输入另一个dataframe output
> > return (output['buyQtl'], output['aveBuy'])
> >
> >
> > 定义了csv的sink如下
> >
> > create table csvSink (
> >buyQtl BIGINT,
> >aveBuy INT
> > ) with (
> >'connector.type' = 'filesystem',
> >'format.type' = 'csv',
> >'connector.path' = 'e:/output'
> > )
> >
> >
> >
> > 然后进行如下的操作:
> >
> > result_table = t_env.sql_query("""
> > select orderCalc(code, amount)
> > from `some_source`
> > group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
> > """)
> > result_table.execute_insert("csvSink")
> >
> >
> >
> > 在执行程序的时候提示没法入库
> >
> > py4j.protocol.Py4JJavaError: An error occurred while calling
> > o98.executeInsert.
> >
> > : org.apache.flink.table.api.ValidationException: Column types of query
> > result and sink for registered table
> > 'default_catalog.default_database.csvSink' do not match.
> >
> > Cause: Different number of columns.
> >
> >
> >
> > Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]
> >
> > Sink schema:  [buyQtl: BIGINT, aveBuy: INT]
> >
> >at
> >
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx
> > ception(DynamicSinkUtils.java:304)
> >
> >at
> >
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply
> > ImplicitCast(DynamicSinkUtils.java:134)
> >
> >
> >
> > 是UDF的输出结构不对吗,还是需要调整sink table的结构?
> >
>
>
>


Re:回复: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.

2020-12-13 文章 kandy.wang
hi guoliubin85:
一样的报错:

Flink SQL> select mid,code,floor_id,TUMBLE_START(time_local/1000, INTERVAL '1' 
MINUTE) as log_minute,count(1) pv

> from lightart_expose

> where code is not null and floor_id is not null

> group by mid,code,floor_id,TUMBLE(time_local/1000, INTERVAL '1' 
> MINUTE);[ERROR] Could not execute SQL statement. Reason:

org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$TUMBLE' 
to arguments of type '$TUMBLE(, )'. Supported form(s): 
'$TUMBLE(, )'

'$TUMBLE(, , )'



> group by mid,code,floor_id,TUMBLE(time_local/1000, INTERVAL '1' 
> MINUTE);[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$TUMBLE' 
to arguments of type '$TUMBLE(, )'. Supported form(s): 
'$TUMBLE(, )'
'$TUMBLE(, , )'

在 2020-12-14 10:41:12,"guoliubi...@foxmail.com"  写道:
>TUMBLE第一个参数需要的就是bigint,你这边time_local 直接用就好,不用转另外TIMESTAMP
>
>
>
>guoliubi...@foxmail.com
> 
>发件人: kandy.wang
>发送时间: 2020-12-14 10:28
>收件人: user-zh
>主题: Window aggregate can only be defined over a time attribute column, but 
>TIMESTAMP(3) encountered.
>[ERROR] Could not execute SQL statement. 
>Reason:org.apache.flink.table.api.TableException: Window aggregate can only be 
>defined over a time attribute column, but TIMESTAMP(3) encountered.
> 
> 
>SQL 如下:
>create temporary view expose as
> 
>select  
> 
>mid
> 
>,time_local
> 
>,TO_TIMESTAMP(FROM_UNIXTIME(time_local / 1000, '-MM-dd HH:mm:ss')) as 
>log_ts
> 
>,proctime
> 
>from hive.temp.kafka_table
> 
>;
>time_local 是bigint
> 
> 
> 
>select TUMBLE_START(log_ts, INTERVAL '1' MINUTE) as log_minute,count(1) pv
> 
>from expose
> 
>group by TUMBLE(log_ts, INTERVAL '1' MINUTE);
> 
> 
>window agg的字段报错,如何解决。


回复: Re: Pandas UDF处理过的数据sink问题

2020-12-13 文章 guoliubi...@foxmail.com
多谢你的回复。这个问题已处理好了,确实如你所说需要将@udf换成@udaf。
但现在有另一个问题,根据文档
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
 
Vectorized Python aggregate functions takes one or more pandas.Series as the 
inputs and return one scalar value as output.
Note The return type does not support RowType and MapType for the time being.
udaf仅允许返回单个值,所以我再udaf里把所有值用‘,’连接后用STRING方式返回了,将这个STRING直接sink掉是没问题的。
现在是后面用另一个udf把这个string再做拆分,代码大概如下:
@udf(result_type=DataTypes.ROW(
[DataTypes.FIELD('value1', DataTypes.BIGINT()),
 DataTypes.FIELD('value2', DataTypes.INT())]))
def flattenStr(inputStr):
ret_array = [int(x) for x in inputStr.split(',')]
return Row(ret_array[0], ret_array[1])
t_env.create_temporary_function("flattenStr", flattenStr)aggregate_table = 
order_table.window(tumble_window) \
.group_by("w") \
.select("**调用udaf** as aggValue")
result_table = aggregate_table.select("flattenStr(aggValue) as retValue")
result_table.select(result_table.retValue.flatten).execute_insert("csvSink")上传到flink编译没有问题,但运行是报错了,不太明白报错的含义,不知道是否是udf返回的类型不正确引起的
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
at 
org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.getPythonEnv(AbstractPythonScalarFunctionOperator.java:99)
at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.createPythonEnvironmentManager(AbstractPythonFunctionOperator.java:306)
at 
org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.createPythonFunctionRunner(AbstractStatelessFunctionOperator.java:151)
at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:122)


guoliubi...@foxmail.com
 
发件人: Wei Zhong
发送时间: 2020-12-14 10:38
收件人: user-zh
主题: Re: Pandas UDF处理过的数据sink问题
Hi Lucas,
 
是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。
 
你可以尝试将sql语句改成以下形式:
 
select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1)
from `some_source`
group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
 
此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf”
 
Best,
Wei
 
> 在 2020年12月13日,13:13,Lucas  写道:
> 
> 使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下
> 
> @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
> result_type=DataTypes.ROW(
> [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
>  DataTypes.FIELD('aveBuy', DataTypes.INT())),
> func_type='pandas')
> def orderCalc(code, amount):
> 
>df = pd.DataFrame({'code': code, 'amount': amount})
> # pandas 数据处理后输入另一个dataframe output
> return (output['buyQtl'], output['aveBuy'])
> 
> 
> 定义了csv的sink如下
> 
> create table csvSink (
>buyQtl BIGINT,
>aveBuy INT 
> ) with (
>'connector.type' = 'filesystem',
>'format.type' = 'csv',
>'connector.path' = 'e:/output'
> )
> 
> 
> 
> 然后进行如下的操作:
> 
> result_table = t_env.sql_query("""
> select orderCalc(code, amount)
> from `some_source`
> group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
> """)
> result_table.execute_insert("csvSink")
> 
> 
> 
> 在执行程序的时候提示没法入库
> 
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o98.executeInsert.
> 
> : org.apache.flink.table.api.ValidationException: Column types of query
> result and sink for registered table
> 'default_catalog.default_database.csvSink' do not match.
> 
> Cause: Different number of columns.
> 
> 
> 
> Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]
> 
> Sink schema:  [buyQtl: BIGINT, aveBuy: INT]
> 
>at
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx
> ception(DynamicSinkUtils.java:304)
> 
>at
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply
> ImplicitCast(DynamicSinkUtils.java:134)
> 
> 
> 
> 是UDF的输出结构不对吗,还是需要调整sink table的结构?
> 
 
 


回复: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.

2020-12-13 文章 guoliubi...@foxmail.com
TUMBLE第一个参数需要的就是bigint,你这边time_local 直接用就好,不用转另外TIMESTAMP



guoliubi...@foxmail.com
 
发件人: kandy.wang
发送时间: 2020-12-14 10:28
收件人: user-zh
主题: Window aggregate can only be defined over a time attribute column, but 
TIMESTAMP(3) encountered.
[ERROR] Could not execute SQL statement. 
Reason:org.apache.flink.table.api.TableException: Window aggregate can only be 
defined over a time attribute column, but TIMESTAMP(3) encountered.
 
 
SQL 如下:
create temporary view expose as
 
select  
 
mid
 
,time_local
 
,TO_TIMESTAMP(FROM_UNIXTIME(time_local / 1000, '-MM-dd HH:mm:ss')) as log_ts
 
,proctime
 
from hive.temp.kafka_table
 
;
time_local 是bigint
 
 
 
select TUMBLE_START(log_ts, INTERVAL '1' MINUTE) as log_minute,count(1) pv
 
from expose
 
group by TUMBLE(log_ts, INTERVAL '1' MINUTE);
 
 
window agg的字段报错,如何解决。


Re: Pandas UDF处理过的数据sink问题

2020-12-13 文章 Wei Zhong
Hi Lucas,

是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。

你可以尝试将sql语句改成以下形式:

select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1)
from `some_source`
group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount

此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf”

Best,
Wei

> 在 2020年12月13日,13:13,Lucas  写道:
> 
> 使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下
> 
> @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
> result_type=DataTypes.ROW(
> [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
>  DataTypes.FIELD('aveBuy', DataTypes.INT())),
> func_type='pandas')
> def orderCalc(code, amount):
> 
>df = pd.DataFrame({'code': code, 'amount': amount})
> # pandas 数据处理后输入另一个dataframe output
> return (output['buyQtl'], output['aveBuy'])
> 
> 
> 定义了csv的sink如下
> 
> create table csvSink (
>buyQtl BIGINT,
>aveBuy INT 
> ) with (
>'connector.type' = 'filesystem',
>'format.type' = 'csv',
>'connector.path' = 'e:/output'
> )
> 
> 
> 
> 然后进行如下的操作:
> 
> result_table = t_env.sql_query("""
> select orderCalc(code, amount)
> from `some_source`
> group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
> """)
> result_table.execute_insert("csvSink")
> 
> 
> 
> 在执行程序的时候提示没法入库
> 
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o98.executeInsert.
> 
> : org.apache.flink.table.api.ValidationException: Column types of query
> result and sink for registered table
> 'default_catalog.default_database.csvSink' do not match.
> 
> Cause: Different number of columns.
> 
> 
> 
> Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]
> 
> Sink schema:  [buyQtl: BIGINT, aveBuy: INT]
> 
>at
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx
> ception(DynamicSinkUtils.java:304)
> 
>at
> org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply
> ImplicitCast(DynamicSinkUtils.java:134)
> 
> 
> 
> 是UDF的输出结构不对吗,还是需要调整sink table的结构?
> 



Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.

2020-12-13 文章 kandy.wang
[ERROR] Could not execute SQL statement. 
Reason:org.apache.flink.table.api.TableException: Window aggregate can only be 
defined over a time attribute column, but TIMESTAMP(3) encountered.


SQL 如下:
create temporary view expose as

select  

mid

,time_local

,TO_TIMESTAMP(FROM_UNIXTIME(time_local / 1000, '-MM-dd HH:mm:ss')) as log_ts

,proctime

from hive.temp.kafka_table

;
time_local 是bigint



select TUMBLE_START(log_ts, INTERVAL '1' MINUTE) as log_minute,count(1) pv

from expose

group by TUMBLE(log_ts, INTERVAL '1' MINUTE);


window agg的字段报错,如何解决。

Re: flink无法写入数据到ES中

2020-12-13 文章 Evan
你的SQL语句语法有误,请参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/elasticsearch.html
 

希望能帮助到你!

 
发件人: 小墨鱼
发送时间: 2020-12-11 14:46
收件人: user-zh
主题: flink无法写入数据到ES中
我在使用Flink写入数据到ES中,程序可以执行成功但是ES中没有数据,而且没有任何报错信息我首先创建了一个sink的es表String sql =
"CREATE TABLE es_sink (\n" +"uid INT,\n" +   
"appid INT,\n" +"prepage_id INT,\n" +   
"page_id INT,\n" +"action_id STRING,\n" +   
"page_name STRING,\n" +"action_name STRING,\n" +   
"prepage_name STRING,\n" +"stat_time BIGINT,\n" +   
"dt DATE,\n" +"PRIMARY KEY (uid) NOT ENFORCED\n" +  
 
") WITH (\n" +"'connector.type' = 'elasticsearch',\n" + 
  
"'connector.version' = '6',\n" +"'connector.hosts' =
'http://localhost:9200',\n" +"'connector.index' =
'mytest',\n" +"'connector.document-type' = 'user_action',\n"
+"'update-mode' = 'append',\n" +   
"'connector.key-null-literal' = 'n/a',\n" +   
"'connector.bulk-flush.max-actions' = '1',\n" +   
"'format.type' = 'json'\n" +")";并通过下面查询出数据String sql =
"select 1 as uid,2 as appid,3 as prepage_id,4 as page_id,'5' as
action_id,'6' as page_name,'7' as action_name,'8' as prepage_name,cast(9 as
bigint) as stat_time, cast('2020-11-11' as date) as dt from student limit
1";我的flink版本是1.11.1,es版本是6.2.2有遇到的朋友可以帮助我看一下
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL 怎么为每一个任务分配不同的内存配置

2020-12-13 文章 Kyle Zhang
一个集群跑一个SQL任务怎么样

On Mon, Dec 14, 2020 at 8:42 AM yinghua...@163.com 
wrote:

> Flink 作业在提交时可以通过参数指定JobManager
> 和TaskManager的内存配置,但是SQL执行时怎么为每一个任务指定其内存配置,是不是都是读同一个flink-conf.yaml中的配置?
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/config.html#memory-configuration
> 中内存的配置都是基于flink-conf.yaml文件来操作的,是全局的配置,没有找到基于SQL任务独立配合内存的?
>
>
>
> yinghua...@163.com
>


Flink SQL 怎么为每一个任务分配不同的内存配置

2020-12-13 文章 yinghua...@163.com
Flink 作业在提交时可以通过参数指定JobManager 
和TaskManager的内存配置,但是SQL执行时怎么为每一个任务指定其内存配置,是不是都是读同一个flink-conf.yaml中的配置?
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/config.html#memory-configuration
 中内存的配置都是基于flink-conf.yaml文件来操作的,是全局的配置,没有找到基于SQL任务独立配合内存的?



yinghua...@163.com