Re: 退订

2022-03-13 文章 Caizhi Weng
Hi!

退订中文邮件列表请发送任意邮件到 user-zh-unsubscr...@flink.apache.org。

如果不想收到邮件列表的邮件,但希望关注列表中的讨论,可以查看
https://lists.apache.org/list.html?user-zh@flink.apache.org。

Ever <439674...@qq.com.invalid> 于2022年3月11日周五 13:50写道:

> 退订


Re: 退订

2022-03-13 文章 Caizhi Weng
Hi!

退订中文邮件列表请发送任意邮件到 user-zh-unsubscr...@flink.apache.org。

如果不想收到邮件列表的邮件,但希望关注列表中的讨论,可以查看
https://lists.apache.org/list.html?user-zh@flink.apache.org。

thealizeejune  于2022年3月13日周日 20:46写道:

> 退订


Re: 求助:Flink Session模式下,多次重跑同一个批处理任务会导致系统内存被耗尽

2022-03-13 文章 Caizhi Weng
Hi!

图片不能显示,建议上传到外部图床,或通过文字的方式贴上配置。

“多次跑同一个任务”大概是几次?用的是什么 Flink 版本?如果有可能的话能否贴一下用户代码?OOM 是 metaspace 的 OOM
吗?还是其他的?

liuyanwei75 提到的 meta space OOM 问题目前确实是存在的,而且目前没有很好的解决方案。

邮箱  于2022年3月12日周六 02:07写道:

> 我之前也有这个现象,就是不停的启停同一个job,Flink的meta space
> 内存会不断增加,我用arts监控内存发现通过Flink的类加载器child load策略加载的类并没有释放
>
> 发自我的iPhone
>
> > 在 2022年3月11日,21:10,yu'an huang  写道:
> >
> > 你好,建议将Task Manager内存dump下来使用一些内存分析工具比如visual vm,分析下是否存在内存泄露
> >
> >> On Fri, 11 Mar 2022 at 9:28 AM, renshang...@deepexi.com <
> >> renshang...@deepexi.com> wrote:
> >>
> >> 各位大佬好:
> >> 是这样的,我在跑一个批处理任务的过程中发现,在Flink
> >> Session模式下,多次重跑同一个批处理任务会导致系统内存被耗尽,最终导致系统oom之后将taskmanager进程杀掉。
> >>
> >> 按道理说一个job跑完,它所占用的资源都应该释放掉吧,以便后续的job继续执行。但是测试结果显示并没有
> >>
> >> 1、环境:
> >> 5节点的flink 1.14.3 standalone集群,宿主机是64G内存,每个节点分配33G内存,22个slot
> >> 5节点的Hadoop 3.3.1 集群,Hive 3.1.2,iceberg 0.13.1
> >> 数据以iceberg表的形式存储在HDFS上
> >> 2、批处理任务:
> >> 将表A的5千万条数据,对其主键进行Hash之后,插入另一个表B中。SQL语句如下:
> >> insert into flinal_temp
> >> select step_id,
> >> param_id,
> >> wafer_id,
> >> chip_id,
> >> product_id,
> >> hive_partition_content,
> >> cast (wafer_start_date as date),
> >> (HASH_CODE(step_id || param_id || chip_id || wafer_id)) % 100 as part_id
> >> from test_5qw;
> >> 3、出现问题:
> >> 多次跑同一个任务(非并行),就可以通过top命令观察到系统可用内存越来越少,到一定程度之后会触发oom杀掉taskmanager进程
> >> 4、DAG图
> >> 4、Flink配置
> >>
> >>
> >> 5、集群节点
> >> 6、其中两台的内存使用情况
> >>
> >> 目前是无作业运行状态,两台内存占有率均达到了90%,Flink任务刚启动时,内存占有率仅为15%左右
> >>
> >> 手动触发GC也没有效果,内存并不会下降
> >> 7、触发oom杀进程之后
> >> 内存恢复正常
> >> 因为OOM系统杀掉74904 Taskmanager进程
> >>
> >> 可以看到因为内存OOM导致TM被杀掉,一台主机64G内存,分给TM 33G,按道理说不应该超过分配的内存才对
> >>
> >> 请大佬们帮忙看一下吧,感谢!!!
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> renshang...@deepexi.com
> >>
>


Re: flink1.13.6用tableApi的方式从kafka写入hive失败,也不报错,但是写入文件或者mysql没有问题

2022-03-13 文章 Caizhi Weng
Hi!

流作业写入 hive 的数据需要在 checkpoint 之后才可见。我看你的代码里没有启用 checkpoint,可以试一下启用
checkpoint。

jdbc sink 是一个写入即可见的 sink,但它只能保证最终一致性。也就是说如果中途作业出现 failover,那么从上一次
checkpoint 到 failover 之间写入 jdbc sink 的数据就是“冗余”的,要被作业重启后的新数据覆盖才能回归到一致性。

filesystem sink 写入的时候应该创建的是一个临时文件,filesystem source 是不会读这个文件的,只有 checkpoint
之后才会把临时文件重命名。

799590...@qq.com.INVALID <799590...@qq.com.invalid> 于2022年3月12日周六 14:51写道:

>
> 软件版本
> flink:1.13.6
> hive:1.1.1
> hadoop:2.6.0-cdh5.16.2
>
> 通过createRemoteEnvironment方式将sql提交到远程flink集群操作的,hive连接时通过了Kerberos认证。代码如下,
>
> 运行后不报错,flink集群能看到job正在运行,kafka和hive表都创建成功了,kafka中一直在产生新的消息,而hive中却没有新数据进表中。
>
>
> 测试过将输出改为mysql或者csv文件都能持续产生新记录,hive表中的数据也能读取出来,或者从hive的一张表将数据插入刚刚创建的表中也是成功的。就是不知道为什么不能将kafka的动态数据动态写入新建的hive表中。
>
> String KRB5_CONF_PATH = "/home/tetris/conf/krb5.ini";
> String keytab = "/home/tetris/conf/company.keytab";
> String principal = "company";
> System.setProperty("java.security.krb5.conf", KRB5_CONF_PATH);
>
> Configuration configuration = new Configuration();
> configuration.set("hadoop.security.authentication", "kerberos");
> configuration.set("keytab.file", keytab);
> configuration.setBoolean("hadoop.security.authorization", true);
> configuration.set("kerberos.principal", principal);
> UserGroupInformation.setConfiguration(configuration);
> UserGroupInformation.loginUserFromKeytab(principal, keytab);
>
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createRemoteEnvironment("node1", 8081);
> StreamTableEnvironment flinkTableEnv =
> StreamTableEnvironment.create(env,bsSettings);
>
> HiveCatalog hiveCatalog = new HiveCatalog("myhive", "tetris",
> "/home/tetris/conf", "1.1.1");
> flinkTableEnv.registerCatalog("myhive",hiveCatalog);
> flinkTableEnv.useCatalog("myhive");
> flinkTableEnv.executeSql("DROP TABLE IF EXISTS data_2431_4928").print();
> flinkTableEnv.executeSql("CREATE TABLE data_2431_4928(id STRING,user_id
> STRING,status STRING) WITH (\n" +
> "'connector' = 'kafka',\n" +
> "'topic' = 'person',\n" +
> "'properties.bootstrap.servers' = '121.4.89.228:9092',\n" +
> "'properties.group.id' = 'testGroup',\n" +
> "'scan.startup.mode' = 'latest-offset',\n" +
> "'format' = 'json',\n" +
>
> "'json.fail-on-missing-field'='false','json.ignore-parse-errors'='true'\n" +
> ")").print();
> flinkTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> flinkTableEnv.executeSql("DROP TABLE IF EXISTS output_2431_4930").print();
> flinkTableEnv.executeSql("CREATE TABLE output_2431_4930(id STRING,user_id
> STRING,status STRING)").print();
> flinkTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> TableResult result = flinkTableEnv.executeSql("INSERT INTO
> output_2431_4930 SELECT id, user_id ,`status` FROM data_2431_4928");
> System.out.println(result.getJobClient().get().getJobID());
>
>
>
> 谌祥,杭州 - java后端开发 - 大数据方向
> 799590...@qq.com
>


Re: 使用CUMULATE WINDOW 消费upsertkafka遇到的问题

2022-03-02 文章 Caizhi Weng
Hi!

window tvf 目前不支持消费 changelog,也就是说只能消费 insert 数据。upsert-kafka source 是一个会产生
changelog 的 source,因此下游不能接 window tvf。

赵旭晨  于2022年3月1日周二 15:23写道:

> sql如下:
> with effective_chargeorder as (
> select
> o.recordcreatedtime,o.recordcreateduser,o.status,o._is_delete,o.appointmentid,
> o.id,o.tenantid,o.actualprice,o.proc_time from t_k_chargeorder as o
> where o.recordcreateduser > 0 and o.status NOT IN ( '已作废', '未收费', '作废并撤回',
> '等待支付宝付费', '等待微信付费' )
> and o._is_delete = '0' and o.appointmentid > 0
> )
> --select * from effective_chargeorder;
> SELECT window_start, window_end, SUM(actualprice)
>   FROM TABLE(
> CUMULATE(TABLE effective_chargeorder, DESCRIPTOR(proc_time), INTERVAL
> '2' MINUTES, INTERVAL '10' MINUTES))
>   GROUP BY window_start, window_end;
>
> IDE报如下错误:
> StreamPhysicalWindowAggregate doesn't support consuming update and delete
> changes which is produced by node ChangelogNormalize(key=[id])
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:394)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:310)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:353)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:342)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:341)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.Range.foreach(Range.scala:160)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>
> 消费的是upsertkafka的source,由于对planner这层了解不深,不是很看的懂,请各位大佬指点~
>
>
>
>
>
>
>
>
>
>
>


Re: jdbc connector 写入异常数据后,不再写入新数据时,异常信息不断嵌套,且不会抛出

2022-02-14 文章 Caizhi Weng
Hi!

图片不能显示,建议传到 imgur 等外部图床上,并在邮件里贴出链接。

虽然看不到图片,但看描述应该是一个已知问题 [1],只是目前还没人修复。

[1] https://issues.apache.org/jira/browse/FLINK-24677

jianjianjianjianjianjianjianjian <724125...@qq.com.invalid> 于2022年2月14日周一
15:40写道:

> 老师们,你们好:
>   在使用jdbc connector写入操作时,*写入一条错误数据*(字段值超长)后*不再写入数据*
> ,异常信息打印,但错误信息不会抛出,且异常信息会不断嵌套。该情况可能会导致问题延迟。
>   当前使用为1.13版本 
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat
> 类
>   查阅 master 分支的 org.apache.flink.connector.jdbc.internal.
> JdbcOutputFormat  类也存在类似问题,代码截图如下:
>
> *1.13版本:*
>
> master 分支:
>
>
>
>
>


Re: 请教设置了table.exec.state.ttl后,结果数据不更新问题

2022-02-14 文章 Caizhi Weng
Hi!

图片不能显示,建议传到 imgur 等外部图床上,再把链接贴到邮件里。

设置 state ttl 之前 sink 数据能一直更新吗?确认不是因为后来的数据不符合某些 where 条件导致的吗?

liangjinghong  于2022年2月12日周六 14:39写道:

> 你好,我是一个flink新手。为了进行状态管理,我在代码中设置了configuration.setString("table.exec.state.ttl","12h");
>
> 然而,在flink作业运行12小时后,*我的update结果表再也没有更新过*。从web ui可以看到,我的源头与一些算子的Records 
> Sent一直在增长,任务也没有出现任何异常,checkpoint也正常,所以我很苦恼是哪里出现了问题
>
> 以下是我的SQL语句:
>
> CREATE TABLE `tbl_rpt_app_usage`
>
>  (`datepoint` TIMESTAMP,
>
>   `appId` VARCHAR ,
>
>   `flavor` VARCHAR ,
>
>   `applyNum` BIGINT ,
>
>   `releaseNum` BIGINT ,
>
>   `usedNum` BIGINT ,
>
>PRIMARY KEY (`datepoint`) NOT ENFORCED
>
>  )WITH(
>
>  'connector' = 'mysql-cdc',
>
>  'hostname' = '',
>
>  'port' = '3306',
>
>  'username' = '',
>
>  'password' = '',
>
>  'database-name' = '',
>
>  'table-name' = ''
>
>  );
>
> CREATE TABLE `temporary_usage`
>
>  (`datepoint` TIMESTAMP,
>
>   `appId` VARCHAR ,
>
>   `flavor` VARCHAR ,
>
>   `applyNum` BIGINT ,
>
>   `releaseNum` BIGINT ,
>
>   `usedNum` BIGINT ,
>
>PRIMARY KEY (`datepoint`) NOT ENFORCED
>
>  )WITH(
>
>  'connector' = 'mysql-cdc',
>
>  'hostname' = '',
>
>  'port' = '',
>
>  'username' = '',
>
>  'password' = '',
>
>  'database-name' = '',
>
>  'table-name' = ''
>
>  );
>
> CREATE TABLE `tbl_cce_cluster`
>
>  (`clusterId` VARCHAR ,
>
>   `region` VARCHAR ,
>
>   `flavor` VARCHAR ,
>
>   `totalNode` BIGINT ,
>
>   `toolName` VARCHAR ,
>
>   `appId` VARCHAR ,
>
>PRIMARY KEY (`clusterId`) NOT ENFORCED
>
>  )WITH(
>
>  'connector' = 'mysql-cdc',
>
>  'hostname' = '',
>
>  'port' = '3306',
>
>  'username' = '',
>
>  'password' = '',
>
>  'database-name' = '',
>
>  'table-name' = ''
>
>  );
>
> ---mysql落地表
>
> CREATE table sink(
>
>   code STRING,
>
>   name STRING,
>
>   usedcnt BIGINT,
>
>   `time` TIMESTAMP,
>
>   type STRING,
>
>   PRIMARY KEY (`time`) NOT ENFORCED
>
> ) with (
>
>'connector' = 'jdbc',
>
>'url' = 'jdbc:mysql: ',
>
>'username' = '',
>
>'password' = '',
>
>'table-name' = 'sink'
>
> );
>
> (因为MySQLCDC似乎不支持在同步时筛选指定内容,所以目前同步后先创建筛选结果的虚拟表,再进行计算,还想请教下是否有更好的办法)
>
> CREATE VIEW rpt
>
>  (datepoint,appId,flavor,applyNum,releaseNum,usedNum)
>
>  AS
>
>  select * from tbl_rpt_app_usage
>
>  where datepoint > '2022-02-11 14:54:01'
>
>  and appId not in ('aaa')
>
>  union all
>
>  select * from temporary_usage
>
>
>
> CREATE VIEW cce
>
> (clusterId,region,flavor,totalNode,toolName,appId)
>
> AS
>
> select * from tbl_cce_cluster
>
> where toolName in ('bbb','ccc')
>
>
>
> ---代码逻辑如下:
>
> insert into sink
>
> select code,name,usedcnt,LOCALTIMESTAMP as `time`,type from(
>
> select
>
> info.code,
>
> info.name,
>
> sum(rpt.usedNum) as usedcnt,'online' type
>
> from (
>
> select * from (
>
> select *,ROW_NUMBER() OVER(PARTITION by appId,flavor order by datepoint
> desc) as row_num from rpt/*+ OPTIONS('server-id'='1001-1005') */
>
> )where row_num =1
>
> )rpt
>
> join info
>
> on info.appid=rpt.appId
>
> group by info.code,info.name)
>
> union all
>
> select code,name,usedcnt,LOCALTIMESTAMP as `time`,type from(
>
> select
>
> info.code,
>
> info.name,
>
> sum(totalNode) as usedcnt,'online' type
>
> from
>
> cce/*+ OPTIONS('server-id'='1006-1010') */
>
> join info
>
> on cce.appId=info.appid
>
> group by info.code,info.name)
>
> 我的结果表在的12小时(我设置的过期时间)后,再也没更新过。
>
>
>
> 另外,还想请教一个问题:目前group by算子的落地到数据库的结果只支持update
> 吗?业务期待获取历史数据,目前只能想到每分钟全量同步一次结果表的数据到另一个表里,这样就可以追踪到历史数据,是否还有更好的解决办法呢?
>
>
>
>
>
> 非常感谢您的阅读与解惑!
>


Re: flink是否支持 http请求并返回json数据

2022-02-09 文章 Caizhi Weng
Hi!

Flink 目前没有 http server source / sink。这是一个 OLAP
的需求吗?从描述的需求来看,一种更加合理的方式应该是有一个专门的 http server 接受请求,调用 Flink API 运行一个 Flink
作业(Flink SQL 可以运行 select 语句),再将结果返回给调用方。

张锴  于2022年2月9日周三 14:28写道:

>
> 业务需求:通过http请求方式将参数传给flink,将参数带入flink程序再把结果以json的形式返回。请问一下以这种方式实时计算,flink是否支持?
>
> flink版本:1.12.1
>


Re: 请教三张以上时态表关联,加where条件后报错 mismatched type $6 TIMESTAMP(3)的问题

2022-02-06 文章 Caizhi Weng
Hi!

你想要将主表和两张维表进行连续的 event time temporal join 吗?

第一个 SQL 语句是不是不完整,只有一个 join on 却有两张表。

第二个 SQL 语句中,首先定义 view 应该通过 create view 语句,其次 event time temporal join
应该使用左表的 event time。你的语句中,第一个 join 使用的是 FOR SYSTEM_TIME AS OF
job.lastUpdateTime 这是正确的,但第二个 join 使用的是 FOR SYSTEM_TIME AS OF
t.lastModifiedTime 这是错误的。因为 t.lastModifiedTime 来自维表 t,经过第一次 event time
temporal join,这一列的 rowtime 属性已经被去掉了,只留下了 job.lastUpdateTime 的 rowtime
属性,因此第二个 join 应该继续使用 FOR SYSTEM_TIME AS OF job.lastUpdateTime。也就是说,完整的 SQL
语句应该是:

create view job as
select * from tbl_schedule_job job where job.jobStatus in ('RUNNING',
'INITING','ERROR');

insert into sink
select count(1) as machine
from (
select * from job
join tbl_schedule_task FOR SYSTEM_TIME AS OF job.lastUpdateTime as t
on t.jobId = job.jobId
join tbl_broker_node FOR SYSTEM_TIME AS OF job.lastUpdateTime AS n
on t.nodeId = n.id);

liangjinghong  于2022年2月5日周六 17:26写道:

> 你好,因业务需求,flink 1.13,MySQL CDC
> 2.1.1下需要将三张满足时态表结构的表关联,在没有对关联结果加where条件时,可以正常运行,加了where条件后,报错如下:
> SQL:
> insert into sink
> select count(1) as machine from tbl_schedule_job as job
> join tbl_schedule_task FOR SYSTEM_TIME AS OF job.lastUpdateTime as t
> on t.jobId = job.jobId FOR SYSTEM_TIME AS OF t.lastModifiedTime AS n
> where job.jobStatus in ('RUNNING','INITING','ERROR')
> 报错:
> Exception in thread "main" java.lang.AssertionError: mismatched type $6
> TIMESTAMP(3)
>  at
> org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2710)
>  at
> org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2688)
>  at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
>  at
> org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
>  at
> org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
>  at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:33)
>  at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
>  at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:268)
>  at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:238)
>  at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:256)
>  at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1811)
>  at
> org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:189)
>  at
> org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:377)
>  at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>  at
> org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>  at
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>  at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:271)
>  at
> org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
>  at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>  at
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>  at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>  at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>  at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>  at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>  at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>  at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>  at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>  at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>  at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
>  at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.immutable.Range.foreach(Range.scala:160)
>  at
> scala.collection.TraversableOnce$class

Re: Pyflink 1.14 维表 DDL报错

2022-02-06 文章 Caizhi Weng
Hi!

你看的是阿里云 Blink 的文档。Blink 是阿里原来基于 Flink 1.5 开发的产品,所以一些语法与现在的 Flink
有些不同。period for system_time 是以前 blink 定义维表的语法,现在不需要写了。另外 datagen
确实不能作为维表的数据来源,可以尝试用文件作为维表来源,比如 csv 文件。

目前 Blink 的大部分代码已贡献回 Flink(Flink 1.14 默认 planner 就是原来的 Blink planner),建议直接看
Flink 官网的文档学习[1]。

如果想要了解商业化的内容,应该看阿里云 Flink 全托管的文档[2]。

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/joins/#temporal-joins
[2] https://help.aliyun.com/document_detail/169590.html

段泽炳  于2022年2月7日周一 10:17写道:

> 大家好:我是一个flink初学者,刚学到维表join的部分,在维表join的部分遇到了困难。我在阿里云(
> https://help.aliyun.com/document_detail/62506.html)flink sql上找到维表 ddl如下:
> create table phoneNumber( name VARCHAR, phoneNumber bigint, primary
> key(name), PERIOD FOR SYSTEM_TIME )with( type='rds' );
>
>
> 而我自己写的如下:
> create table source_1(
> id int,
> data TINYINT,
> primary key (id),
> PERIOD FOR SYSTEM_TIME
> )with(
> 'connector' = 'datagen',
> 'fields.id.kind'='sequence',
> 'fields.id.start'='5',
> 'fields.id.end'='8',
> 'fields.data.kind'='sequence',
> 'fields.data.start'='4',
> 'fields.data.end'='11'
> )结果在执行的时候报错:
> py4j.protocol.Py4JJavaError: An error occurred while calling o1.executeSql.
> : org.apache.flink.table.api.SqlParserException: SQL parse failed.
> Incorrect syntax near the keyword 'PERIOD' at line 6, column 9.
> Was expecting one of:
>     "CONSTRAINT" ...
>     "PRIMARY" ...
>
> ……
>
>
> 而这个疑惑在我查看flink官网的时候变的更大,因为官网的例子是这样的:
> CREATE TABLE currency_rates ( currency STRING, conversion_rate
> DECIMAL(32, 2), update_time TIMESTAMP(3) METADATA FROM
> `values.source.timestamp` VIRTUAL, WATERMARK FOR update_time AS
> update_time, PRIMARY KEY(currency) NOT ENFORCED ) WITH (
>  'connector' = 'kafka', 'value.format' = 'debezium-json',/* ... */
> );基于以上,我的疑惑共有几点:1.是维表不能使用‘datagen’数据来源吗?还是pyflink 1.14
> 目前不支持维表ddl?2.阿里云的维表ddl语句和官网的维表ddl语句有什么区别吗?他们的使用场景是什么?3.如何能够让我在练习的时候正确使用维表join请大家指点一下,感激不尽


Re: flink任务提交到集群执行一段时间报错Java heap space

2022-01-24 文章 Caizhi Weng
Hi!

你的 state backend 是 heap state backend 吗?如果是的话,Flink
流作业运行过程中的状态会存储在堆中,checkpoint 也会存储在堆中,确实有可能导致 OOM。可以尝试换成其他 state backend 看一下。

Liu Join  于2022年1月21日周五 13:20写道:

>
> 我已经将5s的时间窗口替换为100条的countWindowAll,具体实现为使用aggregate函数将窗口内的数据拼接为一条sql语句,sql语句如下:replace
> into table (a1,a2,a3,a4,..) values(…)
> 但还是没有解决,
> heap dump暂时无法提供,
> taskmanager内存分配如下:
> task heap:2.76G,network:343MB,JVMMetaspace:256MB
>
>
> 我一共运行了两个任务,都会出现这种问题,但之前写过一个简单的数据同步的程序没有出错,就是将一个MySQL库中的500张表同步到另一个MySQL库,不知道对于这种问题有没有解决的方向。
>
> 之前在监控任务运行时发现是MySQLsource先失败,然后导致整个任务挂了,在开启checkpoint时,MySQLsource和开窗之前的部分为一个parallelism,这个parallelism的checkpoint大小一直是136MB,从任务开始到结束都是136MB,其他运算的checkpoint不到1MB,是否有这部分原因
> 从 Windows 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>发送
>
> 发件人: Caizhi Weng<mailto:tsreape...@gmail.com>
> 发送时间: 2022年1月21日 10:52
> 收件人: flink中文邮件组<mailto:user-zh@flink.apache.org>
> 主题: Re: flink任务提交到集群执行一段时间报错Java heap space
>
> Hi!
>
> 5s 的窗口拼接 sql 语句看起来比较可疑,具体是怎么实现的?另外可以把 task manager 的 heap dump
> 出来看一下哪里占比较多的堆内存。
>
> Liu Join  于2022年1月20日周四 13:28写道:
>
> > 环境:
> >
> >
> flink1.13.5,Standalone模式集群,jobmanager内存2GB,taskmanager内存4GB,集群包括一个jobmanager和两个taskmanager,每个taskmanager有2个slot。
> >
> >
> >
> 任务内容是读取2万张表的数据,数据每1分钟一条,每10分钟输出每张表的最后一条数据。代码中使用了map、filter、watermark、开了一个10分钟的滑动窗口,使用reduce获得最后一条数据,因为sink是mysql,配置不高,所以将最后一条数据拼成批量插入语句才往MySQL写入。开了一个5s的窗口用于拼接sql语句。
> >
> > 报错内容:
> > java.lang.OutOfMemoryError: Java heap space
> >
> > 报错表象:
> >
> >
> 整个taskmanager内存被占满,任务失败重启后taskmanager内存仍然是满的,导致任务再次失败。之后任务直接挂了。时间长了之后内存没释放,Taskmanager进程也会挂了。
> > 从 Windows 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>发送
> >
> >
>
>


Re: flink执行任务失败,taskmanager内存不释放

2022-01-24 文章 Caizhi Weng
Hi!

taskmanager 内存就一直上涨指的是堆内存吗?可以把 heap dump 出来看一下具体是哪里占用了内存。

Liu Join  于2022年1月21日周五 15:21写道:

> 环境:flink1.13.5,Standalone模式,taskmanager内存4GB,两个slot
>
>
> 任务数据量很少,不到10MB,任务执行时,taskmanager内存就一直上涨知道报错重启,但因为任务重启后taskmanager内存没有释放,导致任务彻底失败,taskmanager服务也挂了
>
> 从 Windows 版邮件发送
>
>


Re: flink 1.13.2 计算hive仓库数据时错误,NullPointerException

2022-01-24 文章 Caizhi Weng
Hi!

这看起来像是一个 bug,能否提供一下 hive 表的 DDL 还有运行的 query 语句,这样大家可以更好地调查这个问题?

Asahi Lee  于2022年1月24日周一 09:53写道:

> 2022-01-23 04:31:39,568 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph     
>  [] - Source:
> HiveSource-cosldatacenter.ods_rimdrill_dailyincidentsevents ->
> Calc(select=[jobid, reportno, dayssincelast], where=[(idno = 1:BIGINT)])
> (1/1) (7533d77baa7eb16e8242ae63e0706dff) switched from RUNNING to CANCELING.
> 2022-01-23 04:31:39,570 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph     
>  [] - Discarding the results produced by task execution
> 07b2cd514c6b6d85f79ab5b953971f82.
> 2022-01-23 04:31:39,570 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph     
>  [] - MultipleInput(readOrder=[0,0,1],
> members=[\nHashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND
> ($f109 = reportno0))], select=[jobid, reportno, reportdate, depthprogress,
> numperstype1, numperstype2, numperstype3, numperstype4, numperstype5,
> numperstype6, currentops, futureops, dayssincelast, $f109, jobid0,
> reportno0, boptestdate, bopfunctiontestdate], build=[right])\n:-
> Calc(select=[jobid, reportno, reportdate, depthprogress, numperstype1,
> numperstype2, numperstype3, numperstype4, numperstype5, numperstype6,
> currentops, futureops, dayssincelast, bigint(reportno) AS $f109])\n: 
> +- HashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND ($f94 =
> reportno0))], select=[jobid, reportno, reportdate, depthprogress,
> numperstype1, numperstype2, numperstype3, numperstype4, numperstype5,
> numperstype6, currentops, futureops, $f94, jobid0, reportno0,
> dayssincelast], build=[right])\n:     :- Calc(select=[jobid,
> reportno, reportdate, depthprogress, numperstype1, numperstype2,
> numperstype3, numperstype4, numperstype5, numperstype6, currentops,
> futureops, bigint(reportno) AS $f94])\n:     :  +- [#3]
> Exchange(distribution=[hash[jobid]])\n:     +- [#2]
> Exchange(distribution=[hash[jobid]])\n+- [#1]
> Exchange(distribution=[hash[jobid]])\n]) -> Calc(select=[jobid AS $f0,
> reportno AS $f1, dayssincelast AS $f2, depthprogress AS $f3, currentops AS
> $f4, futureops AS $f5, reportdate AS $f6, numperstype1 AS $f7, numperstype2
> AS $f8, numperstype3 AS $f9, numperstype4 AS $f10, numperstype5 AS $f11,
> numperstype6 AS $f12, boptestdate AS $f13, bopfunctiontestdate AS $f14])
> -> HashAggregate(isMerge=[false], groupBy=[$f0, $f1, $f2, $f3, $f4, $f5,
> $f6, $f7, $f8, $f9, $f10, $f11, $f12], select=[$f0, $f1, $f2, $f3, $f4,
> $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12, MAX($f13) AS lasboptestdate,
> MAX($f14) AS lasbopfunctiontestdate]) -> Calc(select=[$f0 AS jobid, $f1
> AS reportno, string($f6) AS reportdate, bigint((nvl($f7, 0) + nvl($f8,
> 0)) + nvl($f9, 0)) + nvl($f10, 0)) + nvl($f11, 0)) + nvl($f12, 0))) AS
> pobcnt, $f2 AS dayssincelast, $f3 AS depthprogress, $f4 AS currentops, $f5
> AS futureops, lasboptestdate, lasbopfunctiontestdate]) -> Map ->
> Sink: Unnamed (1/1) (3c555cbd6bf411a6111cf7eaab527d33) switched from
> CREATED to CANCELING.
> 2022-01-23 04:31:39,570 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph     
>  [] - MultipleInput(readOrder=[0,0,1],
> members=[\nHashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND
> ($f109 = reportno0))], select=[jobid, reportno, reportdate, depthprogress,
> numperstype1, numperstype2, numperstype3, numperstype4, numperstype5,
> numperstype6, currentops, futureops, dayssincelast, $f109, jobid0,
> reportno0, boptestdate, bopfunctiontestdate], build=[right])\n:-
> Calc(select=[jobid, reportno, reportdate, depthprogress, numperstype1,
> numperstype2, numperstype3, numperstype4, numperstype5, numperstype6,
> currentops, futureops, dayssincelast, bigint(reportno) AS $f109])\n: 
> +- HashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND ($f94 =
> reportno0))], select=[jobid, reportno, reportdate, depthprogress,
> numperstype1, numperstype2, numperstype3, numperstype4, numperstype5,
> numperstype6, currentops, futureops, $f94, jobid0, reportno0,
> dayssincelast], build=[right])\n:     :- Calc(select=[jobid,
> reportno, reportdate, depthprogress, numperstype1, numperstype2,
> numperstype3, numperstype4, numperstype5, numperstype6, currentops,
> futureops, bigint(reportno) AS $f94])\n:     :  +- [#3]
> Exchange(distribution=[hash[jobid]])\n:     +- [#2]
> Exchange(distribution=[hash[jobid]])\n+- [#1]
> Exchange(distribution=[hash[jobid]])\n]) -> Calc(select=[jobid AS $f0,
> reportno AS $f1, dayssincelast AS $f2, depthprogress AS $f3, currentops AS
> $f4, futureops AS $f5, reportdate AS $f6, numperstype1 AS $f7, numperstype2
> AS $f8, numperstype3 AS $f9, numperstype4 AS $f10, numperstype5 AS $f11,
> numperstype6 AS $f12, boptestdate AS $f13, bopfunctiontestdate AS $f14])
> -> HashAggregate(isMerge=[false], groupBy=[$f0, $f1, $f2, $f3, $f4, $f5,
> $f6, $f7, $f8, $f9, $f10, $f11, $f12], select=[$f0, $f1, $f2, $f3, $f4,
> $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12, MAX($f13) AS lasboptestdate,
> 

Re: flink任务提交到集群执行一段时间报错Java heap space

2022-01-20 文章 Caizhi Weng
Hi!

5s 的窗口拼接 sql 语句看起来比较可疑,具体是怎么实现的?另外可以把 task manager 的 heap dump
出来看一下哪里占比较多的堆内存。

Liu Join  于2022年1月20日周四 13:28写道:

> 环境:
>
> flink1.13.5,Standalone模式集群,jobmanager内存2GB,taskmanager内存4GB,集群包括一个jobmanager和两个taskmanager,每个taskmanager有2个slot。
>
>
> 任务内容是读取2万张表的数据,数据每1分钟一条,每10分钟输出每张表的最后一条数据。代码中使用了map、filter、watermark、开了一个10分钟的滑动窗口,使用reduce获得最后一条数据,因为sink是mysql,配置不高,所以将最后一条数据拼成批量插入语句才往MySQL写入。开了一个5s的窗口用于拼接sql语句。
>
> 报错内容:
> java.lang.OutOfMemoryError: Java heap space
>
> 报错表象:
>
> 整个taskmanager内存被占满,任务失败重启后taskmanager内存仍然是满的,导致任务再次失败。之后任务直接挂了。时间长了之后内存没释放,Taskmanager进程也会挂了。
> 从 Windows 版邮件发送
>
>


Re: 如何确定分配内存的大小

2022-01-10 文章 Caizhi Weng
Hi!

这个和你的作业特性强相关。目前 task manager 是按比例给各个算子分配 managed
memory,所以只要内存不太小作业都能勉强跑,如果遇到内存问题再调大对应部分即可。

https://zhuanlan.zhihu.com/p/340345588 可以作为参考。

许友昌 <18243083...@163.com> 于2022年1月10日周一 15:18写道:

> 请问在启动flink 任务时,要如何确定该分配多少内存给 jobmanager,分配多少给 taskmanager,当我们指定 -ytm 1024
> 或 -ytm 2048 的依据是什么?


Re: 批模式疑问

2022-01-10 文章 Caizhi Weng
Hi!

env.setRuntimeMode(RuntimeExecutionMode.BATCH); 要放在创建 table environment
之前,否则创建出来的 table environment 还是 streaming 模式。另外这个需要 Flink >= 1.14。



陈卓宇 <2572805...@qq.com.invalid> 于2022年1月10日周一 15:03写道:

> 代码逻辑简单描述:  我通过fromElements的方式简单构造了几条测试数据,然后将流转表,在表上
> 使用我自定义的聚合函数,进行聚合操作,最后打印
> 我的代码:
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);  //设置调度模式为批
>
> DataStreamSource env.fromElements(Tuple2.of("aa", 1),
> Tuple2.of("aa", 2),Tuple2.of("aa", 3),Tuple2.of("bb",
> 2),Tuple2.of("bb", 3),Tuple2.of("bb", 4));
> Table table = tenv.fromDataStream(source,
> Schema.newBuilder()
> .column("f0", "STRING")
> .column("f1", "INTEGER")
> .build());
> tenv.createTemporaryView("test",table);
> //对表进行sql查询
> tenv.createTemporarySystemFunction("Average", avg5.Average.class);
> tenv.executeSql("SELECT f0,Average(f1) as rbm FROM test group by
> f0").print();
>
> 结果:
> ++++
> | op | f0 |rbm |
> ++++
> | +I | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | -U | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | +U | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | -U | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | +U | aa | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | +I | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | -U | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | +U | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | -U | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> | +U | bb | [0, 0, 0, 0, 1, 0, 0, 0, 0,... |
> ++++
> 我发现结果是一个可撤回流,这与我所预想批处理是不一致的
>
> 请问:我设置的batch,为什么调度模式还没有改变,我该如何解决这个问题,让他成为批处理?
>
> 最后:感谢之前几个问题上,社区同学的无私解答,因为自身邮箱出了一些问题,就不一一致谢了,望见谅。
>
> 陈卓宇
>
>
>  


Re: flink sql 如何提高下游并发度?

2022-01-10 文章 Caizhi Weng
Hi!

可以设置 parallelism.default 为需要的并发数。

Jeff  于2022年1月9日周日 19:44写道:

> 当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?


Re: flink mysql cdc 同步数据报错

2022-01-05 文章 Caizhi Weng
Hi!

根本原因是 Caused by: java.io.StreamCorruptedException: unexpected block
data,也就是说集群上这个 class 的版本和客户端这个 class 的版本不一致。建议检查集群和客户端的 flink 版本以及 cdc
connector 版本是否一致。

Fei Han  于2022年1月5日周三 19:12写道:

> @all:
>   大家好,在Flink mysql cdc中同步数据时,对接mysql无法同步
> 版本:
> flink1.13.3
> flink mysql cdc 2.1.1
>
> 如下报错是什么原因造成的,请大佬们看下
>
>  ERROR org.apache.flink.runtime.rest.RestClient 491 parseResponse -
> Received response was neither of the expected type ([simple type, class
> org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody])
> nor an error.
> Response=JsonResponse{json={"status":{"id":"COMPLETED"},"job-execution-result":{"id":"61f45c66580d385727fd97520b66a79a","application-status":"FAILED","accumulator-results":{},"net-runtime":55,"failure-cause":{"class":"org.apache.flink.runtime.client.JobInitializationException","stack-trace":"org.apache.flink.runtime.client.JobInitializationException:
> Could not start the JobMaster.\n\tat
> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)\n\tat
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)\n\tat
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)\n\tat
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)\n\tat
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)\n\tat
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
> java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)\n\tat
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)\n\tat
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
> java.lang.Thread.run(Thread.java:748)\nCaused by:
> java.util.concurrent.CompletionException: java.lang.RuntimeException:
> org.apache.flink.runtime.JobException: Cannot instantiate the coordinator
> for operator Source: TableSourceScan(table=[[qhc_ods_catalog, qhc_hms,
> order_consume_buy_order_src]], fields=[id, consume_order_id,
> consume_order_code, sku_id, sku_name, is_card_goods, buy_order_id,
> buy_order_code, buy_order_type, consume_order_time, gmt_create,
> gmt_modified, database_name, op_ts, table_name]) -> Calc(select=[id,
> consume_order_id, consume_order_code, sku_id, sku_name, is_card_goods,
> buy_order_id, buy_order_code, buy_order_type, consume_order_time,
> gmt_create, gmt_modified]) -> NotNullEnforcer(fields=[id,
> consume_order_id])\n\tat
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)\n\tat
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)\n\tat
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)\n\t...
> 7 more\nCaused by: java.lang.RuntimeException:
> org.apache.flink.runtime.JobException: Cannot instantiate the coordinator
> for operator Source: TableSourceScan(table=[[qhc_ods_catalog, qhc_hms,
> order_consume_buy_order_src]], fields=[id, consume_order_id,
> consume_order_code, sku_id, sku_name, is_card_goods, buy_order_id,
> buy_order_code, buy_order_type, consume_order_time, gmt_create,
> gmt_modified, database_name, op_ts, table_name]) -> Calc(select=[id,
> consume_order_id, consume_order_code, sku_id, sku_name, is_card_goods,
> buy_order_id, buy_order_code, buy_order_type, consume_order_time,
> gmt_create, gmt_modified]) -> NotNullEnforcer(fields=[id,
> consume_order_id])\n\tat
> org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)\n\tat
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)\n\tat
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)\n\t...
> 7 more\nCaused by: org.apache.flink.runtime.JobException: Cannot
> instantiate the coordinator for operator Source:
> TableSourceScan(table=[[qhc_ods_catalog, qhc_hms,
> order_consume_buy_order_src]], fields=[id, consume_order_id,
> consume_order_code, sku_id, sku_name, is_card_goods, buy_order_id,
> buy_order_code, buy_order_type, consume_order_time, gmt_create,
> gmt_modified, database_name, op_ts, table_name]) -> Calc(select=[id,
> consume_order_id, consume_order_code, sku_id, sku_name, is_card_goods,
> buy_order_id, buy_order_code, buy_order_type, consume_order_time,
> gmt_create, gmt_modified]) -> NotNullEnforcer(fields=[id,
> consume_order_id])\n\tat
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:217)\n\tat
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:792)\n\tat
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultE

Re: Re: Re:咨询个Flink SQL的问题,如何去除null的字段

2021-12-30 文章 Caizhi Weng
Hi!

感谢说明。目前 elastic search sink 确实没有这样的功能。一种方式是如其他回复所说对 SQL
进行判断,不过我觉得更好的方式是写一个自定义的 es sink。es sink 的逻辑详见 ElasticsearchDynamicSink
类,可能只需要实现一个自定义的 RowElasticsearchEmitter 就可以了。

RS  于2021年12月31日周五 10:29写道:

> Hi,
> 你好,是这样的,从kafka消费的话,如果表定义了 a,b,c三个字段,如果kafka的数据少了一个a,那么在flink sql里面,读出来的就是
> a=null,写入ES的话,就会有个a=null
>
>
> 比如从ES查询数据的话
> 期望 没有a的时候,查询结果类似 {b=1,c=2}
> 如果写了a=null进去,查询结果类似 {a=null,b=1,c=2}
> 这样结果就和期望的不一样了,所以期望是Flink SQL insert的时候 ,不写数值为null字段
>
> 在 2021-12-31 10:15:41,"Caizhi Weng"  写道:
> >Hi!
> >
> >我不太熟悉 es,如果某一个字段不写的话,是会写入一个默认值吗?如果是的话,可以使用 coalesce 函数。coalesce(a, b, c,
> >...) 会返回第一个非 null 的值,因此只要把默认值放在最后一个,如果前面都是 null 就会写默认值。
> >
> >RS  于2021年12月30日周四 17:06写道:
> >
> >> 有10~20个字段,这样一个个写,手都敲断了,还有其他的方式吗?或者如何开发代码适配到SQL?
> >>
> >>
> >>
> >>
> >>
> >> 在 2021-12-30 11:36:21,"Xuyang"  写道:
> >> >可以使用case when试一下
> >> >在 2021-12-29 16:40:39,"RS"  写道:
> >> >>Hi,
> >> >>使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢?
> >> >>
> >> >>
> >> >>比如:源数据有3个字段,a,b,c
> >> >>insert into table2
> >> >>select
> >> >>a,b,c
> >> >>from table1
> >> >>当b=null的时候,只希望写入a和c
> >> >>当c=null的时候,只希望写入a和b
> >> >>
> >>
>


Re: Re:咨询个Flink SQL的问题,如何去除null的字段

2021-12-30 文章 Caizhi Weng
Hi!

我不太熟悉 es,如果某一个字段不写的话,是会写入一个默认值吗?如果是的话,可以使用 coalesce 函数。coalesce(a, b, c,
...) 会返回第一个非 null 的值,因此只要把默认值放在最后一个,如果前面都是 null 就会写默认值。

RS  于2021年12月30日周四 17:06写道:

> 有10~20个字段,这样一个个写,手都敲断了,还有其他的方式吗?或者如何开发代码适配到SQL?
>
>
>
>
>
> 在 2021-12-30 11:36:21,"Xuyang"  写道:
> >可以使用case when试一下
> >在 2021-12-29 16:40:39,"RS"  写道:
> >>Hi,
> >>使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢?
> >>
> >>
> >>比如:源数据有3个字段,a,b,c
> >>insert into table2
> >>select
> >>a,b,c
> >>from table1
> >>当b=null的时候,只希望写入a和c
> >>当c=null的时候,只希望写入a和b
> >>
>


Re: flink 无法checkpoint问题

2021-12-29 文章 Caizhi Weng
Hi!

图片无法显示,建议使用外部图床上传。

checkpoint 慢的原因可能有很多,最可能的原因是由于算子处理数据太慢导致反压(可以通过 Flink web UI 每个节点的 busy
百分比大致看出来)。建议检查资源是否充足,数据是否倾斜,gc 是否过于频繁等。

紫月幽魔灵  于2021年12月28日周二 10:38写道:

> 版本:flink版本1.14.0
> 问题: 使用flink 1.14.0版本提交到jdk1.7版本的yarn集群上checkpoint无法生成,一直处于IN_PROGRESS状态
> 提交命令如下:
> ./bin/flinksql-submit.sh \
> --sql sqlserver-cdc-to-kafka.sql \
> -m yarn-cluster \
> -ynm sqlserverTOkafka \
> -ys 2 \
> -yjm 1024 \
> -ytm 1024 \
> -yid application_1640657115196_0001 \
> -yD yarn.taskmanager.env.JAVA_HOME=/usr/java/jdk1.8.0_25 \
> -yD containerized.master.env.JAVA_HOME=/usr/java/jdk1.8.0_25 \
> -yD containerized.taskmanager.env.JAVA_HOME=/usr/java/jdk1.8.0_25 \
> -yD log4j2.formatMsgNoLookups=true
> 这是什么原因造成的呢?
>


Re: 关于flink 1.13 TableEnvironment 和StreamTableEnvironment

2021-12-29 文章 Caizhi Weng
Hi!

Flink1.13中TableEnvironment是否支持UDF和UDTF呢?
>

支持。可以通过 tEnv.executeSql("create temporary function myUdf as
'com.my.package.name.MyUdfClass'") 来注册。

StreamTableEnvironment
> 目前只支持streaming模式?支持UDF和UDTF?能否在StreamTableEnvironment中以流的方式写批数据,批数据跑完进程就不存在了吧?
>

StreamTableEnvironment 目前只支持 streaming,同样支持 udf 和 udtf。可以在里面写批数据(称为 bounded
stream)。作业结束后进程是否存在和执行模式有关。如果是 session 模式,那么作业跑完了,job manager 和 task
manager 的进程都还在;如果是 per job 模式,那么进程都会结束。

现在不是特别清楚真正的流批一体化体现在那个入口TableEnvironment 还是StreamTableEnvironment?
>

从 SQL 的角度来看,流批一体化体现在同一套 SQL 语句既可以跑流作业,也可以跑批作业,用户无需为了流批作业写两套 SQL,和具体哪个
environment 没关系。当然用流作业跑 bounded stream 也是流批一体的体现之一,不过如果已知是 bounded
stream,跑批作业可以获得更高的效率。

Flink SQL的分区表是建立在TableEnvironment 还是StreamTableEnvironment?


Flink 目前不自己存储数据,而是用于计算外部存储系统的数据。因此分区表是建立在外部存储系统里的(需要对应外部系统支持,例如 hive),与
Flink 中的哪个 environment 无关。


Fei Han  于2021年12月28日周二 19:13写道:

>
> @all
> 大家好:
> 关于Flink1.13中,TableEnvironment 和StreamTableEnvironment有一些疑惑:
> 1.TableEnvironment支持streaming和batch
> 模式,Flink1.13中TableEnvironment是否支持UDF和UDTF呢?
> 2.StreamTableEnvironment
> 目前只支持streaming模式?支持UDF和UDTF?能否在StreamTableEnvironment中以流的方式写批数据,批数据跑完进程就不存在了吧?
> 3.现在不是特别清楚真正的流批一体化体现在那个入口TableEnvironment 还是StreamTableEnvironment?
> 4.Flink SQL的分区表是建立在TableEnvironment 还是StreamTableEnvironment?


Re: Flink集成Hive问题

2021-12-29 文章 Caizhi Weng
Hi!

看起来你本机的 8081 端口已经有别的程序占用了。能否在浏览器访问 localhost:8081,确认一下打开的是 Flink web UI 吗?

如果跑的是 Flink standalone cluster,需要先启动 standalone cluster。进入 flink 目录,运行
bin/start-cluster.sh 即可启动。

wangbi...@longi.com  于2021年12月29日周三 09:48写道:

> Hi,Flink您好
>我在集成Hive时,通过sql-client连接Hive成功,但在查询hive表时,总是提示以下错误,请问我该怎么办?
>
>  我的环境是CDH6.3.1,Flink1.13.2和1.14.2均尝试,按照官方指导一步步配置,均报相同错误。您的指导对我至关重要,期待您的回复,谢谢
> 2021-12-28 20:33:42,999 WARN
> org.apache.flink.client.program.rest.RestClusterClient   [] - Attempt
> to submit job 'collect' (3ef21e6c1235316899b83d491a9c363a) to '
> http://localhost:8081' has failed.
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.rest.util.RestClientException: Response was not
> valid JSON, but plain-text: Login error. Need username and password
> at
> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
> ~[?:1.8.0_221]
> at
> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
> ~[?:1.8.0_221]
> at
> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
> ~[?:1.8.0_221]
> at
> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
> ~[?:1.8.0_221]
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> [?:1.8.0_221]
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> [?:1.8.0_221]
> at
> org.apache.flink.runtime.rest.RestClient$ClientHandler.readRawResponse(RestClient.java:680)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.runtime.rest.RestClient$ClientHandler.channelRead0(RestClient.java:613)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:383)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec$Decoder.channelInactive(HttpClientCodec.java:311)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:221)
> [flink-dist_2.11-1.14.2.jar:1.14.2]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandler

Re: flink 1.15 编译 dump files (if any exist) [date].dump, [date]-jvmRun[N].dump and [date].dumpstream.

2021-12-29 文章 Caizhi Weng
Hi!

这是说单元测试有失败的 case,可以往上翻一翻找到具体是哪个 case 报给社区。

不过如果单纯只是为了编译,可以在 mvn 命令后添加 -DskipTests
选项来跳过测试。所有模块的测试总时长很长的(小时级别),如果只是编译的话只要十几分钟就编译好了。

Michael Ran  于2021年12月29日周三 14:16写道:

> dear all :
> 有朋友遇到过编译flink 1.15 master  出现这个异常吗?
>
>
>
>
> [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test (default-test)
> on project flink-runtime: There are test failures.
> [ERROR]
> [ERROR] Please refer to
> /Users/qqr/work/git/fork/flink/flink-runtime/target/surefire-reports for
> the individual test results.
> [ERROR] Please refer to dump files (if any exist) [date].dump,
> [date]-jvmRun[N].dump and [date].dumpstream.
> [ERROR] -> [Help 1]
> [ERROR]
> [ERROR] To see the full stack trace of the errors, re-run Maven with the
> -e switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions,
> please read the following articles:
> [ERROR] [Help 1]
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> [ERROR]
> [ERROR] After correcting the problems, you can resume the build with the
> command
> [ERROR]   mvn  -rf :flink-runtime
>
>


Re: flinksql钩子函数

2021-12-26 文章 Caizhi Weng
Hi!

ExecutionEnvironment 与 StreamExecutionEnvironment 均有 registerJobListener 方法
[1][2],可以传进一个 JobListener
[3],在作业提交以及完成的时候调用对应方法。当然,这需要你提交作业的客户端程序一直存在,直到作业完成并且对应函数被调用。

[1]
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/java/ExecutionEnvironment.html#registerJobListener-org.apache.flink.core.execution.JobListener-
[2]
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#registerJobListener-org.apache.flink.core.execution.JobListener-
[3]
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html

陈卓宇 <2572805...@qq.com.invalid> 于2021年12月25日周六 23:30写道:

> 您好社区:
>     场景是这样的:
>
> 我司要求将标签数据每日同步一份到mongodb供业务开发同学进行使用,面临这样一个问题,我是不能先删表在建表的,这样会导致接口查询mongodb出现数据查询不到的风险。而是使用切表,将同步表设为:表名_时间戳,将历史表删除,在将同步表改为正确表名,需要flinksql执行完成后触发钩子函数完成切表操作。请问flink有这方面的支持么
>
>
>
> 陈卓
>
>
>  


Re: Flink SQL DECIMAL精度问题

2021-12-23 文章 Caizhi Weng
Hi!

当时应该是参考了其他系统(比如 sql server)的精度范围,SQL 标准里好像确实没有提到。

目前如果需要更高精度,可以考虑先用 string 存储,并通过 udf 把 string 转成 java big decimal,运算完之后再变回
string。

Michael Ran  于2021年12月24日周五 10:59写道:

> clickhouse  还提供 Decimal64 Decimal128 ,我也想知道38 这个是什么数据库的标准吗?
> 在 2021-12-23 19:58:24,"Ada Wong"  写道:
> >最大精度为38,这个是有什么说法吗,为什么不是1000。如果我需要更高精度的DECIMAL我改怎么做?例如我需要DECIMAL(50, 18)
>


Re: sql-client提交新任务会覆盖前面的任务

2021-12-22 文章 Caizhi Weng
Hi!

图片无法在邮件中显示。

你说的“显示的作业是一样的”指的是作业名吗?如果要修改作业名,可以设置 pipeline.name 为你需要的作业名。

Fei Han  于2021年12月22日周三 18:34写道:

>
> 大家好,在使用sql-client的过程中,提交一个新的任务会覆盖前面的任务。
> 模式是perjob,flink1.13.3 ,为什么在flinkwebui上面显示的作业是一样的?
>
>
>
>
>


Re: flink固定延迟重启策略没有延迟

2021-12-21 文章 Caizhi Weng
Hi!

log 里的这些信息是同一个 job 里不同的并发分别 fail(可以从 2/3 和 3/3 这两个不同的并发号看出来),并不是说这个 job
fail 了两次。

宋品如  于2021年12月22日周三 10:14写道:

> 发件人: Song PinRu
> 发送时间: 2021年12月21日 15:19
> 收件人: user-zh@flink.apache.org
> 主题: flink固定延迟重启策略没有延迟
>
>  Hi:
> 昨天的邮件截图看不了,把日志贴上来重新发送一份
> --
>
> 查看日志发现固定延迟重启策略似乎没有生效,我设置的是30s延迟重启2次,
>
> 但是日志显示的是在06:26:50这1秒内重启了2次都失败了,并最终导致任务失败,
>
> 我设置的延迟时间似乎完全没有生效,Flink版本是1.12.2。
>
> 有没有人能告诉我这是为什么?
>
>
>
>  设置重启策略的代码:
> ```
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val backend = new
> FsStateBackend(CommonConfig.FLINK_STATEBACKEND_CHECKPOINT)
> env.setStateBackend(backend)
> // 每 3ms 开始一次 checkpoint
> env.enableCheckpointing(3)
> // 设置模式为精确一次 (这是默认值)
>
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
> // 确认 checkpoints 之间的时间会进行 500 ms
> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1)
> // Checkpoint 必须在2分钟内完成,否则就会被抛弃
> env.getCheckpointConfig.setCheckpointTimeout(12)
> // 可容忍checkpoint失败次数
> env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)
> // 同一时间只允许一个 checkpoint 进行
> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
> //设置全局并行度
> //  env.setParallelism(3)
> //重启策略
> //PS:默认策略会重启int最大值次,导致任务一直处于重启状态,checkpoint出现连续空文件夹,同时导致有效checkpoint无法使用
>
>
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2,
> Time.seconds(30)))
> ```
>
>
>
>
>
>
> 日志:
> ```
>
>  2021-12-21 06:26:50,850 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
> source -> note -> Sink: sink (2/3) (85ad9ee0f52f04c5709430a8c793817a)
> switched from RUNNING to FAILED on
> container_e1595_1638345947522_0010_01_03 @ pbj-cdh-20-72.optaim.com
> (dataPort=35530).
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
> send data to Kafka: This server is not the leader for that topic-partition.
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
> ~[dws_module-1.0.4.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:850)
> ~[dws_module-1.0.4.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
> ~[dws_module-1.0.4.jar:?]
>
> at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
> ~[dws_module-1.0.4.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
> ~[dws_module-1.0.4.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(Ka

Re: flink固定延迟重启策略没有延迟

2021-12-21 文章 Caizhi Weng
Hi!

截图无法在邮件中显示,可以使用外部图床上传,或直接把 log 的内容贴在邮件里。

Song PinRu  于2021年12月21日周二 15:54写道:

> 查看日志发现固定延迟重启策略似乎没有生效,我设置的是30s延迟重启2次,
>
> 但是日志显示的是在06:26:50这1秒内重启了2次都失败了,并最终导致任务失败,
>
> 我设置的延迟时间似乎完全没有生效,Flink版本是1.12.2。
>
> 有没有人能告诉我这是为什么?
>
>
>
> 日志的截图:
>
>
>
> [image: cid:image001.png@01D7F67C.D00DC560]
>
>
>
> [image: cid:image002.png@01D7F67C.D00DC560]
>
>
>
> 设置重启策略的代码:
>
> val env = StreamExecutionEnvironment.
> *getExecutionEnvironment *val backend = new FsStateBackend(CommonConfig.
> *FLINK_STATEBACKEND_CHECKPOINT*)
> env.setStateBackend(backend)
> // 每 3ms 开始一次 checkpoint
> env.enableCheckpointing(3)
> // 设置模式为精确一次 (这是默认值)
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.
> *EXACTLY_ONCE*)
> // 确认 checkpoints 之间的时间会进行 500 ms
> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1)
> // Checkpoint 必须在2分钟内完成,否则就会被抛弃
> env.getCheckpointConfig.setCheckpointTimeout(12)
> // 可容忍checkpoint失败次数
> env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)
> // 同一时间只允许一个 checkpoint 进行
> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
> //设置全局并行度
> //  env.setParallelism(3)
> //重启策略
> //PS:默认策略会重启int最大值次,导致任务一直处于重启状态,checkpoint出现连续空文件夹,同时导致有效checkpoint无法使用
> env.setRestartStrategy(RestartStrategies.*fixedDelayRestart*(2, Time.
> *seconds*(30)))
>
>
>


Re: flinksql自定义udaf函数

2021-12-20 文章 Caizhi Weng
Hi!

这个自定义类型是作为 accumulator 还是被聚合的值?如果是 accumulator 则不应该报错,能否分享一下 udaf
的代码?如果是作为被聚合的值,目前自定义类型只支持 pojo,对 pojo 的要求详见 [1]。

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos

陈卓宇 <2572805...@qq.com.invalid> 于2021年12月20日周一 17:00写道:

> 在自定义udaf函数实现中使用了一些flinksql不支持的数据类型
> 想请问如何进行自定义数据类型的实现
>
>
>
>
>
>
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> SQL validation failed. An error occurred in the type inference logic of
> function 'Average'.
>     at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:164)
>     at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
>     at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:215)
>     at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>     at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
>     at com.analysys.avg2.main(avg2.java:70)
> Caused by: org.apache.flink.table.api.ValidationException: An error
> occurred in the type inference logic of function 'Average'.
>     at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
>     at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
>     at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
>     at java.util.Optional.flatMap(Optional.java:241)
>     at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
>     at
> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
>     at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1183)
>     at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1200)
>     at
> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1169)
>     at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:945)
>     at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
>     at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:159)
>     ... 5 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not
> extract a valid type inference for function class
> 'com.analysys.avg2$Average'. Please check for implementation mistakes
> and/or provide a corresponding hint.
>     at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
>     at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
>     at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98)
>     at
> org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212)
>     at
> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
>     ... 16 more
> Caused by: org.apache.flink.table.api.ValidationException: Error in
> extracting a signature to accumulator mapping.
>     at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
>     at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:135)
>     at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:168)
>     at
> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
>     ... 19 more
> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> extract a type inference from method:
> public void
> com.analysys.avg2$Average.accumulate(com.analysys.avg2$SumCount,java.lang.Integer)
>     at
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
>     at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
>     at
> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:124)
>     ... 21 more
> Caused by: org.apache.flink.table.api.ValidationExce

Re: 订阅开发邮箱失败

2021-12-20 文章 Caizhi Weng
Hi!

邮件问题属于 apache infrastructure [1] 的问题,在他们的官网上有联系方式 [2],可以按这个联系方式把问题发过去看看。

[1] https://infra.apache.org/
[2] https://infra.apache.org/contact.html

陈卓宇 <2572805...@qq.com.invalid> 于2021年12月20日周一 21:17写道:

> 社区您好:
>    近日看到FLIP有个新的话题我非常感兴趣,一顿操作,发现需要订阅
> d...@flink.apache.org邮箱才可以进行讨论
> 我就按照定义中文邮件列表的方式去订阅d...@flink.apache.org,发现失败,失败返回内容为:
> Hi. This is the qmail-send program at apache.org.
> I'm afraid I wasn't able to deliver your message to the following
> addresses.
> This is a permanent error; I've given up. Sorry it didn't work out.
>
>  ezmlm-reject:  fatal: Sorry, I don't accept commands in the subject line.
> Please send a  message to the -help address shown in the the
> ``Mailing-List:'' header  for command info (#5.7.0)
>
> --- Below this line is a copy of the message.
>
> Return-Path: <2572805...@qq.com>
> Received: (qmail 13677 invoked by uid 99); 20 Dec 2021 12:10:55 -
> Received: from spamproc1-he-fi.apache.org (HELO spamproc1-he-fi.apache.org)
> (95.217.134.168)
>     by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Dec
> 2021 12:10:55 +
> Received: from localhost (localhost [127.0.0.1])
> by spamproc1-he-fi.apache.org (ASF Mail Server at
> spamproc1-he-fi.apache.org) with ESMTP id 77A79C0518
> for  (UTC)
> X-Virus-Scanned: Debian amavisd-new at spamproc1-he-fi.apache.org
> X-Spam-Flag: NO
> X-Spam-Score: 3.856
> X-Spam-Level: ***
> X-Spam-Status: No, score=3.856 tagged_above=-999 required=6.31
> tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1,
> DKIM_VALID_EF=-0.1, FREEMAIL_ENVFROM_END_DIGIT=0.25,
> HELO_DYNAMIC_IPADDR=3.243, HTML_MESSAGE=0.2, RDNS_DYNAMIC=0.363,
> SPF_PASS=-0.001, URIBL_BLOCKED=0.001] autolearn=disabled
> Authentication-Results: spamproc1-he-fi.apache.org (amavisd-new);
> dkim=pass (1024-bit key) header.d=qq.com
> Received: from mx1-ec2-va.apache.org ([116.203.227.195])
> by localhost (spamproc1-he-fi.apache.org [95.217.134.168])
> (amavisd-new, port 10024)
> with ESMTP id 2FpNoK3cpYDH for  Mon, 20 Dec 2021 12:10:52 + (UTC)
> Received-SPF: Pass (mailfrom) identity=mailfrom;
> client-ip=203.205.221.209; helo=out203-205-221-209.mail.qq.com;
> envelope-from=2572805...@qq.com; receiver= Received: from out203-205-221-209.mail.qq.com (
> out203-205-221-209.mail.qq.com [203.205.221.209])
> by mx1-ec2-va.apache.org (ASF Mail Server at mx1-ec2-va.apache.org)
> with ESMTPS id 98462BDA2D
> for  (UTC)
> DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=qq.com; s=s201512;
> t=1640002248; bh=QBkwoKj03aEYKQoYIoJ9K1M15KLUtems616segryJeE=;
> h=From:To:Subject:Date;
> b=kQrg7Js/UjAKoTjoskowh3G5krnJECzxQNDYxDrua/PibvQCvG3tI6f5SZjtjLYFC
>  Y09pBwSo1vCqqJdJjWaFy89fMWVSaDT7viQeiT9NW6u2FkT9j5TwhMu01tw4a+plKT
>  P+1BSHenF0dck3xJU/MVZ2ZcSjPGFNRW636C6Lj4=
> X-QQ-FEAT: 8OvYDnhju3M7KJJmBoxMrwqIdLziThcp
> X-QQ-SSF: 005000L
> X-QQ-XMAILINFO:
> N7pVCJF/rxxTYiUbEFGOtOs+MJ0LcbaBKQeUJ2M0qO/pRn4Rh7ukdchpDNa7Et
>
>  1V+jWJwa4x4RORxUiPgxjLAFa+bgyrme4CaqYpzn1dkhpgnUvnEar2DSN8vWSR+21Xl781JuumW6u
>
>  QEO0k5gjHHQ4YaqiddaOSJBwIplDpns3yY1mKyqOXPaqQRyLuP0PFCC63eU8K7G/ZRN3FQxxZwjy8
>
>  GCIcmbuztrSvJ/S2PP5b4W/idam6gFQqQvd5afA54zg3RLban94vMSzyWhrt8b4tZhzUwza+aie7+
>
>  EKMqj+aQJCFKMKBTdXJbQFJvO7caV5Kolc9U18JH4YSoIDRwXxmL3JwQDnXauyV9uhhs2hJzVCHVw
>
>  /DeJfzSKUURZiKfCs689LicPafZXXXoWBzgZiOdAFr9zrYcPrRTgj6SiZ/bifzC1O/VwhbnbEYZkT
>
>  7WttGFBpGCSf8XQNKXa2Mo11V6aAIxdq+iq2oxv3RglkPgWMfM4gTr4tZGYvlubNGehnSIUA5p5XA
>
>  v8OdtqlRAreFXpTDHsaoBTpLRL3bg5rL6iA7D6Iz1La/kS9nxTJ8kAp6oM8GFmyR9gC2tW/mGxd1t
>
>  2ArvYqFAVi7Q7xi2V9HZiZXs0Te+G4Kes6D4wbB3A00MTK7Vmu6Y/vB6UFJmW64U8fljSjelIPMqM
>
>  TGc4JQplhh7TpvPtUPxeHIIiMlxlSo4v5VstK3G+Y0CK/Pn25DFI9DIjN+5uQAwLB752MObXDbxoo
>  TuSleU3snizikdxRpcDXnmf+qeBhGig68Ji5eDYWoqUV5C6bLJEn4bIsx
> X-HAS-ATTACH: no
> X-QQ-BUSINESS-ORIGIN: 2
> X-Originating-IP: 221.223.219.19
> X-QQ-STYLE:
> X-QQ-mid: webmail205t1640002247t2450875
> From: "=?gb18030?B?s8LXv9Pu?=" <2572805...@qq.com>
> To: "=?gb18030?B?ZGV2?="  Subject: Subscribe
> Mime-Version: 1.0
> Content-Type: multipart/alternative;
> boundary="=_NextPart_61C072C7_1106E1C8_673614BB"
> Content-Transfer-Encoding: 8Bit
> Date: Mon, 20 Dec 2021 20:10:47 +0800
> X-Priority: 3
> Message-ID:  X-QQ-MIME: TCMime 1.0 by Tencent
> X-Mailer: QQMail 2.x
> X-QQ-Mailer: QQMail 2.x
>
> This is a multi-part message in MIME format.
>
> --=_NextPart_61C072C7_1106E1C8_673614BB
> Content-Type: text/plain;
> charset="gb18030"
> Content-Transfer-Encoding: base64
>
> U3Vic2NyaWJlDQoNCg0Ks8LXv9PuDQoNCg0KJm5ic3A7
>
> --=_NextPart_61C072C7_1106E1C8_673614BB
> Content-Type: text/html;
> charset="gb18030"
> Content-Transfer-Encoding: base64
>
> U3Vic2NyaWJlPGJyPjxicj48ZGl2PjxociBzdHlsZT0ibWFyZ2luOiAwIDAgMTBweCAwO2Jv
> cmRlcjogMDtib3JkZXItYm90dG9tOjFweCBzb2xpZCAjRTRFNUU2O2hlaWdodDowO2xpbmUt
> aGVpZ2h0OjA7Zm9udC1zaXplOjA7cGFkZGluZzogMjBweCAwI

Re: RowTime 空值过滤问题

2021-12-17 文章 Caizhi Weng
Hi!

你可以定义一个计算列,DDL 就写成:

CREATE TABLE myTable (
  ...,
  server_end_time TIMESTAMP(3),
  server_end_time_not_null AS IF(server_end_time IS NOT NULL,
server_end_time, 默认值),
  WATERMARK FOR server_end_time_not_null AS server_end_time_not_nul
) WITH (
  ...
)

45329722 <45329...@qq.com.invalid> 于2021年12月17日周五 18:42写道:

> 请教:
> java.lang.RuntimeException: RowTime field should not be null, please
> convert it to a non-null long value.
>
>
> 说明:源数据表中有一个时间字段:server_end_time, MySQL,有Null值,但源数据不能修改,
>  String order_sql = "create TABLE sd_service_order (" +
>  "server_end_time TIMESTAMP(3)," +
> "WATERMARK FOR server_end_time AS server_end_time) With .."
> 通过   tEnv.executeSql(order_sql); 注册:
>
>
>  server_end_time  实际数据中有空值(NULL), 请问:
>
>
> 1、executeSql 中能不能给个默认值?
> 2、 String out = "insert into out_data_1 " +
>                 " select " +
>                 "
> TUMBLE_START(server_end_time, INTERVAL '1' DAY) as window_start," +
>                 "
> TUMBLE_END(server_end_time, INTERVAL '1' DAY) as window_end," +
>                 " corp_id,
> county_id, area_id, area_name, station_id, station_name, " +
>                 " service_id,
> service_name, item_id, item_name, " +
>                 " sum(item_cost)
> as item_cost_sum, " +
>                 " count(item_id)
> as item_id_count " +
>                 " from
> sd_service_order " +
>                 " where
> server_end_time is not null" +
>                 " group by
> TUMBLE(server_end_time, INTERVAL '1' DAY)," +
>                 " 
>  corp_id, county_id, area_id, area_name, station_id, station_name, " +
>                 " 
>  service_id, service_name, item_id, item_name";
>         tEnv.executeSql(out);
>
>
>      使用 " where server_end_time is not null" 
> 过滤问题数据,但是好像没有起作用,报错信息:
> where=[CAST(server_end_time) IS NOT NULL]) (3/4)#24
> (4c7c6a76b34efb3f70df2d8e19cf3f08) switched from RUNNING to FAILED with
> failure cause: java.lang.RuntimeException: RowTime field should not be
> null, please convert it to a non-null long value.
>
>
> 请教一下这种数据怎么处理过滤掉?
> 谢谢!
>
>
>
>
> 45329...@qq.com
>
>
>  


Re: 双流窗口内join用flink sql实现的语法是什么?

2021-12-16 文章 Caizhi Weng
Hi!

从 Flink 1.14 开始,Flink SQL 支持 window join [1]。

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-join/

casel.chen  于2021年12月17日周五 08:47写道:

> 每隔5分钟join来自两条流的数据,用flink sql实现的写法是什么?
> 需要先join再窗口计算还是可以直接窗口内join? flink版本是1.13


Re: Flink SQL 有办法access State吗

2021-12-14 文章 Caizhi Weng
Hi!

可以用 SQL 的聚合函数 [1] 实现。内置聚合函数详见 [2],也可以自定义聚合函数,详见 [3]。

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/group-agg/
[2]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/systemfunctions/#%E8%81%9A%E5%90%88%E5%87%BD%E6%95%B0
[3]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/#%E8%81%9A%E5%90%88%E5%87%BD%E6%95%B0

Pinjie Huang  于2021年12月14日周二 15:48写道:

> 之前的DataStream API 我们可以通过State进行一些复杂的逻辑。比如所有message的某个field的最大值。Flink
> SQL有类似的方法吗?
>


Re: BroadcastConnectedStream处理顺序问题

2021-12-13 文章 Caizhi Weng
Hi!

可以看一下 event time temporal join [1] 是否满足需求。

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join

casel.chen  于2021年12月12日周日 11:12写道:

> 有一个场景是通过一张mysql的控制表来控制kafka流中的元素分流到不同下游sink,例如事实表的kafka和维表hbase等。
> 可我发现BroadcastConnectedStream处理顺序是随机的,有可能kafka数据来了(processElement方法)但还对应的MapState里面并没有该数据对应的控制规则(规则已经写到mysql控制表并被flink
> cdc获取到控制流里面,只是还没有被processBroadcastElement方法处理到)。但我是想所有的维表数据都先建立好在MapState里面,不会出现找到对应key情况,请问这要怎么实现呢?


Re: Flink 升级到1.13.2后出现新的问题

2021-12-13 文章 Caizhi Weng
Hi!

从 Flink 1.13 开始,引入了新的时间类型 timestamp_ltz,对应一个时间点,可以认为是 Java 的
Interval。之间的时间类型 timestamp 可以认为对应一个字符串。具体区别详见 [1]。

现在 current_timestamp 函数会返回 timestamp_ltz,不能直接和 timestamp 类型做运算。

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/timezone/

Pinjie Huang  于2021年12月13日周一 14:14写道:

> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: org.apache.flink.table.planner.codegen.CodeGenException:
> TIMESTAMP_LTZ only supports diff between the same type.
>


Re: flinksql相关问题

2021-12-11 文章 Caizhi Weng
Hi!

stmtSet.execute() 默认是异步的,只是提交作业而不会等待作业完成。如果需要等待作业完成再进行后续步骤,需要用
stmtSet.execute().await()。

陈卓宇 <2572805...@qq.com.invalid> 于2021年12月10日周五 20:25写道:

> 您好社区:
>     
> 我在使用flinksql将数据表A_now写入到数据库中后还有一步操作:将表A删除,完成将A_now更名为A,的切表操作。
> 发现当执行:
> //sql 插入数据到数据库操作
>
> StatementSet stmtSet = tenv.createStatementSet () ;
> stmtSet.addInsertSql ( insertSqlMongoDB ) ;
> stmtSet.addInsertSql ( insertSql ) ;
> stmtSet.execute () ;
> //完成后进行切表:
> /**  进行切表,删表 */
> 试试 {
> MongoUtil2 实例 = MongoUtil2。 获取实例 () ;
> MongoCollection < Document > oldData = instance.getCollection ( db,
> "t_up_tag_data_" +mongoKey ) ;
> MongoCollection "t_up_tag_metadata_"+mongoKey);
>
> 如果 ( 旧数据!= null ){
> oldData.drop () ;
> }
> 如果 ( 旧元数据!= null ){
> oldmetadata.drop () ;
> }
> MongoCollection < 文档 > data = instance.getCollection ( db,
> "t_up_tag_data_" +mongoKey+ "_now" ) ;
> MongoCollection < Document > metadata = instance.getCollection ( db,
> "t_up_tag_metadata_" +mongoKey+ "_now" ) ;
>
> MongoCollection < Document > newData = instance.getCollection ( db,
> "t_up_tag_data_" +mongoKey ) ;
> MongoCollection < 文档 > newmetadata = instance.getCollection ( db,
> "t_up_tag_metadata_" +mongoKey ) ;
> data.renameCollection ( newData.getNamespace ()) ;
> metadata.renameCollection ( newmetadata.getNamespace ()) ;
> 如果 ( 数据!= 空 ){
> 数据.drop () ;
> }
> 如果 ( 元数据!= null ){
> 元数据.drop () ;
> }
> } 捕获 ( 异常 e ){
> 记录 .info ( e.getMessage ()) ;
> }
> 发现切表逻辑并未触发,请问这是什么原因,我该如何修改使整个流程完整走完
>
> 陈卓
>
>
>  
>  


Re: 关于异步 AsyncTableFunction CompletableFuture 的疑问

2021-12-06 文章 Caizhi Weng
Hi!

1. 直接用 futrue.complate(rowdata) 传递数据,只是实现了发送是异步,异步join 得自己实现,这个理解对吗?


正确。例如 HBaseRowDataAsyncLookupFunction 里就调用了 hbase 提供的 table#get 异步方法实现异步查询。

2. 像join hbase 里面通过线程池实现了join异步,是无法保证顺序,并没有看到任何保证顺序的操作,还是有其他逻辑保证顺序吗?


Async operator 就是为了提高无需保序的操作(例如很多 etl 查维表就不关心顺序)的效率才引入的,如果对顺序有强需求就不能用 async
operator。


陌↓莣言  于2021年12月7日周二 13:06写道:

> deal all: 目前在看table api 中,自定义的异步 join 方法
> AsyncTableFunction#eval 方法时,发现接口提供的是: public void
> eval(CompletableFuture {...} 目前遇到两个问题: 1. 直接用 futrue.complate(rowdata) 传递数据,只是实现了发送是异步,异步join
> 得自己实现,这个理解对吗? 2. 像join hbase
> 里面通过线程池实现了join异步,是无法保证顺序,并没有看到任何保证顺序的操作,还是有其他逻辑保证顺序吗?
> 有各位大佬方便介绍一下吗?或者更详细的文档说明之类的? 非常感谢。


Re: 关于异步 AsyncTableFunction CompletableFuture 的疑问?

2021-12-06 文章 Caizhi Weng
Hi!

1. 直接用 futrue.complate(rowdata) 传递数据,只是实现了发送是异步,异步join 得自己实现,这个理解对吗?


正确。例如 HBaseRowDataAsyncLookupFunction 里就调用了 hbase 提供的 table#get 异步方法实现异步查询。

2. 像join hbase 里面通过线程池实现了join异步,是无法保证顺序,并没有看到任何保证顺序的操作,还是有其他逻辑保证顺序吗?


Async operator 就是为了提高无需保序的操作(例如很多 etl 查维表就不关心顺序)的效率才引入的,如果对顺序有强需求就不能用 async
operator。

Michael Ran  于2021年12月7日周二 10:33写道:

> deal all:
> 目前在看table api 中,自定义的异步 join 方法 AsyncTableFunction#eval
> 方法时,发现接口提供的是:
> public void eval(CompletableFuture>
> future,Object... keys) {...}
>
>
> 目前遇到两个问题:
>
>
> 1. 直接用 futrue.complate(rowdata) 传递数据,只是实现了发送是异步,异步join 得自己实现,这个理解对吗?
> 2. 像join hbase 里面通过线程池实现了join异步,是无法保证顺序,并没有看到任何保证顺序的操作,还是有其他逻辑保证顺序吗?
>
>
> 有各位大佬方便介绍一下吗?或者更详细的文档说明之类的?
> 非常感谢。
>


Re: 关于streamFileSink在checkpoint下生成文件问题

2021-12-01 文章 Caizhi Weng
Hi!

邮件里看不到图片和附件,建议使用外部图床。

partFile 文件是不是以英文句点开头的?这是因为 streamingFileSink 写文件的时候还没做 checkpoint,为了保证
exactly once,这些临时写下的 .partFile 文件都是不可见的,需要等 checkpoint 之后才会重命名成可见的文件。

黄志高  于2021年12月1日周三 下午9:53写道:

> hi,各位大佬,咨询个问题
>
>  
> 我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整点时刻生成一个文件,我查阅下源码,没有找到整点触发滚动生成文件的逻辑,有大佬可以帮忙分析一下这个整点时刻生成的文件是怎么来的吗,它属于哪个周期的,附件中是我flink任务的checkpoint时间点,和2021年11月30日在1点和2点生成的文件截图,在1点和2点的00分都生成了一个文件,望大佬帮忙看看
>
>
>
>


Re: flink sql group by后收集数据问题

2021-11-30 文章 Caizhi Weng
Hi!

UDF 支持 ROW 类型,详见 [1] 中关于 ROW 的示例。

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/#%e8%87%aa%e5%8a%a8%e7%b1%bb%e5%9e%8b%e6%8e%a8%e5%af%bc

casel.chen  于2021年12月1日周三 上午7:56写道:

> 业务中使用flink sql group by操作后想收集所有的数据,如下示例:
>
>
> kafka源表:
> 班级 学号  姓名  年龄
> 1 20001张三   15
> 2 20011李四   16
> 1 20002王五   16
> 2 20012吴六   15
>
>
> create table source_table (
>class_no: INT,
>student_no: INT,
>name: STRING,
>age: INT
> ) with (
>'connector' = 'kafka',
>...
> );
>
>
> mongodb目标表:
> 班级 学生信息
> 1 [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no":
> 20002, "name":"王五", "age": 16}]
> 2 [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no":
> 20012, "name":"吴六", "age": 15}]
>
>
> create table sink_table (
>   class_no INT,
>   students: ARRAY>
> ) with (
>   'connector' = 'mongodb',
>   ...
> );
>
>
> 查了下flink自带的系统函数,接近满足条件的只有collect函数。
> insert into sink_table select class_no, collect(ROW(student_no, name, age)
> from source_table group by class_no;
>
>
> 但它返回的是Multiset类型,即Map Integer>。如果key的类型是ROW,像我这种场景,直接写mongodb会抛错,因为它会自动强制将key的类型转成STRING。
> 何况这里我只想收集Array[ROW],相当于只要Map中的keyset,即去重后的Array。
>
>
> 1.
> 如果要收集去重的Array[ROW],有什么办法可以做到吗?我曾尝试写UDF,但UDF不支持ROW类型,只支持具体的数据类型,有何建议或参考例子?
> 2. 如果要收集不去重的Array[ROW],又该怎么写?
> 3. 访问一个数据类型为Map的数据中key和value,分别要用什么flink sql语法?
>
>
> 谢谢解答!
>
>
>
>
>
>
>
>
>
>
>
>


Re: flink sql ROW()语句中是不是不能再使用case when?

2021-11-30 文章 Caizhi Weng
Hi!

目前 ROW 的构造不支持内部调用函数,建议先在前面 create view 把需要的值计算好。

casel.chen  于2021年12月1日周三 上午7:59写道:

>
>
> select ROW(field1, field2, case when field3 = 'xxx' then 'T' else 'F' as
> field3) from source_table
>
>
> 这样的语句语法检查会通不过。


Re: flink sql太多insert into语句问题

2021-11-30 文章 Caizhi Weng
Hi!

感谢提出问题。方案一应该是最合适的,“算子名称长度超过限制而失败”不是期望行为,具体是什么样的错误栈?

casel.chen  于2021年12月1日周三 上午8:10写道:

> 我们有一个场景需要从一张kafka源表写很多不同告警规则到同一张告警目标表。规则数量大概有300多个,采用纯flink sql实现。
>
>
> 方案一是采用创建视图,将不同规则union all起来,再将视图插入目标表,发现算子都chain成了一个,因为flink
> sql算子的名称是flink sql内容,所以算子名称长度超过限制而失败。因而转向方案二
> 方案二是一条规则对应一条insert into语句,生成graph图会发现fan
> out特别多。这次没有算子名称超长问题,但作业起动会特别慢。考虑到后续规则还会进行修改,添加或删除。这样慢启动无法接受。
>
>
> 想问一下,这种场景最适合的做法是什么?谢谢!


Re: flink访问多个oss bucket问题

2021-11-30 文章 Caizhi Weng
Hi!

如果只是 bucket 不同的话,通过在 with 参数里指定 path 即可。

如果连 ak id 和 secret
都不同,可以考虑实现自己的 com.aliyun.oss.common.auth.CredentialsProvider 接口,并在 flink
conf 中指定 fs.oss.credentials.provider 为对应的实现类。

casel.chen  于2021年12月1日周三 上午8:14写道:

> flink平台作业写数据到客户oss bucket,和flink平台作业本身做checkpoint/savepoint用的oss
> bucket不是同一个。
> 请问这种场景flink是否支持,如果支持的话应该要怎么配置?谢谢!


Re: Flink sql jdbc Partitioned Scan timestamp不生效

2021-11-30 文章 Caizhi Weng
Hi!

scan.partition.lower-bound 和 scan.partition.upper-bound 都是一个 long 值(而不是一个
timestamp 字符串的形式)。它们将会转换成 where  between  and
 的 SQL 语句通过 jdbc 获取数据。可以检查一下配置项的格式和值的范围是否符合期望。

天下五帝东  于2021年12月1日周三 上午9:23写道:

> Hi:
>我在使用flink sql jdbc connector测试partitioned scan功能,发现指定
> scan.partition.column 为timestamp类型时,scan.partition.lower-bound
>
> 和scan.partition.upper-bound指定具体的值后,没有读取到相关数据,哪位大佬帮忙解答下
>
> 谢谢
>
>


Re: Time attribute will be lost after two(or more) table joining

2021-11-29 文章 Caizhi Weng
Hi!

As this mail is written in English I'm also forwarding this to the user
mailing list.

Streaming joins do not retain row time attribute and this is the expected
behavior. As you're windowing the results of joins I guess you're enriching
the records from one stream with that join. Lookup joins [1] and event time
temporal join [2] will retain row time and their results can be used by
windowing operators later. Do they meet your needs?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#lookup-join
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#event-time-temporal-join

Pinjie Huang  于2021年11月29日周一 上午10:44写道:

> Hi Friends,However, we found that the time attribute will be lost after
> table joining, which means that we cannot do the joining and aggregation at
> one SQL query statement. There will be no output after the above SQL
> querying, for SQL queries on streaming tables, the time_attr argument of
> the group window function
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#group-windows
> >
> must refer to a valid time attribute that specifies the processing time or
> event time of rows. In wide_table,  the time_attr of field
> eventInfo_eventTime has been lost.
>
> Any ideas?
>


Re: flink web UI job overview 中的算子记录条数不更新

2021-11-29 文章 Caizhi Weng
Hi!

图片看不到,也没有附件。建议使用外部图片上传页面。

“算子对应的接收和发送记录数不更新”,这里的“算子”指的是 web UI 上的一个方框吗?如果这个方框没有其他方框给它数据数据(也就是说这是一个
source task)那么它的接收数就是 0;如果这个方框不向其他方框送出数据(也就是说这是一个 sink task)那么它的发送数就是 0。

zhiyuan su  于2021年11月30日周二 上午10:09写道:

> Hello,麻烦问下,为啥算子链 默认合并之后,数据更新,算子对应的接收和发送记录数不更新,一直显示为0.
>
> 图片发送了看不到,请看附件
>
>


Re: FlinkSQL源码分段优化中,物理计划转换为ExecNodeGraph的时候,SameRelObjectShuttle、SubplanReuseShuttle一拆一合目的是啥

2021-11-24 文章 Caizhi Weng
Hi!

这是因为我们有配置关闭 subplan reuse 和 source reuse,因此需要先把 plan 拆开,然后再判断是否允许
reuse,如果允许才能合并。

岳晗  于2021年11月24日周三 下午3:55写道:

> Hi,
>
>
> 请问下FlinkSQL物理计划转换为ExecNodeGraph的时候,拿到optimizedRelNodes后,
>
>
> 首先执行:SameRelObjectShuttle Rewrite same rel object to different rel objects.
> e.g.
>       Join             
>          Join
>      /    \         
>            /    \
>  Filter1 Filter2     =>     Filter1
> Filter2
>      \   /         
>            |      |
>       Scan             
>     Scan1    Scan2
>
>
>
> 然后执行:SubplanReuseShuttle
>        Join           
>           Join
>      /      \       
>           /      \
>  Filter1  Filter2         
> Filter1  Filter2
>     |        |       
> =>       \     /
>  Project1 Project2            Project1
>     |        |       
>               |
>   Scan1    Scan2           
>  Scan1
>
> val shuttle = new SameRelObjectShuttle()
> val relsWithoutSameObj = optimizedRelNodes.map(_.accept(shuttle))
> // reuse subplan
> val reusedPlan = SubplanReuser.reuseDuplicatedSubplan(relsWithoutSameObj,
> config)
> // convert FlinkPhysicalRel DAG to ExecNodeGraph
> val generator = new ExecNodeGraphGenerator()
> 目的是啥,谢谢回复。


Re: 检查点的start-delay、alignment duration理解。

2021-11-24 文章 Caizhi Weng
Hi!

这个 checkpoint 时间的差距确实不太符合预期,因为 compact 一般都是比较快的。建议看一下 task3 的 jstack 以及 gc
情况,确认是不是 task3 gc 严重阻塞 checkpoint,以及 task3 具体在做什么。

yidan zhao  于2021年11月24日周三 下午7:03写道:

> 如题,文档
>
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/checkpoint_monitoring/
> 。
>
> 我目前情况是,有个任务,从kafka读取数据,写hive,orc格式,带了compact功能,align 模式。
> 目前发现DAG为4个Task,分别为:
> Task1: Source: TableSourceScan ... ... -> streaming-writer 并行度60
> Task2: compact-coordinator 并行度1
> Task3: compact-operator 并行度60
> Task4: PartitionCommitter -> Sink: end 并行度1
>
> 目前观察到检查点存在超时,我给一个ckpt数据,耗时18min30s,如下是E2E时长。
> Task1: 49s。
> Task2: 1s。
> Task3: 18min20s。
> Task4: 18min30s。
> 继续观察Task3的subtask的detail信息,compact-operator算子ckpt显示数据是非常小,几乎没有。同时alignment
> duration都是0ms,也就是不存在对齐时长。 但是 start-delay 却很长,有的达到16min左右,不清楚这个是为啥?
>
> 目前根据文档分析下来,start-delay是从第一个barrier创建 到
> 该barrier到达该subtask的时长,不清楚我这个DAG任务情况下,是什么导致这个情况呢?
> 为什么第一个barrier到达都这么耗时。
>


Re: FlinkSql回撤流

2021-11-24 文章 Caizhi Weng
Hi!

无法在 SQL 里获得第一列的操作符。但可以通过 table.execute().collect() 获得产生的
CloseableIterator,然后通过 Row#getKind 获得该 row 对应的 op。

顺便问一下,为什么需要在 SQL 里获得 op 呢?因为这个 op 应该只对内部算子以及 sink 有用,用户一般来说是不需要感知的。

wushijjian5  于2021年11月24日周三 下午9:05写道:

>
> DataStream> dataStream = 
> env.fromElements(
> new Tuple4<>("a", "a1",30,1),
> new Tuple4<>("b", "b1",20,1),new Tuple4<>("a","a1",30,0),
> new Tuple4<>("a","a2",30,1),
> new Tuple4<>("a","a3",30,1));
> tEnv.createTemporaryView("tmpTable", dataStream, $("user"),$("ord"), 
> $("num"), $("flag"));
> Table table = tEnv.sqlQuery(
> " select user,sum(num) as num" +
> " from (" +
> "   select user,ord,LAST_VALUE(num) as num,LAST_VALUE(flag) as flag " 
> +
> "from tmpTable " +
> "group by user,ord " +
> ") t1" +
> " where flag=1 " +
> " group by user" +
> "");
> table.execute().print();
>
>
> 这样一个回撤流,我能在sql里获取到第一列的操作符么,比如 select op,user,num from xxx   只通过sql的方式
>
>


Re: jdk11创建hive catalog抛错

2021-11-22 文章 Caizhi Weng
Hi!

这是从 hive 里产生的错误。据我所知,hive 对 Java 11 的支持仍在建设中 [1],因此还是建议使用 Java 8。

[1] https://issues.apache.org/jira/browse/HIVE-22415

aiden <18765295...@163.com> 于2021年11月22日周一 下午12:00写道:

> 求助,jdk从8升级到11后使用hive作为flink
> table的catalog抛错,排查是bsTableEnv.registerCatalog(catalogName, catalog)
> 抛错,具体异常为:
> 11:55:22.343 [main] ERROR hive.log - Got exception:
> java.lang.ClassCastException class [Ljava.lang.Object; cannot be cast to
> class [Ljava.net.URI; ([Ljava.lang.Object; and [Ljava.net.URI; are in
> module java.base of loader 'bootstrap')
> java.lang.ClassCastException: class [Ljava.lang.Object; cannot be cast to
> class [Ljava.net.URI; ([Ljava.lang.Object; and [Ljava.net.URI; are in
> module java.base of loader 'bootstrap')
> at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:274)
> [hive-exec-2.1.1.jar:2.1.1]
> at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:210)
> [hive-exec-2.1.1.jar:2.1.1]
> at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method) ~[?:?]
> at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> [?:?]
> at
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> [?:?]
> at
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
> [?:?]
> at
> org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1652)
> [hive-exec-2.1.1.jar:2.1.1]
> at
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.(RetryingMetaStoreClient.java:80)
> [hive-exec-2.1.1.jar:2.1.1]
> at
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:130)
> [hive-exec-2.1.1.jar:2.1.1]
> at
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:115)
> [hive-exec-2.1.1.jar:2.1.1]
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method) ~[?:?]
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:?]
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:?]
> at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
> at
> org.apache.flink.table.catalog.hive.client.HiveShimV200.getHiveMetastoreClient(HiveShimV200.java:54)
> [flink-connector-hive_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:277)
> [flink-connector-hive_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:78)
> [flink-connector-hive_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:68)
> [flink-connector-hive_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:32)
> [flink-connector-hive_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:296)
> [flink-connector-hive_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:195)
> [flink-table-api-java-1.14.0.jar:1.14.0]
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:373)
> [flink-table-api-java-1.14.0.jar:1.14.0]
> at catalogTest.FlinkExecTableRun.flinkMain(FlinkExecTableRun.java:27)
> [classes/:?]
> at catalogTest.test.main(test.java:11) [classes/:?]
> 11:55:22.348 [main] ERROR hive.log - Converting exception to MetaException
> Exception in thread "main"
> org.apache.flink.table.catalog.exceptions.CatalogException: Failed to
> create Hive Metastore client
> at
> org.apache.flink.table.catalog.hive.client.HiveShimV200.getHiveMetastoreClient(HiveShimV200.java:61)
> at
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:277)
> at
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:78)
> at
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.(HiveMetastoreClientWrapper.java:68)
> at
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:32)
> at
> org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:296)
> at
> org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:195)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:373)
> at catalogTest.FlinkExecTableRun.flinkMain(FlinkExecTableRun.java:27)
> at catalogTest.test.main(test.java:11)
> Caused by: java.lang.reflect.InvocationTargetException

Re: flink的job运行一段时间后, checkpoint就一直失败

2021-11-18 文章 Caizhi Weng
Hi!

checkpoint 超时有很多可能性。最常见的原因是超时的节点太忙阻塞了 checkpoint(包括计算资源不足,或者数据有倾斜等),这可以通过看
Flink web UI 上的 busy 以及反压信息判断;另外一个常见原因是 gc 太频繁,可以通过设置 jvm 参数打印出 gc log 观察。

yu...@kiscloud.net  于2021年11月18日周四 下午2:54写道:

> flink的job运行一段时间后, checkpoint就一直失败,信息如下:
> ID
> Status
> Acknowledged
> Trigger Time
> Latest Acknowledgement
> End to End Duration
> State Size
> Buffered During Alignment
> 295
> FAILED
> 30/5011:55:3811:55:391h 0m 0s205 KB0 B
> Checkpoint Detail:
> Path: - Discarded: - Failure Message: Checkpoint expired before completing.
> Operators:
> Name
> Acknowledged
> Latest Acknowledgment
> End to End Duration
> State Size
> Buffered During Alignment
> Source: dw-member
> 6/10 (60%)11:55:391s7.08 KB0 B
> Source: wi-order
> 6/10 (60%)11:55:391s7.11 KB0 B
> Source: dw-pay
> 6/10 (60%)11:55:391s7.11 KB0 B
> RecordTransformOperator
> 6/10 (60%)11:55:391s98.8 KB0 B
> RecordComputeOperator -> Sink: dw-record-data-sink
> 6/10 (60%)11:55:391s85.1 KB0 B
> SubTasks:
> End to End Duration
> State Size
> Checkpoint Duration (Sync)
> Checkpoint Duration (Async)
> Alignment Buffered
> Alignment Duration
> Minimum1s14.2 KB7ms841ms0 B13ms
> Average1s14.2 KB94ms1s0 B13ms
> Maximum1s14.2 KB181ms1s0 B15ms
> ID
> Acknowledgement Time
> E2E Duration
> State Size
> Checkpoint Duration (Sync)
> Checkpoint Duration (Async)
> Align Buffered
> Align Duration
> 1n/a
> 211:55:391s14.2 KB8ms1s0 B15ms
> 3n/a
> 411:55:391s14.2 KB181ms1s0 B13ms
> 5n/a
> 611:55:391s14.2 KB8ms1s0 B14ms
> 711:55:391s14.2 KB181ms961ms0 B13ms
> 8n/a
> 911:55:391s14.2 KB181ms841ms0 B13ms
> 1011:55:391s14.2 KB7ms1s0 B14ms
>
>
> 请问,这类问题如何排查,有没有好的建议或者最佳实践?谢谢!
>


Re: inStreamingMode和inBatchMode打印内容不同产生的疑惑

2021-11-18 文章 Caizhi Weng
Hi!

流作业中产生的数据有不同类型,例如插入(+I),删除(-D),更新(-U、+U),它们的具体说明见 [1]。print sink 实际上较多用于
debug,因此用户实际上不太需要关心这些 op 的含义。

批作业中产生的所有数据都是插入,因此不需要 op。

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/types/RowKind.html

陈卓宇 <2572805...@qq.com.invalid> 于2021年11月18日周四 下午8:02写道:

> 在inStreamingMode会出现 首列op,行数据是+I
> 在inBatchMode模式下就没有
>
> 请问这个op列和+I是什么意思 为什么inStreamingMode模式下有而inBatchMode模式下没有
>
> 陈
>
>
>  


Re: flink的sql写hdfs如何指定文件名称

2021-11-17 文章 Caizhi Weng
Hi!

Flink SQL 写 hdfs 应该产生的是一个目录,比如写 hdfs 的时候指定的是 'path' =
'hdfs:///data/test.csv',那么就会在 hdfs 里产生 /data/test.csv
这个目录,里面所有不以点(.)开头的文件都是可读的。如果用 Flink 读,那么直接指定目录,也就是 'path' =
'hdfs:///data/test.csv' 即可;如果通过其他方式读,注意读取目录下所有不以点(.)开头的文件。

陈卓宇 <2572805...@qq.com.invalid> 于2021年11月18日周四 上午11:47写道:

>
> flinksql写HDFS后自动生成的名称:part-c4a19762-bde3-4f37-8b3c-b92d182b450c-task-0-file-0
> 然后发现我读取的时候麻烦了,请问如何指定flink的sql写hdfs如何指定文件名称
> 陈
>
>
>  


Re: 在进行table转streaming后报一个异常

2021-11-17 文章 Caizhi Weng
Hi!

这种文件用于在 task 之间交换数据。我对 windows
的行为不太了解,但看起来是这个临时文件被清理了。是否有设置什么自动清理策略?另外这样的错误如果只是偶发,Flink 的 failover
机制会让作业从 checkpoint 重新运行,不必担心作业的可用性和正确性。

陈卓宇 <2572805...@qq.com.invalid> 于2021年11月17日周三 下午7:44写道:

> 场景:将table表转为streaming流进行一个关联维表操作后发生异常
>
> 异常内容:
> 2021-11-17
> 19:39:53.056|ERROR|org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition|flink-taskexecutor-io-thread-4|releaseInternal|233|Error
> during release of result subpartition:
> C:\Users\monster\AppData\Local\Temp\flink-netty-shuffle-76964c7f-ffc6-490f-82bc-a3091412955b\0be871a1d3b522377ffd8f565bb3b81f.channel
> java.nio.file.NoSuchFileException:
> C:\Users\monster\AppData\Local\Temp\flink-netty-shuffle-76964c7f-ffc6-490f-82bc-a3091412955b\0be871a1d3b522377ffd8f565bb3b81f.channel
>     at
> sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:79)
>     at
> sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
>     at
> sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102)
>     at
> sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:269)
>     at
> sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
>     at java.nio.file.Files.delete(Files.java:1126)
>     at org.apache.flink.runtime.io
> .network.partition.FileChannelBoundedData.close(FileChannelBoundedData.java:97)
>     at org.apache.flink.runtime.io
> .network.partition.BoundedBlockingSubpartition.checkReaderReferencesAndDispose(BoundedBlockingSubpartition.java:253)
>     at org.apache.flink.runtime.io
> .network.partition.BoundedBlockingSubpartition.release(BoundedBlockingSubpartition.java:205)
>     at org.apache.flink.runtime.io
> .network.partition.BufferWritingResultPartition.releaseInternal(BufferWritingResultPartition.java:229)
>     at org.apache.flink.runtime.io
> .network.partition.ResultPartition.release(ResultPartition.java:246)
>     at org.apache.flink.runtime.io
> .network.partition.ResultPartitionManager.releasePartition(ResultPartitionManager.java:86)
>     at org.apache.flink.runtime.io
> .network.NettyShuffleEnvironment.lambda$releasePartitionsLocally$0(NettyShuffleEnvironment.java:181)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
>
>
> 陈
>
>
>  


Re: FlinkSQL写hive orc表开启compact,任务异常重启,compact不生效?

2021-11-16 文章 Caizhi Weng
Hi!

因为 compact 是在每次 checkpoint 的时候进行的,在做 checkpoint 之前产生的文件都是以 . 开头的,表示当前不可见。只有
checkpoint
之后才会重命名为可见文件。因此如果任务频繁出现错误,这些不可见文件就会留在目录里,导致文件数增加。建议首先把任务为什么频繁出错查出来。

yidan zhao  于2021年11月16日周二 下午5:36写道:

>
> 如题,目前没有具体看是没生效,还是来不及compact。任务正常情况没问题,但是如果任务出现频繁错误,导致过一会重启一次,这种情况导致文件数暴增,compact功能不生效。
>


Re: Flink工程停止问题

2021-11-15 文章 Caizhi Weng
Hi!

Flink 本身不自带安全机制,需要通过外部系统完成访问限制。

疾鹰击皓月 <1764232...@qq.com.invalid> 于2021年11月16日周二 下午2:57写道:

> 您好
>
> Flink
> WebUI的左上角有一个cancel按钮,通过按钮可以停止Flink工程。但这会导致一定的权限问题。我们希望只有特定人员可以停止Flink工程,请问有没有方法可以让那个停止按钮不生效或者不显示呢?


Re: flink sql检查点是否可恢复问题

2021-11-14 文章 Caizhi Weng
Hi!

关于 Flink 版本升级的问题,小版本之间是兼容的,大版本之间的兼容性见 [1]。
关于修改 SQL 的问题,表 schema 发生变化是不兼容的,某些字段取值发生变化目前没有文档描述,且各算子行为稍有不同,需要尝试一下才能得知。
修改 with 参数应该是兼容的,但部分修改(比如改了 kafka topic)可能会导致报错或非期望行为,比如 offset 越界。
修改作业配置(在不影响最终 plan 的前提下)也是兼容的(例如你提到的这三个配置都是兼容的),更换 statebackend 和调资源正是
savepoint 原本需要解决的问题之一。

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#compatibility-table

casel.chen  于2021年11月13日周六 上午9:01写道:

> 下面这些情况下哪些可以让flink sql作业从上一个检查点恢复?
>
> 1. 升级了flink引擎版本
>  a) 小版本  1.13.1 -> 1.13.2
>  b) 大版本   1.13.x -> 1.14.x
> 2. 修改了sql,分为2种
>  a) 表schema发生了变化
>  b) 表schema没变,只是某些字段取值内容发生变化,例如 从 count(*) 变成 count(*) + 1
> 3. 修改了with options参数,例如 cache大小和时长
> 4. 修改了作业配置,例如并行度,资源大小,换了statebackend等


Re: FlinkSQL 1.12 Temporal Joins 多表关联问题

2021-11-11 文章 Caizhi Weng
Hi!

这是说每次主流来一条数据,都要去维表里查询一次吗?然后你想每次攒一批数据,一次性查询以提高性能?

如果是的话,一部分维表(如 jdbc 和 hbase)支持 cache 功能 [1]。cache 功能可以在每次 cache 刷新的时候把数据加载到
task manager 内存中,这样主流来数据时只需要从 task manager 内存中查询对应数据即可,不必去外部系统查询。

另外查询逻辑下沉到数据库具体指的是什么?能否详细说明一下。

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#lookup-cache

WuKong  于2021年11月11日周四 下午5:42写道:

> Hi :
>现在有个场景, 我有一张Kafka的表,需要基于这张Kafka的流表 进行事件触发,去关联DB的多表 来拉宽数据 。比如: select *
> from kafkaTableA AS A
>join DBTableB FOR SYSTEM_TIME AS OF A.`PROCTIME` AS B ON valueB =
> B.columnB
>join DBTableC FOR SYSTEM_TIME AS OF A.`PROCTIME` AS C  ON valueC =
> C.columnC 。
>   目前有两个问题:
>1.  我看数据库里 是单表去查询数据的数据的,
>  select * from DBTableB where B.columnB   = valueB
>  select * from DBTableC where C.columnC   = valueC
>   这里我可以配置 把整个查询逻辑 下沉到数据库去做吗?
>   2. 我想把Kafka 里的数据 积累一点之后 通过微批的形式 IN 查询 请问 有没有可能这么做?
>
>
>
> ---
> Best,
> WuKong
>


Re: flinksql sink写csv小文件问题

2021-11-11 文章 Caizhi Weng
Hi!

filesystem sink 的文件数量与 sink 并发数有关。如果数据量不大可以考虑在 sink DDL 的 with 参数里加入
'sink.parallelism' = '1' 设置 sink 并发度为 1。

陈卓宇 <2572805...@qq.com.invalid> 于2021年11月11日周四 下午4:50写道:

> 问题描述:写到本地产生了8个part-817677cc-2f4b-464a-bf9e-11957bcf9c76-0-0 ...的文件
> 请问,我只想写成一个csv文件,如果关闭这种文件分区
>
>
>
> Flink SQL:
> String tw_smart_tag="CREATE TABLE tw_smart_tag (\n" +
> "  id STRING,\n" +
> "  tag_code STRING,\n"+
> "  parent_id STRING,\n"+
> "  name STRING,\n"+
> "  type STRING,\n"+
> "  tag_type STRING,\n"+
> "  data_type STRING,\n"+
> "  status STRING,\n"+
> "  valid_status STRING,\n"+
> "  opr_status STRING,\n"+
> "  online STRING,\n"+
> "  opr_type STRING,\n"+
> "  opr_time STRING,\n"+
> "  invalid_time STRING,\n"+
> "  auth_type STRING,\n"+
> "  remark STRING,\n"+
> "  sort STRING,\n"+
> "  batch_no STRING,\n"+
> "  created_by STRING,\n"+
> "  created_time STRING,\n"+
> "  updated_by STRING,\n"+
> "  updated_time STRING\n"+
> ") WITH (\n" +
> "  'connector' = 'filesystem',   -- 必选: 指定连接器类型\n" +
> "  'path' =
> 'hdfs://ark1:8020//tmp/usertag/20211029/db_31abd9593e9983ec/metadata/tw_smart_tag.csv',
> -- 必选: 指向目录的路径\n" +
> "  'format' = 'csv'   -- 必选: 文件系统连接器需要指定格式,请查阅 表格式
> 部分以获取更多细节\n" +
> ")\n";
>
> String tw_smart_tag_detail="CREATE TABLE tw_smart_tag_detail (\n" +
> "  id STRING,\n" +
> "  tag_id STRING,\n"+
> "  code STRING,\n"+
> "  name STRING,\n"+
> "  content STRING,\n"+
> "  status STRING,\n"+
> "  created_by STRING,\n"+
> "  created_time STRING,\n"+
> "  updated_by STRING,\n"+
> "  updated_time STRING\n"+
> ") WITH (\n" +
> "  'connector' = 'filesystem',   -- 必选: 指定连接器类型\n" +
> "  'path' =
> 'hdfs://ark1:8020//tmp/usertag/20211029/db_31abd9593e9983ec/metadata/tw_smart_tag_detail.csv',
> -- 必选: 指向目录的路径\n" +
> "  'format' = 'csv'   -- 必选: 文件系统连接器需要指定格式,请查阅 表格式
> 部分以获取更多细节\n" +
> ")\n";
>
> //导出到本地
> String loaclhostFile="CREATE TABLE loaclhost_File (\n" +
> "  id STRING,\n" +
> "  tag_code STRING,\n"+
> "  name STRING,\n"+
> "  data_type STRING,\n"+
> "  detailID STRING,\n"+
> "  tag_id STRING,\n"+
> "  detailName STRING\n"+
> ") WITH (\n" +
> "  'connector' = 'filesystem',   -- 必选: 指定连接器类型\n" +
> "  'path' = 'hdfs://ark1:8020//tmp/usertag/20211029/data/',  --
> 必选: 指向目录的路径\n" +
> "  'format' = 'csv',   -- 必选: 文件系统连接器需要指定格式,请查阅
> 表格式 部分以获取更多细节\n" +
> ")\n";
>
> String joinSQL = "insert into loaclhost_File\n" +
> "SELECT tw_smart_tag.id AS id,\n" +
> "tw_smart_tag.tag_code AS tag_code,\n" +
> "tw_smart_tag.name AS name,\n" +
> "tw_smart_tag.data_type AS data_type,\n" +
> "tw_smart_tag_detail.id AS detailID,\n" +
> "tw_smart_tag_detail.tag_id AS tag_id,\n" +
> "tw_smart_tag_detail.name AS detailName\n" +
> "FROM tw_smart_tag INNER JOIN tw_smart_tag_detail ON
> tw_smart_tag.id = tw_smart_tag_detail.tag_id";
> // tw_smart_tag.id = tw_smart_tag_detail.tag_id
> tenv.executeSql(tw_smart_tag).print();
> tenv.executeSql(tw_smart_tag_detail).print();
> tenv.executeSql(loaclhostFile).print();
> tenv.executeSql(joinSQL).print();
>
> 陈卓宇
>
>
>  


Re: Flinksql 多表进行full join 出现异常

2021-11-11 文章 Caizhi Weng
Hi!

感谢反馈问题。这看起来其实和 join 无关,应该是与 source 有关。方便的话,能否把 source 表的
ddl(包含每个字段的类型,字段名如果敏感可以重命名一下)和其他信息(例如 source 表以什么格式存储)分享在邮件里?

陈卓宇 <2572805...@qq.com.invalid> 于2021年11月11日周四 下午9:44写道:

> 场景:进行多表的full join失败
>
>
> 报错:
> java.lang.RuntimeException: Failed to fetch next result
>
>     at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
>     at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
>     at
> org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:117)
>     at
> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
>     at
> org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:149)
>     at
> org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:154)
>     at TableAPI.envReadFileSysteam(TableAPI.java:441)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>     at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>     at
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>     at
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>     at
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>     at
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>     at
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>     at
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
>     at
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>     at
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
>     at
> com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at
> com.intellij.rt.execution.CommandLineWrapper.main(CommandLineWrapper.java:64)
> Caused by: java.io.IOException: Failed to fetch job execution result
>     at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:169)
>     at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)
>     at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
>     ... 39 more
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>     at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:167)
>     ... 41 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>     at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>     at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
>     at
> java.

Re: flink1.12.4 写入hdfs报错 java.lang.OutOfMemoryError: Direct buffer memory

2021-11-08 文章 Caizhi Weng
Hi!

可以通过配置 taskmanager.memory.task.off-heap.size 指定 direct memory 和 native
memory 的大小,详见 [1]。

[1]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/memory/mem_setup_tm/#%e9%85%8d%e7%bd%ae%e5%a0%86%e5%a4%96%e5%86%85%e5%ad%98%e7%9b%b4%e6%8e%a5%e5%86%85%e5%ad%98%e6%88%96%e6%9c%ac%e5%9c%b0%e5%86%85%e5%ad%98

xiao cai  于2021年11月8日周一 下午10:20写道:

> 通过flink 1.12.4 streaming file sink 写入hdfs,运行过程中抛出以下异常:
>
>
> 2021-11-08 20:39:05
> java.io.IOException: java.lang.OutOfMemoryError: Direct buffer memory
> at
> org.apache.hadoop.hdfs.DataStreamer$LastExceptionInStreamer.set(DataStreamer.java:299)
> at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:820)
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java:694)
> at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
> at sun.nio.ch.IOUtil.write(IOUtil.java:58)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
> at org.apache.hadoop.net
> .SocketOutputStream$Writer.performIO(SocketOutputStream.java:63)
> at org.apache.hadoop.net
> .SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
> at org.apache.hadoop.net
> .SocketOutputStream.write(SocketOutputStream.java:159)
> at org.apache.hadoop.net
> .SocketOutputStream.write(SocketOutputStream.java:117)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
> at java.io.DataOutputStream.write(DataOutputStream.java:107)
> at org.apache.hadoop.hdfs.DFSPacket.writeTo(DFSPacket.java:180)
> at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:765)


Re: flink的一个场景问题

2021-11-08 文章 Caizhi Weng
Hi!

一般将结果写到外部系统是通过 sink 节点。如果 Flink 没有内置你需要的 connector,可以考虑继承并实现
SinkFunction(很基本的 sink)或 RichSinkFunction(带 checkpoint 等功能)等自定义 sink,然后通过
DataStream#addSink 方法把这个 sink 加在 datastream 的末尾。

陈卓宇 <2572805...@qq.com.invalid> 于2021年11月8日周一 下午2:40写道:

> 场景:我对一批kafa中的数据使用flink进行消费,然后通过process算子进行处理加工,将其写更新到三方的数据存储介质中
> 问题:在上述的过程中并没有sink的阶段,直接在process中处理完成后写入到存储介质里了,sink没办法只能写一个print()
> ,这种场景有更优的解决方案么?
>
>
>
>
>
> 陈卓宇
>
>
>  


Re: 窗口时间不准

2021-11-07 文章 Caizhi Weng
Hi!

如果某个窗口里没有任何数据,那么这个窗口就不会产生。这个输出应该说明数据里没有 11:12:36 ~ 11:12:37 的内容。

陈卓宇 <2572805...@qq.com.invalid> 于2021年11月8日周一 上午11:24写道:

> 场景是:
> 首先使用assignTimestampsAndWatermarks定义了eventTime的时间语义,然后我调用window(TumblingEventTimeWindows.of(Time.seconds(1))).process(...)
> 开一个1秒钟的窗口进行逻辑的计算
> 在这个窗口内进行一个print控制台打印,打印的内容中通过context.window().getEnd()方法拿到窗口结束时间,发现有一部分
> 数据是大于1s的
> 数据:
> ==> ResultBean(key=ZYSZ01, count=2,
> timestamp=1636341155000, datetime=2021-11-08 11:12:35,
> indicator=elevator.analysis.sum.floor, userablerate=null,
> sumofflinetime=null, forusetime=null, producttime=null)
> ==> ResultBean(key=ZYSZ01, count=3,
> timestamp=1636341158000, datetime=2021-11-08 11:12:38,
> indicator=elevator.analysis.sum.floor, userablerate=null,
> sumofflinetime=null, forusetime=null, producttime=null)
> ==> ResultBean(key=ZYSZ01, count=5,
> timestamp=1636341159000, datetime=2021-11-08 11:12:39,
> indicator=elevator.analysis.sum.floor, userablerate=null,
> sumofflinetime=null, forusetime=null, producttime=null)
>
>
> 问题:
>   为什么名名我设置了1s的窗口,还会会出现时间大于1s的情况?
>
> 陈卓宇
>
>
>  


Re: FlinkSQL 使用 streamingSink 写入 hive orc数据,如何控制文件数量。

2021-11-04 文章 Caizhi Weng
Hi!

1 换言之,是针对每个检查点,合并了多个并发subtask产生的文件对吧。

正确

2 除此以外,多个检查点之间的文件是没有办法合并的对吧。

正确

实际部分节点做的是后台IO了事情,是不是反映不到busy情况上

是的,busy 的计算方式是通过采样看有多少个线程正在工作。对于 sink 这种线程都在等待后台 io 的节点来说确实 busy 值不会很高。

yidan zhao  于2021年11月4日周四 下午5:57写道:

> hi,还想继续问下。这个合并机制,根据文档介绍如下。
> Whether to enable automatic compaction in streaming sink or not. The data
> will be written to temporary files. After the checkpoint is completed, the
> temporary files generated by a checkpoint will be compacted. The temporary
> files are invisible before compaction.
> 看文档,是指每次检查点完成后,会将单个检查点产生的文件进行合并。也就是说只有单个检查点产生的文件会被合并。
> 1 换言之,是针对每个检查点,合并了多个并发subtask产生的文件对吧。
>
> 2 除此以外,多个检查点之间的文件是没有办法合并的对吧。
>
> 3 另外一个问题:目前看flinksql写hive,streaming情况。从web
>
> ui上看不开启compact情况下,几乎每个节点都是蓝色,而且数据量不大。开启compact情况,几乎也都是蓝色,数据量也不大,但只有compact节点是持续红色。
>
> 按照我的理解写hive这种情况下,实际部分节点做的是后台IO了事情,是不是反映不到busy情况上,busy比如只考虑对接受元素的处理,至于这个元素导致这个算子有多少background的工作并反映不出来。对吗。
> 所以即使看起来都是蓝色的,也不能降低并行度,而是自行根据数据量采用一个差不多的并行度。
>


Re: Tumbling Windows 窗口可开的最小单位

2021-11-04 文章 Caizhi Weng
Hi!

没有限制,1ms 都可以,理论上小 tumble window 对性能影响不大。

可以具体说一下为什么需要小 tumble window 吗?小 tumble window 确实比较少见,也许用其他算子更合适。

李航飞  于2021年11月5日周五 下午12:32写道:

> 滚动窗口最小可开多大,100ms?
> 对性能有什么影响吗?


Re: flink1.12.1 读取kafka的数据写入到clickhouse如何支持upsert操作呢

2021-11-04 文章 Caizhi Weng
Hi!

你需要在 sink 节点之前添加一个按 uuid 的 hash shuffle 将相同的 uuid 送到相同的并发。如果 processData
是一个 data stream 的话,通过 keyBy 方法 key by uuid,再写入 sink 即可。

另:我记得我已经回复了两封相同的邮件,之前的回复是丢失了吗?

扯  于2021年11月5日周五 上午10:50写道:

>
> 您好!感谢你在万忙之中,抽出时间来看我发的邮件。最近我在研究使用flink写入数据到clickHouse,如何能满足公司业务需求。但是在用flink1.12.1版本读取kafka的数据,实现upsert的形式写入数据到clickhouse出现了一些问题。问题详细情况描述如下:
>
> clickhouse建表语句如下:
> CREATE TABLE test_local.tzling_tb3(
> uuid String,
> product String,
> platform String,
> batchId String,
> id String,
> account String,
> customerId String,
> reportName String,
> dt String,
> campaign String,
> adGroup String,
> generalField String,
> currency String,
> impressions String,
> cost String,
> clicks String,
> conversions String,
> createDateTime String,
> createTime BIGINT,
> key String,
> pdate String
> )engine = MergeTree PARTITION BY pdate order by createTime;
> 将uuid作为主键,主键存在就更新数据 update,不存在的话,就直接append。
>
> processData.addSink(new MSKUpsertClickHouseSink());
> 附件文件MSKUpsertClickHouseSink.java是我写入clickhouse的sink类,设计逻辑为:
> 先查询表中是否存在要添加数据的uuid,如果存在就先做条件删除操作,再做append操作;如果要添加的数据uuid不存在,就直接append操作。当时这样写出现了并发问题,如果并行度大于1,那么clickhouse中会出现uuid不唯一的情况出现。
>
> 请问一下,基于上述所说的情况,您有什么好的实践方案可以推荐一下的呢?
>


Re: flink 1.14 Hybrid Source切换source时机问题

2021-11-04 文章 Caizhi Weng
Hi!

目前 Flink 虽然已经有相应接口[1],但还没有任何 source 实现这个功能。可以在
https://issues.apache.org/jira/browse/FLINK-23633 里追踪这个问题的进展。

当然,如果你的 hive 表是以天为 partition 的,可以设置固定的切换时间点,然后 hive 只读之前的 partition。

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/#dynamic-start-position-at-switch-time

casel.chen  于2021年11月5日周五 上午8:39写道:

> 我有一个Hybrid Source切换时机问题:
> 在Hive+Kafka场景下,假如Kafka保留数据时长(retension)是1天,为了实现无缝衔接,我需要从Hive消费历史存量数据直到距离当前时间小于1天时才切换到kafka
> source,假设Hive中有字段表示处理时间的话,请问目前Flink Hybrid Source支持这种用法吗?如果支持的话程序应该要怎么写?谢谢!


Re: flink启动yarn-session失败

2021-11-04 文章 Caizhi Weng
Hi!

没有在邮件里发现附件,可以考虑把 flink-conf.yaml 的内容贴在邮件里,或者外部剪贴板。

casel.chen  于2021年11月4日周四 下午6:42写道:

> flink 1.13.2 + hadoop 3.2.1
> yarn上已经成功跑了hive和spark作业
> flink上通过运行 bin/yarn-session.sh 启动yarn session集群的时候一直报如下INFO日志,查看yarn web
> console发现并没有启flink-session集群,我的flink-conf.yaml配置如附件,hadoop集群并没有开启认证SSL之类的,改用standalone模式是可以启动3节点集群的,请问这会是什么原因造成的?要怎么修复?谢谢!
>
>
>
> 2021-11-04 16:51:39,964 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:39,986 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,004 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,020 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,041 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,059 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,078 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,097 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,114 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,134 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,155 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,175 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,193 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,212 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
>
>
>
>
>


Re: FlinkSQL 使用 streamingSink 写入 hive orc数据,如何控制文件数量。

2021-11-02 文章 Caizhi Weng
Hi!

hive sink 有文件合并功能可以在同一个 checkpoint 内把同一个 partition 的数据整理到同一个文件里。详见 [1]

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#file-compaction

yidan zhao  于2021年11月3日周三 上午10:03写道:

> 需求
> 假设,我的hive表为tmp表,若干字段,如何以dt、hour、sid为分区,其中sid为渠道的含义。
>
> 我当前基于FlinkSQL从kafka表中读取数据,转写到hive表tmp中,采用流式写入,提交策略metastore、success-file,触发假设用process-time,delay为1h。
> 检查点每1min检查一次,连续2次检查点间隔10min,本质就是10min做一次检查点。
>
> 当前情况
> 由于数据量较大,kafka分区数量为60,因此我的任务并发可以选择60以内,假设并发也选了60。
> 那么对于每个时间点,dt肯定只有1个,hour也基本只有1个,sid的话假设有10个。
> 文件数情况为:
> 每10分钟,10(sid)*60(parallelism)= 600个。
> 每小时有6个10分钟(即6次检查点),那么就是6000个文件。
> 如上,每小时差不多6000个文件生成,只会多不会少,因为考虑到roll policy等。
>
>
> 目前我需要的是,由于不同sid的数据量不一样,我想能否对于小数据量的sid,只被1个subtask消费,这样对于这个sid对应的分区下,每10分钟的文件数量就是1个,而不是60个。
> 对于数据量大的sid,则多个并行subtask消费。
> 大概想法类似于datastream api中先keyBy
>
> sid(当然这里可能有数据倾斜,我可以自己去想法解决,比如将大流量sid分散为sid+randomInt),然后基于streamingSink来消费并写入hive。
>
> 请问如上想法datastream、以及 flinkSQL 是否都能实现呢?
>
> 目前我看insert into tmp select ... from
> kafka_tmp;这样的话,默认生成2个task,一个kafkaSouce+streamSink(chain在一起)+ partition
> commiter,这是不满足需要的肯定。
>


Re: 关于FlinkSQL从kafka读取数据写到hive的一些问题

2021-11-01 文章 Caizhi Weng
Hi!

hive catalog 是不需要重新在 Flink SQL 里写一遍表定义的,连接到 hive catalog 的时候 Flink 就会自动读取
hive 里表的结构等信息。但 kafka 的表定义仍然要写。

你的邮件里的内容具体来自哪个文档界面呢?文档里应该是想要从 Flink 里建立 hive 表,如果已经在 hive 里建过表了就不用再建了。

yidan zhao  于2021年11月1日周一 下午3:05写道:

> 如题,我看了官方文档,定义好kafka和hive表。
> 写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。
>
> SET table.sql-dialect=hive;CREATE TABLE hive_table (
>   user_id STRING,
>   order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED AS
> parquet TBLPROPERTIES (
>   'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>   'sink.partition-commit.trigger'='partition-time',
>   'sink.partition-commit.delay'='1 h',
>   'sink.partition-commit.policy.kind'='metastore,success-file');
> SET table.sql-dialect=default;CREATE TABLE kafka_table (
>   user_id STRING,
>   order_amount DOUBLE,
>   log_ts TIMESTAMP(3),
>   WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define
> watermark on TIMESTAMP column) WITH (...);
>
>
> 如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。
>
> 其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。
> 但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。
>
>
> 此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。
>


Re: Re: 公司数据密文,实现group by和join

2021-10-28 文章 Caizhi Weng
Hi!

你是不是想写这样的 SQL:

SELECT id, sum(price) AS total_price FROM (
  SELECT T1.id AS id, T2.price AS price FROM T AS T1 INNER JOIN T AS T2 ON
decrypt_udf(T1.id, T2.id) = 1
) GROUP BY id

这个 sql 会输出每个 id 和该 id 属于的分组的总价格。

lyh1067341434  于2021年10月29日周五 上午9:41写道:

> 您好!
>
>
>   感谢您在百忙之中抽空回复我的邮件,我已经按照您的建议,自定义join函数实现了密文的join,但密文的group by 还是实现不了;
>
>
> 比如 有一张表 a, 表a有
> id,price列,数据都是密文,类似这样("MBwEELdR0JDC0OSryuQskeulP8YCCAyJLH7RwmAA");
>
>
> 如果我想求 不同id组的price之和:
> 直接使用flink 计算:会把id的分组当成字符串处理,从而导致分组的不正确;
> 如果调用密文计算的接口的话,把两个比较的key的密文传进入,会得到1或者0,来判断这两个密文key是否相等,从而分组可以正确;
>
>
>
>
> 问题:
>
>
> 目前group by分组,不知道在哪里实现调用密文计算的接口,从而传入两个key,来进行分组正确;
>我看到api只能指定分组的key是哪一个;
>
>
> 谢谢您!
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-10-28 11:09:26,"Caizhi Weng"  写道:
> >Hi!
> >
> >没太明白你的需求。你的需求是不是
> >
> >1. 调用一个自定义函数,用某一列密文算出一个值 k,用这个 k 作为 join key 或者 group key。
> >
> >如果是这个需求,只要实现一个 udf 即可。详见 [1]。
> >
> >2. 调用一个自定义函数,用某两列密文算出一个 true 或 false,如果是 true 说明 join key 匹配。
> >
> >如果是这个需求,仍然只需要实现一个 udf。join 条件中调用这个 udf 即可。但如果是这个需求,不太明白你期望中的 group by
> >是什么样的,因为不能仅通过 true false 就判断哪些数据属于同一个 group。
> >
> >[1]
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/functions/udfs/
> >
> >lyh1067341...@163.com  于2021年10月27日周三 下午5:20写道:
> >
> >> 您好:
> >>
> 目前公司数据都是密文,要进行密文数据的比较或者计算的话,只能调用公司密文计算的接口,去看了下flink的分组和join算子,都只能指定分组的key或者join的key,不知道怎么改写比较的规则,我用mapreduce实现了重写shuffle的比较规则,可以实现密文下的join和group
> >> by,对于使用spark和flink算子不知道如何实现。
> >>
> >> 问题:
> >> 请问有啥办法,实现密文下的join和group by操作吗?(在不能解密,只能调用公司密文计算的接口)
> >>
> >> 谢谢您。
> >>
> >>
> >>
> >> 发自 网易邮箱大师
>


Re: flink sql消费kafka各分区消息不均衡问题

2021-10-28 文章 Caizhi Weng
Hi!

如果没有具体的 SQL 很难分析这个问题。可以通过 Flink UI
观察各节点各并发的处理数据量看是否有部分并发处理量比较大;另外可以观察每个节点的反压情况,看是否有部分并发反压严重。另外可以特别注意 hash 边,看
hash key 是否有倾斜(这会表现在下游节点不同并发之间处理量差异比较大)。

casel.chen  于2021年10月29日周五 上午9:31写道:

> flink
> sql消费kafka消息做数据同步,前期没有出现堆积不均的问题,这两天发现某些kafka分区积压特别多,会是什么原因造成的?怎样解决呢?从统计结果上看,消息还算均匀地打到各个kafka分区上。作业没有开窗和聚合,只是攒一批写一批这样子的。注:作业是跑在k8s上的
>
>
> | 分区 ID | 客户端 | 最大位点 | 消费位点 | 堆积量 |
> | 0 | n/a | 155,397,108 | 155,396,747 | 361 |
> | 1 | n/a | 155,215,444 | 155,215,108 | 336 |
> | 2 | n/a | 155,369,596 | 155,369,258 | 338 |
> | 3 | n/a | 155,422,750 | 155,422,337 | 413 |
> | 4 | n/a | 155,163,343 | 154,489,738 | 673,605 |
> | 5 | n/a | 155,401,388 | 154,702,173 | 699,215 |
> | 6 | n/a | 155,372,040 | 154,651,398 | 720,642 |
> | 7 | n/a | 155,208,461 | 154,528,301 | 680,160 |
> | 8 | n/a | 155,383,486 | 154,696,404 | 687,082 |
> | 9 | n/a | 155,391,068 | 154,668,426 | 722,642 |
> | 10 | n/a | 155,139,417 | 154,450,377 | 689,040 |
> | 11 | n/a | 155,411,848 | 155,411,518 | 330 |
>
>


Re: 公司数据密文,实现group by和join

2021-10-27 文章 Caizhi Weng
Hi!

没太明白你的需求。你的需求是不是

1. 调用一个自定义函数,用某一列密文算出一个值 k,用这个 k 作为 join key 或者 group key。

如果是这个需求,只要实现一个 udf 即可。详见 [1]。

2. 调用一个自定义函数,用某两列密文算出一个 true 或 false,如果是 true 说明 join key 匹配。

如果是这个需求,仍然只需要实现一个 udf。join 条件中调用这个 udf 即可。但如果是这个需求,不太明白你期望中的 group by
是什么样的,因为不能仅通过 true false 就判断哪些数据属于同一个 group。

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/functions/udfs/

lyh1067341...@163.com  于2021年10月27日周三 下午5:20写道:

> 您好:
> 目前公司数据都是密文,要进行密文数据的比较或者计算的话,只能调用公司密文计算的接口,去看了下flink的分组和join算子,都只能指定分组的key或者join的key,不知道怎么改写比较的规则,我用mapreduce实现了重写shuffle的比较规则,可以实现密文下的join和group
> by,对于使用spark和flink算子不知道如何实现。
>
> 问题:
> 请问有啥办法,实现密文下的join和group by操作吗?(在不能解密,只能调用公司密文计算的接口)
>
> 谢谢您。
>
>
>
> 发自 网易邮箱大师


Re: flinksql写hive的时候,数据类型不匹配。

2021-10-27 文章 Caizhi Weng
Hi!

是字段的顺序不一致(sink schema 里 d 在第三个,query 里 d 在最后)。需要将这两个顺序调整到一致。

yidan zhao  于2021年10月27日周三 下午4:45写道:

> Flink SQL> insert into upd_sh.dr1 select cast('cid' as string) as cid,
> cast(2001 as bigint) as server_time, cast('20211027' as string) as dt,
> cast('01' as str
> ing) as `hour`, cast('7102' as string) as supply_id, cast(map['sid','7102',
> 'subid', 'i am a subid'] as map) as d;
> [INFO] Submitting SQL update statement to the cluster...
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Column types of query
> result and sink for registered table 'hive.upd_sh.dr1' do not match.
> Cause: Incompatible types for sink column 'd' at position 2.
>
> Query schema: [cid: STRING NOT NULL, server_time: BIGINT NOT NULL, dt:
> STRING NOT NULL, hour: STRING NOT NULL, supply_id: STRING NOT NULL, d:
> MAP NOT NULL]
> Sink schema:  [cid: STRING, server_time: BIGINT, d: MAP,
> dt: STRING, hour: STRING, supply_id: STRING]
>
>
> 如上,显示是d这个字段有问题,但实际上d我已经强制转换了,类型完全一致。
>


Re: Flink1.12.1 读取kafka的数据写入到clickhouse如何支持upsert操作呢

2021-10-27 文章 Caizhi Weng
Hi!

你需要在 sink 节点之前添加一个按 uuid 的 hash shuffle 将相同的 uuid 送到相同的并发。如果 processData
是一个 data stream 的话,通过 keyBy 方法 key by uuid,再写入 sink 即可。

涂子令  于2021年10月27日周三 下午5:49写道:

>  您好!感谢你在万忙之中,抽出时间来看我发的邮件。最近我在研究使用flink写入数据到clickHouse,如何能满足公司业务需求。但是在用
> flink1.12.1版本读取kafka的数据,实现upsert的形式写入数据到clickhouse出现了一些问题。问题详细情况描述如下:
>
>
>
> clickhouse建表语句如下:
>
> CREATE TABLE test_local.tzling_tb3(
>
> uuid String,
>
> product String,
>
> platform String,
>
>   batchId String,
>
>   id String,
>
>   account String,
>
>   customerId String,
>
>   reportName String,
>
>   dt String,
>
>   campaign String,
>
>   adGroup String,
>
>   generalField String,
>
>   currency String,
>
>   impressions String,
>
>   cost String,
>
>   clicks String,
>
>   conversions String,
>
>   createDateTime String,
>
>   createTime BIGINT,
>
>   key String,
>
>   pdate String
>
> )engine = MergeTree PARTITION BY pdate order by createTime;
>
> 将uuid作为主键,主键存在就更新数据 update,不存在的话,就直接append。
>
>
>
> processData.addSink(new MSKUpsertClickHouseSink());
>
> 附件文件MSKUpsertClickHouseSink.java是我写入clickhouse的sink类,设计逻辑为:
> 先查询表中是否存在要添加数据的uuid,如果存在就先做条件删除操作,再做append操作;如果要添加的数据uuid不存在,就直接append
> 操作。当时这样写出现了并发问题,如果并行度大于1,那么clickhouse中会出现uuid不唯一的情况出现。
>
>
>
> 请问一下,基于上述所说的情况,您有什么好的实践方案可以推荐一下的呢?
>
>
>
>
>
> 从 Windows 版邮件 发送
>
>
>


Re: flink写mysql问题

2021-10-26 文章 Caizhi Weng
Hi!

Flink 1.11 对 jdbc 在流作业中的支持确实不完善,在流作业做 checkpoint 时没有处理。如果需要在流作业中使用 jdbc
sink,建议升级到比较新的 1.13 或 1.14。

zya  于2021年10月26日周二 下午4:56写道:

> 你好,感谢回复
> 在任务做检查点的时候,内存中缓存的一批数据如何 flush 到 mysql 中的呢?
>
>
> 我用的是1.11.2版本的flink
> sql,我发现数据写到外部直接使用的是BufferReduceStatementExecutor中的方法,同时在做检查点的时候不会触发到数据库的flush,好像没有使用到类GenericJdbcSinkFunction
> 那么如果遇到断电等问题,这部分数据是不是会丢失呢
>
>
>
>
> -- 原始邮件 --
> 发件人:
>   "user-zh"
> <
> tsreape...@gmail.com>;
> 发送时间: 2021年10月26日(星期二) 上午10:31
> 收件人: "flink中文邮件组"
> 主题: Re: flink写mysql问题
>
>
>
> Hi!
>
> 在任务做检查点的时候,内存中缓存的一批数据如何 flush 到 mysql 中的呢
>
>
> JdbcDynamicTableSink 不包含具体 sink function
> 的实现,具体的实现位于 GenericJdbcSinkFunction。该类的 snapshotState 即为 snapshot 的实现。不同的
> jdbc 数据库以及不同的 sql 之间攒 batch 的行为略有不同,具体见 JdbcBatchStatementExecutor 及其子类。
>
> 写 mysql 的 qps 只能到几百,反压严重
>
>
> jdbc connector 有一些 with 参数用来控制 flush 的时间。例如 sink.buffer-flush.interval
> 就会控制攒了多少数据就 flush。它的默认值是 100,因此对于流量比较大的作业需要相应调大。其他相关参数见 [1]。
>
> 算子使用 sum 计算,高峰时候 1000条/s,但是做检查点需要好几分钟才能完成
>
>
> checkpoint 阻塞有大量原因。从邮件中的描述来看最有可能是因为 sink 反压导致上游 checkpoint
> 也被反压。排除该原因后还可以观察 checkpoint 大小是否过大,以及相应节点 gc 时间是否过长。这个要结合具体的 sql 分析。
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#sink-buffer-flush-max-rows
>
> a <806040...@qq.com.invalid> 于2021年10月26日周二 上午9:49写道:
>
> >
> >
> 各位好,我在使用flink写mysql的时候,发现sink是使用了JdbcDynamicTableSink这个类,但是这个类没有实现checkpoint相关的接口,我想请问一下,1.在任务做检查点的时候,内存中缓存的一批数据如何flush到mysql中的呢
> >
> >
> 2.我的任务写mysql的qps只能到几百,反压严重,算子使用sum计算,高峰时候1000条/s,但是做检查点需要好几分钟才能完成,请问这里有什么排查方法吗


Re: Flink任务每运行20天均会发生内部异常

2021-10-26 文章 Caizhi Weng
Hi!

听起来和 state 过期时间非常有关。你配置了哪些和 state 过期相关的参数?是否有 20 天过期的 state?

mayifan  于2021年10月26日周二 下午4:43写道:

> Hi!
>
> 麻烦请教大家一个问题。
>
>
> 有三个Flink任务以yarn-per-job模式运行在Flink-1.11.2版本的集群上,均使用RocksDB作为状态后端,数据以增量的方式写入RocksDB,且均配置了状态过期时间。
>
>
> 任务逻辑大致都是通过状态与历史数据进行自关联或双流join,每输入一条数据都会产出等量、1/2或多倍的数据到下游,当数据无法通过状态关联,任务则无法向下游产出数据。
>
>
> 奇怪的是三个任务中有两个任务存在异常,异常现象是每次当任务启动运行至第20个工作日,都会非常准时的产生下游数据输出骤降的现象,输出与输入的数据量级差数十倍,并且此时任务中没有任何异常日志。
>
>
>
>
> 问题:目前怀疑是集群配置或RocksDB状态的问题,但是没有任何思路或排查线索,请问这种现象是怎样产生的?应该怎样排查?


Re: flink keyby之后数据倾斜的问题

2021-10-26 文章 Caizhi Weng
Hi!

Flink SQL 里已经内置了很多解倾斜的方式,例如 local global 聚合。详见 [1],如果一定要使用 streaming api
可以参考该思路进行优化。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/dev/table/tuning/#local-global-%e8%81%9a%e5%90%88

xiazhl  于2021年10月26日周二 下午2:31写道:

> hello everyone!               
>       向大家求助一个使用keyby后导致数据倾斜的问题。      
>
>
>       背景:使用flink streamAPI进行数据处理和提取,结果写入物理存储。
> 处理后会将数据量放大10倍左右。
>              
> 考虑到其中有大量重复数据,使用flink状态根据id进行精确去重。去重前使用keyby id对数据进行分区。
>
>
>       问题:目前keyby之后会产生数据倾斜,切斜比例  高:低≈3:1,
> 各位大佬有什么好的方案处理这个问题吗?


Re: Flink SQL 1.12 批量数据导入,如果加速性能

2021-10-25 文章 Caizhi Weng
Hi!

我通过 Flink SQL 无论怎么加大并行度, 都是单并行度导入


你是如何加大并行度的?除 source 外其他节点也是单并行度吗,还是说可以成功加大?能否分享一下你的 SQL 便于解答?

WuKong  于2021年10月26日周二 上午11:36写道:

> Hi:
> 我目前有个场景, 需要通过基于Flink SQL 进行历史数据导入, 比如Source 端是一张MYSQL 表, Sink端
> 也是一张MSYQL 表, 我通过Flink SQL 无论怎么加大并行度, 都是单并行度导入,速率很慢, 请问有什么需要配置的吗? 或者其他解决方案
> 可以基于SQL进行大批量数据导入,数十亿量级。
>
>
>
> ---
> Best,
> WuKong
>


Re: flink1.12.1 读取kafka的数据写入到clickhouse如何支持upsert操作呢

2021-10-25 文章 Caizhi Weng
Hi!

你需要在 sink 节点之前添加一个按 uuid 的 hash shuffle 将相同的 uuid 送到相同的并发。如果 processData
是一个 data stream 的话,通过 keyBy 方法 key by uuid,再写入 sink 即可。

扯  于2021年10月26日周二 上午9:49写道:

>
> 您好!感谢你在万忙之中,抽出时间来看我发的邮件。最近我在研究使用flink写入数据到clickHouse,如何能满足公司业务需求。但是在用flink1.12.1版本读取kafka的数据,实现upsert的形式写入数据到clickhouse出现了一些问题。问题详细情况描述如下:
>
> clickhouse建表语句如下:
> CREATE TABLE test_local.tzling_tb3(
> uuid String,
> product String,
> platform String,
> batchId String,
> id String,
> account String,
> customerId String,
> reportName String,
> dt String,
> campaign String,
> adGroup String,
> generalField String,
> currency String,
> impressions String,
> cost String,
> clicks String,
> conversions String,
> createDateTime String,
> createTime BIGINT,
> key String,
> pdate String
> )engine = MergeTree PARTITION BY pdate order by createTime;
> 将uuid作为主键,主键存在就更新数据 update,不存在的话,就直接append。
>
> processData.addSink(new MSKUpsertClickHouseSink());
> 附件文件MSKUpsertClickHouseSink.java是我写入clickhouse的sink类,设计逻辑为:
> 先查询表中是否存在要添加数据的uuid,如果存在就先做条件删除操作,再做append操作;如果要添加的数据uuid不存在,就直接append操作。当时这样写出现了并发问题,如果并行度大于1,那么clickhouse中会出现uuid不唯一的情况出现。
>
> 请问一下,基于上述所说的情况,您有什么好的实践方案可以推荐一下的呢?
>


Re: flink写mysql问题

2021-10-25 文章 Caizhi Weng
Hi!

在任务做检查点的时候,内存中缓存的一批数据如何 flush 到 mysql 中的呢


JdbcDynamicTableSink 不包含具体 sink function
的实现,具体的实现位于 GenericJdbcSinkFunction。该类的 snapshotState 即为 snapshot 的实现。不同的
jdbc 数据库以及不同的 sql 之间攒 batch 的行为略有不同,具体见 JdbcBatchStatementExecutor 及其子类。

写 mysql 的 qps 只能到几百,反压严重


jdbc connector 有一些 with 参数用来控制 flush 的时间。例如 sink.buffer-flush.interval
就会控制攒了多少数据就 flush。它的默认值是 100,因此对于流量比较大的作业需要相应调大。其他相关参数见 [1]。

算子使用 sum 计算,高峰时候 1000条/s,但是做检查点需要好几分钟才能完成


checkpoint 阻塞有大量原因。从邮件中的描述来看最有可能是因为 sink 反压导致上游 checkpoint
也被反压。排除该原因后还可以观察 checkpoint 大小是否过大,以及相应节点 gc 时间是否过长。这个要结合具体的 sql 分析。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#sink-buffer-flush-max-rows

a <806040...@qq.com.invalid> 于2021年10月26日周二 上午9:49写道:

>
> 各位好,我在使用flink写mysql的时候,发现sink是使用了JdbcDynamicTableSink这个类,但是这个类没有实现checkpoint相关的接口,我想请问一下,1.在任务做检查点的时候,内存中缓存的一批数据如何flush到mysql中的呢
>
> 2.我的任务写mysql的qps只能到几百,反压严重,算子使用sum计算,高峰时候1000条/s,但是做检查点需要好几分钟才能完成,请问这里有什么排查方法吗


Re: 在开启checkpoint后如何设置offset的自动提交以方便监控

2021-10-24 文章 Caizhi Weng
Hi!

这里的 offset 是 kafka source 的 offset 吗?其实没必要通过 checkpoint 读取 offset,可以通过
metrics 读取,见 [1]。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#kafka-connectors

杨浩  于2021年10月25日周一 上午10:20写道:

> 请问下,如果启用checkpoint,因为状态比较大,checkpoint间隔设置比较大,如何让offset提交的比较快,这样方便监控程序进度


Re: Flink Over Window功能存储状态优化能有性能提升吗?

2021-10-19 文章 Caizhi Weng
Hi!

会将 input 的记录存储在 state 里面。

如果 input 的字段比较多,但是参与聚合运算的字段比较少。

这样会导致 state 非常的大。
>

你使用的是哪个 Flink 版本以及哪个 planner?这个观察是如何得出的呢?就我所知,state 里应该只存储了参与 agg 运算的字段。

Tianwang Li  于2021年10月19日周二 下午8:34写道:

> Flink 的 Over 窗口
> 例如在 range over window 场合,会将 input 的记录存储在 state 里面。
> 如果 input 的字段比较多,但是参与聚合运算的字段比较少。
> 这样会导致 state 非常的大。
>
> 从 RowTimeRangeBoundedPrecedingFunction 里面逻辑看,
> 不参与agg运算的字段,在 onTimer 时期输出之后,是可以清理了的。
>
> 这样能提升Over 窗口的处理性能吗?
>
> SQL例子:
>
> SELECT
> col_1,
> col_2,
> col_3,
> col_4,
> col_5,
> col_6, -- 字段内容比较长
> col_7, -- 字段内容比较长
> col_8, -- 字段内容比较长
> col_9, -- 字段内容比较长
> col_10,
> col_11,
> col_12,
> col_13,
> col_14,
> col_15,
> col_16,
> col_17,
> col_18,
> col_19,
> sum(col_10) OVER w AS day_col_10,
> sum(col_11) OVER w AS day_col_11,
> sum(col_12) OVER w AS day_col_12,
> sum(col_13) OVER w AS day_col_13,
> sum(col_14) OVER w AS day_col_14,
> sum(col_15) OVER w AS day_col_15,
> sum(col_16) OVER w AS day_col_16
> FROM table_3
> window w as (
> PARTITION BY col_1, col_2
> ORDER BY rowtime
> RANGE BETWEEN INTERVAL '1' DAY preceding AND CURRENT ROW)
>
> --
> **
>  tivanli
> **
>


Re: Re: Flink-1.12 sql 同一个Job 下 如何控制 多个SQL的执行顺序

2021-10-14 文章 Caizhi Weng
Hi!

我同意你的观点,目前自定义 sink 应该是最合适的方法。

WuKong  于2021年10月14日周四 下午8:02写道:

> Hi Caizhi:
>
> 这个应该是和我们业务处理模式有关,我们需要同一个Kafka 流 输出两个Sink 同时要保持顺序性, 这个对我们来说是个通用的功能, 
> 我在想如何实现这种方式, 请问有什么方案值得推荐吗?
>我想通过实现一种自定义Table  比如 在一个Jdbc
> Table表的属性里 填充Kafka 属性,保持相同的Schema, 做为一个Sink 这样 方便做统一管理, 然后实现自定义Table 逻辑, 
> 这种方案如何?
>
> --
> ---
> Best,
> WuKong
>
>
> *发件人:* Caizhi Weng 
> *发送时间:* 2021-10-14 17:50
> *收件人:* user-zh 
> *主题:* Re: Flink-1.12 sql 同一个Job 下 如何控制 多个SQL的执行顺序
> Hi!
>
> “先插入 db,然后再写下游 kafka”,是说想要保证同一条数据必须写进 db 之后,才能写入 kafka 吗?
>
> 目前暂时没有这样的功能,如果确实很需要,可以考虑写一个 udf,udf 读到数据以后先写入 db
> 再传给下游。可以描述一下为什么有这种需求,大概是什么样的使用场景吗?
>
> WuKong  于2021年10月14日周四 下午5:11写道:
>
> > Hi:
> >目前遇到一个问题,我想在一个Job下 ,有两个SQL 分步都是 读取同一个Source Kafka 数据, 一个是插入Tidb 落数据,
> > 另一个是写入下游Kafka, 目前想控制 先插入DB 然后再写入下游Kafka, 请问有什么方案可以实现这种方式?
> >
> >
> >
> > ---
> > Best,
> > WuKong
> >
>
>


Re: Flink-1.12 sql 同一个Job 下 如何控制 多个SQL的执行顺序

2021-10-14 文章 Caizhi Weng
Hi!

“先插入 db,然后再写下游 kafka”,是说想要保证同一条数据必须写进 db 之后,才能写入 kafka 吗?

目前暂时没有这样的功能,如果确实很需要,可以考虑写一个 udf,udf 读到数据以后先写入 db
再传给下游。可以描述一下为什么有这种需求,大概是什么样的使用场景吗?

WuKong  于2021年10月14日周四 下午5:11写道:

> Hi:
>目前遇到一个问题,我想在一个Job下 ,有两个SQL 分步都是 读取同一个Source Kafka 数据, 一个是插入Tidb 落数据,
> 另一个是写入下游Kafka, 目前想控制 先插入DB 然后再写入下游Kafka, 请问有什么方案可以实现这种方式?
>
>
>
> ---
> Best,
> WuKong
>


Re: flink-1.14.0 sql 写array 错误

2021-10-13 文章 Caizhi Weng
Hi!

这看起来像一个 bug,我已经记了一个 issue [1],可以在那里关注问题进展。

如 issue 中所描述,目前看来如果常量字符串一样长,或者都 cast 成 varchar 可以绕过该问题。可以先这样绕过一下。

[1] https://issues.apache.org/jira/browse/FLINK-24537

kcz <573693...@qq.com.invalid> 于2021年10月13日周三 下午5:29写道:

> 因为select出多个sum的值,每一个sum的值都是一个type类型的数据,最后我将它插入到MySQL表里面,MySQL表结构为
> (id,type,value),于是想到通过列转行形式来操作。
> SQL如下:
> CREATE TABLE kafka_table (
>                    
>          vin STRING,
>                    
>          speed DOUBLE,
>                    
>          brake DOUBLE,
>                    
>          hard_to DOUBLE,
>                    
>          distance DOUBLE,
>                    
>          times TIMESTAMP(3),
>                    
>          WATERMARK FOR times AS times - INTERVAL
> '5' SECOND
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'json'
> );
>
>
>
>
> select window_start, window_end,vin,array[row('brakes',sum(if(brake >
> 3.0451,1,0))),row('hard_tos',sum(if(hard_to > 3.0451,1,0)))]
> from TABLE(
>     TUMBLE(TABLE kafka_table, DESCRIPTOR(times), INTERVAL '10'
> MINUTES)) group by window_start, window_end,vin;
>
>
> 报错如下:
> Exception in thread "main" java.lang.AssertionError: Conversion to
> relational algebra failed to preserve datatypes:
> validated type:
> RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL
> window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin,
> RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER
> EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
> converted type:
> RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL
> window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin,
> RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT
> NULL EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
> rel:
> LogicalProject(window_start=[$0], window_end=[$1], vin=[$2],
> EXPR$3=[ARRAY(CAST(ROW(_UTF-16LE'brake', $3)):RecordType(VARCHAR(7)
> CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT
> NULL, CAST(ROW(_UTF-16LE'hard_to', $4)):RecordType(VARCHAR(7) CHARACTER SET
> "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL)])
>   LogicalAggregate(group=[{0, 1, 2}], agg#0=[SUM($3)],
> agg#1=[SUM($4)])
>     LogicalProject(window_start=[$6], window_end=[$7], vin=[$0],
> $f3=[IF(>($2, 3.0451:DECIMAL(5, 4)), 1, 0)], $f4=[IF(>($3,
> 3.0451:DECIMAL(5, 4)), 1, 0)])
>       LogicalTableFunctionScan(invocation=[TUMBLE($5,
> DESCRIPTOR($5), 60:INTERVAL MINUTE)],
> rowType=[RecordType(VARCHAR(2147483647) vin, DOUBLE speed, DOUBLE brake,
> DOUBLE hard_to, DOUBLE distance, TIMESTAMP(3) *ROWTIME* times, TIMESTAMP(3)
> window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
>         LogicalProject(vin=[$0], speed=[$1],
> brake=[$2], hard_to=[$3], distance=[$4], times=[$5])
>          
> LogicalWatermarkAssigner(rowtime=[times], watermark=[-($5, 5000:INTERVAL
> SECOND)])
>            
> LogicalTableScan(table=[[default_catalog, default_database, kafka_table]])
>
>
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
> at com.hycan.bigdata.utils.SqlUtil.callCommand(SqlUtil.java:48)
> at com.hycan.bigdata.job.SchemaJob.main(SchemaJob.java:87)
> Disconnected from the target VM, address: '127.0.0.1:61710', transport:
> 'socket'
>
>
> Process finished with exit code 1


Re: flinksql客户端不能提交任务

2021-10-13 文章 Caizhi Weng
Hi!

从报错信息来看 client 在尝试链接位于 http://127.0.0.1:8081/ 的集群。你的 yarn session
应该不在本地吧?所以很可能是 sql client 对应的 flink 配置出错,检查一下对应的 flink 配置文件看看。

maker_d...@foxmail.com  于2021年10月14日周四 上午9:18写道:

> 各位大家好:
> 紧急求助!
> 我之前一直用sql-client提交SQL任务,今天突然不能提交了,报错如下:
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected exception.
> This is a bug. Please consider filing an issue.
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:215)
> Caused by: java.lang.RuntimeException: Error running SQL job.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeUpdateInternal$4(LocalExecutor.java:514)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:256)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:507)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:428)
> at
> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:690)
> at
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:327)
> at java.util.Optional.ifPresent(Optional.java:159)
> at
> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:214)
> at
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:144)
> at
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:115)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> JobGraph.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeUpdateInternal$4(LocalExecutor.java:511)
> ... 10 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
> at
> org.apache.fl

Re: 关于slot分配

2021-10-12 文章 Caizhi Weng
Hi!

“默认的优先单个 TM 的机制”我记得没有这样的参数。你的意思是把 cluster.evenly-spread-out-slots 设为 false
吗?如果是这样,那么会在所有 slot 中任意选择,而不是优先单个 TM。

想知道优先单个 TM 是出于什么样的需求呢?因为这样做可能会造成集群内部分机器很忙,但部分机器空闲的情况,忙机器上的并发会被拖慢。

yidan zhao  于2021年10月12日周二 下午4:25写道:

> 我以前采用分配到多个TM的机制,最近尝试了下默认的优先单个TM的机制。
> 但是发现个问题,我当前每个TM是10个slot,我有个任务40并发,然后实际占用了5个TM,10+10+10+2+8。这个是啥情况呢?
>
> 我更期望要么就彻底平均(配置spread那个参数),要么就单个TM这样用。
> 前者:期望机器之间均衡。
> 后者:期间任务之间完全隔离,我的任务并发会设置单TMslot数量(10)的倍数。
>


Re: ​异步IO算子无法完成checkpoint

2021-10-11 文章 Caizhi Weng
Hi!

图片无法在邮件中显示,请检查。

李一飞  于2021年10月12日周二 上午10:33写道:

> 异步IO算子无法完成checkpoint,帮忙看下是什么原因
>
>
>
>


Re: Does the flink sql support checkpoints

2021-10-11 文章 Caizhi Weng
(Forwarding this to the user mailing list as this mail is written in
English)

Hi!

I think problem 1 is the expected behavior. Is this behavior inconvenient
for you? If yes why it is the case?

For problem 2, could you explain in detail how do you run the word count
program and where do you store the counting result? It might be that you're
storing the results in a sink which only updates the result after a
successful checkpoint.

王小宅的蜗居生活  于2021年10月12日周二 上午9:32写道:

> The flink version is v1.13.2
>
> 王小宅的蜗居生活  于2021年10月11日周一 下午9:01写道:
>
> > Use flink sql for real-time calculation (deployment mode: on yarn). To
> use
> > the checkpoint, you need to configure the following in the
> flink-conf.yaml:
> >
> >
> > state.backend: filesystem
> >
> > state.checkpoints.dir: hdfs:///flink/flink-checkpoints
> >
> > state.savepoints.dir: hdfs:///flink/flink-savepoints
> >
> > state.checkpoints.num-retained: 10
> >
> >
> > There are two problems:
> >
> > 1.checkpoint generates a folder at the initial stage in the HDFS.
> >
> > 2. The savepoint cannot record the data status.
> >
> > For example: The simple wordcount cannot record the accumulated value.
> >
>


Re: flink sql计算新增用户

2021-09-28 文章 Caizhi Weng
Hi!

听起来 event time temporal join 符合你的需求。详见
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join

z  于2021年9月28日周二 下午8:27写道:

> hi各位,我想计算每日新增用户的数量,用户的登录日志在kafka中,在当日之前登录过的用户即为老用户,目前我的做法是将用户登录信息用flink
> sql写到下游mysql表A中,表中存储用户id和第一次登录的ts,然后再用另外一个流join这张表判断该用户是否在今天之前是否登录过,如果未登录则计算为当日新增玩家。
>
> 现在的问题是在0点时,可能由于数据延迟或者乱序,导致前天11:55的用户数据还没有写到A表中,但是0:01的数据到了,所以这个用户会被计算为新用户,或者另外一种情况时,写入表A的流消费比较慢,导致另外一个流join到的数据不全,老用户也会被判定为今日新增用户,请问这种情况我要如何计算到准确的每日实时新增用户呢?
> 考虑过使用10分钟窗口+5分钟延迟的形式,但是这样延迟就变成了15分钟,延迟时间过长


Re: flink-1.12.5 定义HIVDE DDL ,并且有comment,但是hive表里面没有体现出comment

2021-09-27 文章 Caizhi Weng
Hi!

这个问题已经在社区提出过了,可以在 https://issues.apache.org/jira/browse/FLINK-18958
这里追踪解决进度。

kcz <573693...@qq.com.invalid> 于2021年9月27日周一 上午11:21写道:

> hive版本3.1.0
> ddl如下:
> create table test_hive(
>  id int comment 'test comment'
> ) PARTITIONED BY (dt STRING) STORED AS orc TBLPROPERTIES (
>    'partition.time-extractor.kind'='custom',
>    'partition.time-extractor.timestamp-pattern'='$dt',
>  
>  'partition.time-extractor.class'='com.hycan.bigdata.utils.MyPartTimeExtractor',
>    'sink.partition-commit.trigger'='partition-time',
>    'sink.partition-commit.delay'='1 d',
>    'sink.partition-commit.policy.kind'='metastore,success-file'
> );
>
> 实际使用hive desc formatted test_hive 没有看到comment


Re: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate

2021-09-27 文章 Caizhi Weng
Hi!

你使用的是什么 Flink 版本呢?之前的 Flink 版本 window agg 只能消费 insert only 的数据,最新的 Flink
1.14 能够支持这样的询问。

lzy139...@outlook.com  于2021年9月27日周一 上午11:37写道:

> 使用ROW_NUMBER过滤数据后,进行开窗聚合计算报错


Re: flink能支持动态增加任务

2021-09-27 文章 Caizhi Weng
Hi!

如果你说的是对于固定的字段,每次需要过滤出来的值不一样,可以考虑维表 join。维表里保存的就是你需要过滤出来的值,这样每次只要更新维表即可。

如果你说的是每次要选择不同的字段,可能只能通过 udtf 来完成这个需求。udtf 里通过网络等方式访问外部资源来判断现在需要过滤的是哪些字段的哪些值。

yunying  于2021年9月28日周二 上午9:47写道:

>
> flink消费一个kafka主题,比如里面有一个字段分为a,b,c,d..,现在有一个需求就是要过滤出字段是a的数据,进行后续操作。写完这个任务提交了过后,过段时间又需要过滤出字段b进行后续操作,后续的操作都是一样的,现在我就要为它在开发一个任务,在提交一次,数据量都不大。但是每提交一次都会耗费资源。以后说不定还会要过滤c,d,e有什么好办法解决这个问题吗


Re: 退订

2021-09-21 文章 Caizhi Weng
Hi!

退订中文邮件列表请发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org,其他邮件列表退订邮箱参见
https://flink.apache.org/community.html#mailing-lists

Elaiza <1360619...@qq.com.invalid> 于2021年9月19日周日 上午9:14写道:

>


Re: HOP窗口较短导致checkpoint失败

2021-09-21 文章 Caizhi Weng
Hi!

24 小时且步长 1 分钟的 window 会由于数据不断累积而导致 cp 越来越大,越来越慢,最终超时。当然如果运算太慢导致 cp 被 back
pressure 也有可能导致 cp 超时。开启 mini batch 可以加快 window 的运算速度,但这么长时间而且这么频繁的 window
目前确实没有什么很好的优化方法,仍然建议扩大并发以分担计算以及 cp 的压力。

xiaohui zhang  于2021年9月18日周六 上午9:54写道:

> FLink:1.12.1
>
> 源: kafka
> create table dev_log (
> devid,
> ip,
> op_ts
> ) with (
> connector = kafka
> )
>
> sink: Hbase connect 2.2
>
> 目前用flink sql的hop
> window开发一个指标,统计近24小时的设备关联ip数。设置30min一次checkpoint,超时时间30min。
> 执行SQL如下
> insert into h_table
> select
>   devid as rowkey
>   row(hop_end, ip_cnt)
> from (
>   select
>  devid,
>  hop_end(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) as hop_end,
>  count(distinct(ip)) as ip_cnt
> from
>   dev_logs
> group by
>hop(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR),
>   devid
> )
>
> 测试中发现任务运行大约3个小时后,就会出现checkpoint失败,任务反复重启。
> 实际上数据量并不大,测试数据是1s/条输入,一个窗口输出大约只有4000条,成功的checkpoint不超过50M。
> 修改为10分钟的滑动步长就可以正常执行,但是延迟就比较高了。
> 请问有什么办法可以排查是哪里出的问题?有什么优化的方法呢
>


Re: flink sql是否支持动态创建sink table?

2021-09-21 文章 Caizhi Weng
Hi!

不太明白这个需求,但如果希望发送给不同的 topic,需要给每个 topic 都定义 DDL。

如果是因为各 topic 之间的 schema 重复度比较高,只有些许字段以及 topic 名称等不同,可以看一下 DDL LIKE 语法:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/create/#like

casel.chen  于2021年9月18日周六 上午8:27写道:

> 上游kafka topic消息带有一个用户类型字段,现在想根据不同用户类型将数据发到不同topic(为了扩展不想写死有哪些类型) ,请问flink
> sql支持动态创建sink table吗?


Re: Flink SQL是否支持Count Window函数?

2021-09-21 文章 Caizhi Weng
Hi!

据我所知目前暂时没有增加 count window 的打算,以后可能会在最新的 Window TVF 里添加 count window tvf。

不建议在 SQL 中自行实现 count window,因为 SQL 添加 window 较为复杂。但可以考虑先将 SQL 转为
datastream,用 datastream 的 count window 之后再将 datastream 转回 SQL。

EnvironmentSettings settings = EnvironmentSettings.newInstance().
inStreamingMode().build();
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
StreamExecutionEnvironment.getExecutionEnvironment(), settings);
tEnv.executeSql(
"CREATE TABLE T ( a INT, b INT, key AS abs(a) % 3, val AS abs(b) % 3 ) WITH
( 'connector' = 'datagen' )");
Table table = tEnv.sqlQuery("SELECT key, val FROM T");
DataStream dataStream = tEnv.toDataStream(table);
DataStream> summedStream =
dataStream
.keyBy(row -> (int) row.getField(0))
.countWindow(100)
.apply(
(WindowFunction<
Row,
Tuple2,
Integer,
GlobalWindow>)
(key, window, input, out) -> {
int sum = 0;
for (Row row : input) {
Integer field = (Integer) row.getField(1);
if (field != null) {
sum += field;
}
}
out.collect(Tuple2.of(key, sum));
})
.returns(
new TupleTypeInfo<>(
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
Table summedTable = tEnv.fromDataStream(summedStream);
tEnv.registerTable("S", summedTable);
tEnv.executeSql("SELECT f0, f1 FROM S").print();

casel.chen  于2021年9月17日周五 下午6:05写道:

> 今天遇到一个业务场景用到count window,查了下Flink官网目前Flink SQL只支持time
> window,问一下官方是否打算sql支持count window呢?
> 如果没有计划的话,自己要如何实现?是否可以像flink 1.13引入的cumulate window写一个自定义窗口函数呢?谢谢!


Re: flinksql关联维表延迟

2021-09-16 文章 Caizhi Weng
Hi!

“处理时间会领先事件时间几十分钟”是说延迟几十分钟吗?

作业延迟的原因可能有很多,需要看到比较具体的作业才能有比较好的推测。一个自检的方法是打开 Flink web UI,观察 busy
值比较高的节点,看看是否并发度不够,或者有倾斜,或者是否 checkpoint 时间太长等等。

邓 雪昭  于2021年9月16日周四 下午2:59写道:

> 各位老师好,
>
>  
> 我目前使用Flinksql构建了一个应用,数据源是kafka,关联了一张23w数据的维表(存放在Tidb),该维表和流中的数据关联会有一些发散(业务逻辑),使用了lookup.cache.maxprows=25,ttl=3600s,目前输出到kafka,延迟很严重,处理时间会领先事件时间几十分钟并且还会持续扩大,请问有什么好的解决办法吗?
>
>
>


  1   2   >