Tumble window produces no output for time intervals with no data
On Flink 1.12.0, my Java code does the following:

    txnStream
        .window(TumblingEventTimeWindows.of(Time.seconds(3L)))
        .process(new NetValueAggregateFunction())

If a given 3-second interval contains no data, the process function is never entered. Is there some configuration that makes the process function run every 3 seconds regardless?

guoliubi...@foxmail.com
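As far as I know there is no such configuration: event-time tumbling windows are created lazily per element, so an interval with no data never fires. Below is a minimal sketch of a common workaround, a KeyedProcessFunction driven by processing-time timers; the type names Txn and NetValue are hypothetical stand-ins for the real ones, and at least one element per key is still needed to start the timer chain:

    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Emits every 3 seconds per key, whether or not data arrived in the interval.
    public class FireEveryThreeSeconds extends KeyedProcessFunction<String, Txn, NetValue> {
        private static final long INTERVAL = 3000L;

        @Override
        public void processElement(Txn value, Context ctx, Collector<NetValue> out) {
            // Align the first timer to the next 3-second boundary; re-registering
            // the same timestamp is deduplicated by the timer service.
            long now = ctx.timerService().currentProcessingTime();
            ctx.timerService().registerProcessingTimeTimer(now - (now % INTERVAL) + INTERVAL);
            // ... buffer `value` in keyed state for the aggregation ...
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<NetValue> out) {
            // Re-register so the function keeps firing even when no elements arrive.
            ctx.timerService().registerProcessingTimeTimer(timestamp + INTERVAL);
            // ... emit the (possibly empty) aggregate for the interval ending here ...
        }
    }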
Sinking data processed by a Pandas UDF
Using Flink 1.12.0 with Python 3.7. I defined a pandas UDF roughly as follows:

    @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
         result_type=DataTypes.ROW(
             [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()),
              DataTypes.FIELD('aveBuy', DataTypes.INT())]),
         func_type='pandas')
    def orderCalc(code, amount):
        df = pd.DataFrame({'code': code, 'amount': amount})
        # after processing with pandas, the results go into another dataframe `output`
        return (output['buyQtl'], output['aveBuy'])

and a CSV sink:

    create table csvSink (
        buyQtl BIGINT,
        aveBuy INT
    ) with (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = 'e:/output'
    )

Then I run:

    result_table = t_env.sql_query("""
        select orderCalc(code, amount)
        from `some_source`
        group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount
    """)
    result_table.execute_insert("csvSink")

Executing the program fails at the insert:

    py4j.protocol.Py4JJavaError: An error occurred while calling o98.executeInsert.
    : org.apache.flink.table.api.ValidationException: Column types of query result and sink
      for registered table 'default_catalog.default_database.csvSink' do not match.
      Cause: Different number of columns.
      Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT>]
      Sink schema: [buyQtl: BIGINT, aveBuy: INT]
        at org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:304)
        at org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:134)

Is the UDF's output structure wrong, or does the sink table's schema need adjusting?
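Not a verified fix, just a sketch of one direction: the error shows the query producing a single ROW column (EXPR$0) while the sink expects two flat columns, so flattening the ROW before inserting should make the schemas line up. Java Table API syntax is used for brevity (the same SQL works from PyFlink); the view name tmp, the alias r, and the tEnv variable are assumptions:

    import org.apache.flink.table.api.Table;

    // Alias the ROW result, then project its fields so the query schema becomes
    // [buyQtl BIGINT, aveBuy INT] and matches the sink.
    Table tmp = tEnv.sqlQuery(
        "select orderCalc(code, amount) as r " +
        "from `some_source` " +
        "group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount");
    tEnv.createTemporaryView("tmp", tmp);
    tEnv.executeSql("insert into csvSink select r.buyQtl, r.aveBuy from tmp");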
How to determine the version of flink-shaded-hadoop-2-uber*-*
When upgrading the Flink version, this jar needs to be placed under flink/lib, but how should its version number be determined? flink-shaded-hadoop-2-uber*-*
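For what it's worth, the flink-shaded-hadoop-2-uber version string is the targeted Hadoop version followed by the flink-shaded release: for example, flink-shaded-hadoop-2-uber-2.8.3-10.0.jar is built against Hadoop 2.8.3. The usual guidance is to pick the variant whose first component matches your cluster's Hadoop version; take this as a note on the naming convention rather than advice for any specific setup.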
Flink 1.10.0 on YARN: job submission fails
Hello, what could cause this problem when submitting a job on Flink 1.10.0 on YARN? Is it a Hadoop jar dependency issue? The same program runs fine on versions below 1.10 but cannot be submitted on 1.10.0. Thanks!

    [jacob@hadoop001 bin]$ ./yarn logs -applicationId application_1603495749855_57650
    20/12/11 18:52:55 INFO client.RMProxy: Connecting to ResourceManager at localhost:8032
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/opt/app/hadoop_client/e11_backend/hadoop-2.6.0-cdh5.8.3/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/app/hadoop-2.6.0-cdh5.8.3/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    20/12/11 18:52:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

    Container: container_1603495749855_57650_02_01 on localhost
    ============================================================
    LogType:jobmanager.err
    Log Upload Time:Fri Dec 11 18:49:21 -0800 2020
    LogLength:2368
    Log Contents:
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/data/hadoop/dn/sdc/yarn/nm/usercache/jacob/appcache/application_1603495749855_57650/filecache/11/datafeed-website-filter_flink-0.0.1-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/data/hadoop/dn/sde/yarn/nm/usercache/jacob/appcache/application_1603495749855_57650/filecache/17/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p0.2/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
    Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.addDeprecations([Lorg/apache/hadoop/conf/Configuration$DeprecationDelta;)V
        at org.apache.hadoop.mapreduce.util.ConfigUtil.addDeprecatedKeys(ConfigUtil.java:54)
        at org.apache.hadoop.mapreduce.util.ConfigUtil.loadResources(ConfigUtil.java:42)
        at org.apache.hadoop.mapred.JobConf.<clinit>(JobConf.java:119)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1659)
        at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:91)
        at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
        at org.apache.hadoop.security.Groups.<init>(Groups.java:55)
        at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:182)
        at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:235)
        at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:214)
        at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:669)
        at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:571)
        at org.apache.flink.yarn.entrypoint.YarnEntrypointUtils.logYarnEnvironmentInformation(YarnEntrypointUtils.java:136)
        at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:109)
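A diagnostic sketch, not a confirmed diagnosis: Configuration.addDeprecations(DeprecationDelta[]) does not exist in older Hadoop releases, so this NoSuchMethodError typically means two different Hadoop versions ended up on the same classpath (the log above shows both a CDH parcel and jars shipped inside the application jar). Printing the code source of the class, using only standard JDK reflection, shows which jar actually supplied it:

    import org.apache.hadoop.conf.Configuration;

    public class WhichHadoop {
        public static void main(String[] args) {
            // Prints the jar that provides org.apache.hadoop.conf.Configuration
            // on this classpath, e.g. an old Hadoop jar shadowing the cluster's.
            System.out.println(Configuration.class
                    .getProtectionDomain().getCodeSource().getLocation());
        }
    }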
Over window watermark does not trigger computation (full code attached), thanks
The code is at: https://paste.ubuntu.com/p/GTgGhhcjyZ/
It follows the Over Window Aggregation part of the docs: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#group-windows
The code runs without errors, but it produces no output. Any help is appreciated, thanks.
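Without the pasted code inline, only a general note is possible: an event-time over-window aggregate emits a row only after the watermark passes that row's timestamp, so "no output" usually means the watermark never advances (an idle source, too few events, or a very large bounded out-of-orderness). For comparison, the linked 1.11 docs' Over Window Aggregation has this shape; orders and the field names are placeholders:

    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.UNBOUNDED_RANGE;
    import org.apache.flink.table.api.Over;
    import org.apache.flink.table.api.Table;

    // `rowtime` must be an event-time attribute with watermarks assigned upstream;
    // otherwise the over window buffers indefinitely and emits nothing.
    Table result = orders
        .window(Over.partitionBy($("a")).orderBy($("rowtime"))
                .preceding(UNBOUNDED_RANGE).as("w"))
        .select($("a"), $("b").avg().over($("w")));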
tEnv.executeSql(query).print() fails to consume Kafka data
Hi: I'm consuming Kafka data with Flink. I create a table as follows:

    val kafkaSource =
      """
        |create table kafka_order(
        |order_id string,
        |order_price decimal(10,2),
        |order_time timestamp(3)
        |)
        |with(
        |'connector' = 'kafka',
        |'topic' = 'iceberg.order',
        |'properties.bootstrap.servers' = 'hostname:9092',
        |'format' = 'json',
        |'properties.group.id' = 'data-lake',
        |'scan.startup.mode' = 'earliest-offset',
        |'json.ignore-parse-errors' = 'false'
        |)
        |""".stripMargin
    tEnv.executeSql(kafkaSource)

When I query it directly and print to the console, nothing is consumed:

    val query =
      """
        |select * from kafka_order
        |""".stripMargin
    tEnv.executeSql(query).print()

But if I create a table with the print connector and run insert into p_order select * from kafka_order, consumption works fine:

    val print =
      """
        |create table p_order(
        |order_id string,
        |order_price decimal(10,2),
        |order_time timestamp(3)
        |)
        |with(
        |'connector' = 'print'
        |)
        |""".stripMargin
    tEnv.executeSql(print)

    val query =
      """
        |insert into p_order
        |select * from kafka_order
        |""".stripMargin
    tEnv.executeSql(query)

Why exactly is this? If anyone knows, I'd be very grateful.

Best regards!
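No definitive answer here, but a way to narrow it down (a sketch, in Java syntax for brevity; tEnv and the table name are taken from the question): iterating the same query with TableResult#collect, a standard 1.12 API, shows whether any rows reach the client at all. If this also prints nothing, the problem is upstream of print() rather than in how results are displayed:

    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;

    TableResult result = tEnv.executeSql("select * from kafka_order");
    // hasNext() blocks until the next row is available from the running job.
    try (CloseableIterator<Row> it = result.collect()) {
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }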