Re: flink sql cli 读取 hbase表报错

2020-06-28 文章 Leonard Xu
Hello, 这应该是一个已知bug[1],原因是Configuration是不可序列化的,HbaseRowInputFormat中没有正确处理,导致用户DDL中的zk配置无法传递。 在flink1.11和1.12上已经修复。如果是1.10.x版本中,可以将HBase 的配置文件(hbase-default.xml、 hbase-site.xml) 添加到 classpath下,也可以把 HBase 的配置文件添加到HADOOP_CLASSPATH(flnk启动脚本会检查HADOOP_CLASSPATH环境变量并加载),两种方式Flink集群和SQL Client都能加载到Hbase的

Re: 无法生成rowtime导致在window失败

2020-06-28 文章 Leonard Xu
Hi, > field("logictime","TIMESTAMP(3)”) 报错的原因这个字段在你原始的表中不存在的,理解你的需求是你想用 field evitime(Long型)生成一个新的 field logictime(TIMESTAMP(3)),这个可以用计算列解决,Table API上还不支持计算列,1.12 已经在开发中了。你可以用 DDL 加计算列完成满足你的需求,参考[1] create table test ( acct STRING, evitime BIGINT, logictime as TO_TIMESTAMP(FROM_UNIXTIME(evi

Re: flink sql row类型group by

2020-06-28 文章 Leonard Xu
> 在 2020年6月29日,12:05,sunfulin 写道: > > 这种情况下推导PK貌似会报错(UpsertStreamTableSink requires full primary keys)。这种应该怎么写哈? Hi, 在1.10.x 版本中,upsertSink 中推导 pk 是通过query 来推导,这个比较好的解决是等1.11发布后,通过在建表的DDL声明主键( PRIMARY KEY NOT ENFORCED), 如果要在1.10.x里解决,一般是改写下query,使得推导的pk能符合预期。这个写入es的sink要求 pk 是简单类型,而你的query又需要

Re:Re: flink sql row类型group by

2020-06-28 文章 sunfulin
hi, Leonard 这个写法应该是OK,不过我的场景下是下面这种 select a, b, row(commentId, commentContent) from T group by a, b, commentId, commentContent 这种情况下推导PK貌似会报错(UpsertStreamTableSink requires full primary keys)。这种应该怎么写哈? 在 2020-06-29 10:19:31,"Leonard Xu" 写道: >Hi, >异常信息很有用,group by ROW 在 Flink SQL 里

flink sql cli 读取 hbase表报错

2020-06-28 文章 王良
您好: 我使用的是flink 1.10 ,通过sql-client 创建了hbase 表 CREATE TABLE dim_term ( term_id string, info ROW( term_name string, term_name_combine string, term_notice string, term_remarks string, season string, term_sequence string, term_start_time string, ter

flink读取kafka超时问题

2020-06-28 文章 阿华田
Caused by: java.lang.Exception: org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired before the position for partition dercd_seeme-3 could be determined 大佬们flink读取kafka遇到过这个错误没?现在情况是 每次重启任务都会出现这个错,但是奇怪的是多试几次任务才能运行起来。这个任务的特点读取得topic较多(6个),数据量比较大。难道是读取得数据量太大给kafka集群的broker

无法生成rowtime导致在window失败

2020-06-28 文章 naturalfree
大家好 在使用窗口的过程中遇到一个问题,麻烦大家帮忙看下! 简单描述下情况:我们是从kafka获取数据,在flink做一些相关处理后sink到elasticsearch中。没有使用window的时候没有问题,可以成功完成流程。使用窗口后报错:Exception in thread "main" org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.

Re: flink sql row类型group by

2020-06-28 文章 Leonard Xu
Hi, 异常信息很有用,group by ROW 在 Flink SQL 里是支持的,只是在 ElasticSearchUpsertSink 的时候不支持,原因是 ElasticSearchUpsertSink 需要将 group by 的 keys 字段转换成 String 用于构造 UpdateRequest、DeleteRequest 对象,更进一步的原因是 keys 字段转换成的 String 对应了 Es 中的 id 字段(String 类型)。So, 当前ElasticSearchUpsertSink 的实现中,只支持将简单类型作为 keys ,复杂类型不支持,复杂类型t

Re: flink sql 中值为null时结果都为 false

2020-06-28 文章 Leonard Xu
Hello 更新下,社区这个 issue(FLINK-18164 )和 Benchao 讨论后关闭了,因为当前Flink在处理 null 的行为是正确的,所以建议处理 null 时,都用 IS NULL , IS NOT NULL 先做下过滤再做逻辑判断,一般SQL里面也是这样处理的。 祝好, Leonard Xu > 在 2020年6月7日,17:22,Leonard Xu 写道: > > Hi, > Flink 用Calcite做sql解析和优化, 这是个 bool 的二

Re: 【Flink SQL对于NULL在不等时候的处理】

2020-06-28 文章 Leonard Xu
> 社区之前有个issue[1]在跟进这个问题, 在此之前建议处理 null 时,都用 IS NULL , IS NOT NULL > 先做下过滤再做逻辑判断,一般SQL里面也是这样处理的。 > > 祝好, > Leonard Xu > [1] https://issues.apache.org/jira/browse/FLINK-18164 > > 更新下,社区这个 issue(FLINK-18164

?????? ??Flink Sql ????????????????????????????

2020-06-28 文章 ????????
Hi, ??c1,c2,c3??c4 alter tablec4c1,c4,c2,c3?? ??. --  -- ??: "Jark Wu"

Re:Re: flink sql row类型group by

2020-06-28 文章 sunfulin
hi, 谢谢本超的热心回复。忘了贴异常了。使用flink 1.10.1 blink planner org.apache.flink.table.api.ValidationException: Only simple types that can be safely converted into a string representation can be used as keys. But was: Row(commentId: String, commentContent: String) at org.apache.flink.streaming.connec

?????? ??????????

2020-06-28 文章 cs
??yarnyarn?? yarn??standalone??standalone??flink??, yarnspark??MR?? --  -- ??: "LakeShen"

Re: 高可用集群

2020-06-28 文章 LakeShen
Hi 李军, 目前我们在 Yarn 上面的话,用的是 Flink On Yarn Per Job 模式,在 K8s 上面的话,就是 Standalone per Job 模式。 Best, LakeShen 刘佳炜 于2020年6月28日周日 下午5:14写道: > 如果你公司用hadoop的话就是YARN  StandAlone一般都是单机测试练习的 > > > > 发自我的iPhone > > > -- 原始邮件 -- > 发件人: 李军 发送时间: 2020年6月28日 17:11 > 收件人: user-zh

关于注册定时器的一些疑问

2020-06-28 文章 Jun Zhang
大家好: 官网的解释中,注册定时器只能是keyed stream,我使用BroadcastConnectedStream 接一个KeyedBroadcastProcessFunction函数发现也能注册定时器,我测试了一下,只限于使用processtime的时候,如果使用的是eventtime就不好使了,请问这个是什么原因呢?谢谢。

回复:flink1.10 使用 ParquetAvroWriters schema 模式写数据问题

2020-06-28 文章 夏帅
你好,这个问题从异常来看是使用TupleTypeInfo导致的,可以试下使用GenericRecordAvroTypeInfo -- 发件人:yingbo yang 发送时间:2020年6月28日(星期日) 17:38 收件人:user-zh 主 题:flink1.10 使用 ParquetAvroWriters schema 模式写数据问题 Hi: 在使用 ParquetAvroWriters.forGenericRecord(Schema schema

flink1.10 使用 ParquetAvroWriters schema 模式写数据问题

2020-06-28 文章 yingbo yang
Hi: 在使用 ParquetAvroWriters.forGenericRecord(Schema schema) 写parquet文件的时候 出现 类转化异常: 下面是我的代码: // //transfor 2 dataStream // TupleTypeInfo tupleTypeInfo = new TupleTypeInfo(GenericData.Record.class, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); TupleTypeInfo tupleTypeInfo = new Tup

回复:高可用集群

2020-06-28 文章 刘佳炜
如果你公司用hadoop的话就是YARN  StandAlone一般都是单机测试练习的 发自我的iPhone -- 原始邮件 -- 发件人: 李军

高可用集群

2020-06-28 文章 李军
请教下,各位大佬们生产环境使用的是哪种集群配置 1. Standalone 集群 2. Yarn 集群 理由是什么,不知道怎么选择 2020-6-28 | | 李军 | | hold_li...@163.com | 签名由网易邮箱大师定制

Re: Flink JOB_MANAGER_LEADER_PATH Znode 疑似泄漏问题

2020-06-28 文章 Paul Lam
Hi, 其实 HA 元数据没有自动清理是老问题了,可能要等到 ZK HA 的逻辑重构之后才可以解决,具体可以参考以下两个 ticket [1][2]。 不过即使 Flink 实现了自动清理,也没有办法处理外部原因导致作业退出而留下的元数据,所以还是要用户自己实现检测和清理的机制。 1. https://issues.apache.org/jira/browse/FLINK-6522 2. https://issues.apache.org/jira/browse/FLINK-1033

Re: Re: 【Flink在sink端的Exactly once语义】

2020-06-28 文章 Jingsong Li
Hi, 补充Benchao的观点: - 除了kafka以外,还有StreamingFileSink也是exactly-once不多不少的。 - 对于Mysql、ES,这种支持主键更新的,在upsert语义下(比如一个count(*) from t group by),数据是最终一致的。所以我理解数据也是不多不少的exactly once语义。 Best, Jingsong Lee On Mon, Jun 22, 2020 at 11:46 AM 程龙 <13162790...@163.com> wrote: > 需要自己实现比如幂等操作 比如通过表示为操作 > > > > > > >

Re: flinksql

2020-06-28 文章 Jingsong Li
Hi, 在1.11之前,注意:flink sql-client只能创建flink的表而不是hive的表。 如果你用create table t (i int, j int);的这个一个简短的语句,是不能创建出flink表来的。完整的Flink表需要with参数。[1] 在1.11中支持的hive dialect,才支持用create table t (i int, j int);这种简单的DDL创建Hive表。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html