Tumble window: problem when a time interval contains no data

2020-12-12 Post by guoliubi...@foxmail.com
On Flink 1.12.0, my Java code does the following:
txnStream
.window(TumblingEventTimeWindows.of(Time.seconds(3L)))
.process(new NetValueAggregateFunction())
If a given 3-second interval contains no data, the process function is never invoked for it.
Is there any configuration that guarantees the process function runs every 3 seconds regardless?
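
As far as I know there is no such configuration: Flink only creates a window once at least one element has been assigned to it, so an empty 3-second interval never reaches the process function at all. One common workaround is a timer-based KeyedProcessFunction in place of the window. The following is only a sketch, with generic placeholder types and "keep the latest value" standing in for the real NetValueAggregateFunction logic: once a key has been seen at least once, a self-re-registering processing-time timer produces output every 3 seconds even across empty intervals.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Emits the last value seen for each key every intervalMillis, even if no new data arrives. */
public class PeriodicEmitFunction<K, T> extends KeyedProcessFunction<K, T, T> {

    private final Class<T> type;
    private final long intervalMillis;
    private transient ValueState<T> latest;

    public PeriodicEmitFunction(Class<T> type, long intervalMillis) {
        this.type = type;
        this.intervalMillis = intervalMillis;
    }

    @Override
    public void open(Configuration parameters) {
        latest = getRuntimeContext().getState(new ValueStateDescriptor<>("latest", type));
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
        if (latest.value() == null) {
            // first element for this key: start the periodic emission cycle
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + intervalMillis);
        }
        // a real implementation would fold the element into an aggregate here
        latest.update(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
        // fires every intervalMillis for this key, whether or not new data arrived
        out.collect(latest.value());
        ctx.timerService().registerProcessingTimeTimer(timestamp + intervalMillis);
    }
}

It would be applied on the keyed stream as something like txnStream.process(new PeriodicEmitFunction<>(Txn.class, 3000L)), where Txn is a placeholder for the actual event type.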


guoliubi...@foxmail.com


Problem sinking data processed by a Pandas UDF

2020-12-12 Post by Lucas
I'm using Flink 1.12.0 with Python 3.7. I defined a custom pandas UDF, roughly as follows:

import pandas as pd
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
     result_type=DataTypes.ROW(
         [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
          DataTypes.FIELD('aveBuy', DataTypes.INT())]),
     func_type='pandas')
def orderCalc(code, amount):
    df = pd.DataFrame({'code': code, 'amount': amount})
    # after the pandas processing, the results end up in another DataFrame, `output`
    return (output['buyQtl'], output['aveBuy'])
 

I defined a CSV sink as follows:

create table csvSink (
buyQtl BIGINT,
aveBuy INT 
) with (
'connector.type' = 'filesystem',
'format.type' = 'csv',
'connector.path' = 'e:/output'
)

 

Then I perform the following operation:

result_table = t_env.sql_query("""
select orderCalc(code, amount)
from `some_source`
group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
""")
result_table.execute_insert("csvSink")

 

When the program runs, it fails to write to the sink with the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o98.executeInsert.

: org.apache.flink.table.api.ValidationException: Column types of query result and sink for registered table 'default_catalog.default_database.csvSink' do not match.

Cause: Different number of columns.

 

Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]

Sink schema:  [buyQtl: BIGINT, aveBuy: INT]

    at org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:304)
    at org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:134)

 

Is the UDF's output structure wrong, or does the sink table's schema need to be adjusted?
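
Judging from the error, the query produces a single ROW-typed column while csvSink declares two scalar columns, so the column counts don't match. One possible direction, sketched here assuming the rest of the job stays exactly as shown above, is to alias the UDF result in a subquery and expand the ROW's fields in an outer select, so that the query schema has two top-level columns; Flink SQL's dot-style field access on ROW columns is used for that. An alternative would be to declare the sink with a single ROW column, but expanding the fields keeps the flat CSV schema.

# Sketch only: the same t_env, source table and orderCalc UDF as above are assumed.
# The subquery aliases the ROW returned by the UDF, and the outer query expands
# its fields so the query schema matches csvSink's two columns.
result_table = t_env.sql_query("""
    select t.r.buyQtl as buyQtl, t.r.aveBuy as aveBuy
    from (
        select orderCalc(code, amount) as r
        from `some_source`
        group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
    ) t
""")
result_table.execute_insert("csvSink")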



How to determine the version of flink-shaded-hadoop-2-uber*-*

2020-12-12 Post by Jacob
While upgrading Flink, this jar needs to be placed under flink/lib, but how should its version number be determined?
flink-shaded-hadoop-2-uber*-*





Flink 1.10.0 on YARN: job submission fails

2020-12-12 Post by Jacob
Hello, what could be causing the following problem when submitting a job with Flink 1.10.0 on YARN? Is it a Hadoop jar dependency issue? The same program runs fine on versions earlier than 1.10, but it cannot be submitted on 1.10.0.

Thanks!


[jacob@hadoop001 bin]$ ./yarn logs -applicationId application_1603495749855_57650
20/12/11 18:52:55 INFO client.RMProxy: Connecting to ResourceManager at localhost:8032
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/opt/app/hadoop_client/e11_backend/hadoop-2.6.0-cdh5.8.3/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/app/hadoop-2.6.0-cdh5.8.3/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
20/12/11 18:52:57 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable


Container: container_1603495749855_57650_02_01 on localhost
=
LogType:jobmanager.err
Log Upload Time:Fri Dec 11 18:49:21 -0800 2020
LogLength:2368
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/data/hadoop/dn/sdc/yarn/nm/usercache/jacob/appcache/application_1603495749855_57650/filecache/11/datafeed-website-filter_flink-0.0.1-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/data/hadoop/dn/sde/yarn/nm/usercache/jacob/appcache/application_1603495749855_57650/filecache/17/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p0.2/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type
[ch.qos.logback.classic.util.ContextSelectorStaticBinder]
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.addDeprecations([Lorg/apache/hadoop/conf/Configuration$DeprecationDelta;)V
    at org.apache.hadoop.mapreduce.util.ConfigUtil.addDeprecatedKeys(ConfigUtil.java:54)
    at org.apache.hadoop.mapreduce.util.ConfigUtil.loadResources(ConfigUtil.java:42)
    at org.apache.hadoop.mapred.JobConf.(JobConf.java:119)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1659)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:91)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
    at org.apache.hadoop.security.Groups.(Groups.java:55)
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:182)
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:235)
    at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:214)
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:669)
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:571)
    at org.apache.flink.yarn.entrypoint.YarnEntrypointUtils.logYarnEnvironmentInformation(YarnEntrypointUtils.java:136)
    at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:109)





Over window: the watermark does not trigger the computation (complete code attached), thanks

2020-12-12 Post by Appleyuchi

The code:
https://paste.ubuntu.com/p/GTgGhhcjyZ/


The documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#group-windows
specifically the "Over Window Aggregation" section.


The code itself runs without errors, but nothing is ever output. Any help would be appreciated, thanks.

tEnv.executeSql(query).print() does not successfully consume the Kafka data

2020-12-12 Post by jy l
Hi:
I'm consuming Kafka data with Flink, and I created a table as follows:

val kafkaSource =
  """
|create table kafka_order(
|order_id string,
|order_price decimal(10,2),
|order_time timestamp(3)
|)
|with(
|'connector' = 'kafka',
|'topic' = 'iceberg.order',
|'properties.bootstrap.servers' = 'hostname:9092',
|'format' = 'json',
|'properties.group.id' = 'data-lake',
|'scan.startup.mode' = 'earliest-offset',
|'json.ignore-parse-errors' = 'false'
|)
|""".stripMargin
tEnv.executeSql(kafkaSource)

When I query the table directly and print to the console, no data is consumed:
 val query =
  """
|select * from kafka_order
|""".stripMargin
tEnv.executeSql(query).print()

But if I create a table with the print connector and then do insert into that table select * from kafka_order, the data is consumed normally, as follows:
val print =
  """
|create table p_order(
|order_id string,
|order_price decimal(10,2),
|order_time timestamp(3)
|)
|with(
|'connector' = 'print'
|)
|""".stripMargin
tEnv.executeSql(print)
val query =
  """
|insert into p_order
|select * from kafka_order
|""".stripMargin
tEnv.executeSql(query)

What exactly is the reason for this? If anyone knows, I would really appreciate an explanation.

Best regards!