Re: Flink SQL MYSQL schema 特性问题

2021-07-07 文章 Terry Wang
Hi~
你需要的应该是flink sql里提供的catalog功能 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/#postgres-database-as-a-catalog
目前PostgresCatalog实现了jdbc catalog,MysqlCatalog没有支持,有一些资料可以参考实现:  
https://vendanner.github.io/2020/11/25/Flink-SQL-%E4%B9%8B-MySQL-Catalog/



Best,
Terry Wang



> 2021年7月8日 上午10:42,Roc Marshal  写道:
> 
> Hi, 
>   请问目前的 Flink SQL 在创建source表的时候支持自动拉取所有的表列信息并解析吗?
> 
> 
>   谢谢。
> 
> 
> Best, Roc.



Re: flinksql 流维join 问题

2021-07-06 文章 Terry Wang
语句看起来是没有问题的,可以检查下数据是否能关联上 

Best,
Terry Wang



> 2021年7月6日 下午3:24,赵旭晨  写道:
> 
> 流表:
> CREATE TABLE flink_doris_source (
> reporttime STRING,
> tenantcode STRING,
> visitid STRING,
> orderid STRING,
>   orderdetailid STRING,
>   paymentdetailid STRING,
>   etltype STRING,
>   ptime as proctime()
> ) WITH (
>  'connector' = 'kafka',  
>  'topic' = 'demo', 
>  'properties.bootstrap.servers' = '10.26.255.82:9092', 
>  'properties.group.id' = 'consumer-55',  
>  'format' = 'json',  
>  'scan.startup.mode' = 'latest-offset'  
> );
> 
> 维表:
> CREATE TABLE people (
>   `Id` int,
>   `Name` String,
>   `Sex` tinyint,
>   `Birth` timestamp,
>   `Etltype` String,
>   PRIMARY KEY (Id) NOT ENFORCED
> ) WITH (
>'connector' = 'jdbc',
>'url' = 'jdbc:mysql://10.26.20.122:3306/test1',
>'table-name' = 'people',
>'username'='root',
>'password'='***',
>'lookup.cache.ttl'='10s',
>'lookup.cache.max-rows'='20'
> );
> 
> mysql ddl:
> CREATE TABLE `people` (
>   `Id` int(11) NOT NULL AUTO_INCREMENT,
>   `Name` varchar(40) NOT NULL,
>   `Sex` tinyint(3) unsigned NOT NULL,
>   `Birth` datetime DEFAULT NULL,
>   `Etltype` varchar(40) NOT NULL,
>   PRIMARY KEY (`Id`)
> ) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4
> 
> 
> --流维join
> INSERT INTO flink_doris_sink select 
> a.reporttime,a.tenantcode,a.visitid,a.orderid,a.orderdetailid,a.paymentdetailid,b.Etltype
>  from flink_doris_source a left join people FOR SYSTEM_TIME AS OF a.ptime b 
> on a.tenantcode = b.Name;
> 
> 
> 
> 维表数据没有带出来,设置的维表ttl也没效果,是语句问题么?
> flink版本:flink-1.12.0-bin-scala_2.11.tgz   mysql:5.7.32   mysql驱动:8.0.22
> 
> 
> 
>  



Re: 咨询

2021-07-05 文章 Terry Wang
Hi 
   附件没有上传成功, btw 请同时提供下使用的flink版本~
Best,
Terry Wang



> 2021年7月5日 下午7:58,852701714 <852701...@qq.com.INVALID> 写道:
> 
> 你好,我在本地搭建IDEA的FLINKCEP环境。
> 用官方的一些DEMO搭建后,发现执行CEP的逻辑后编译是成功的,但是没有结果输出。
> 例如附件中的demo类,执行CEP逻辑后的loginFailDataStream没有输出。
> debug发现也没有进入pattern的方法中。
> 请问在IDEA本地运行FLINKCEP需要额外的条件吗。(本地引入了CEP的包)



Re: flink dataset api join报错

2021-07-04 文章 Terry Wang
HI Eric~
 1) 确定下A表和B表的大小关系,如果A表比B表小的话,可以尝试交换A和B的join位置,看看能不能work 
 2)更改join的策略,可以改成 
REPARTITION_SORT_MERGE,join执行过程会比较慢,但应该不会失败,可以参考文档https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/dataset_transformations.html#join-algorithm-hints
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/dataset_transformations.html#join-algorithm-hints>

Thx~

Best,
Terry Wang



> 2021年7月3日 下午8:32,Eric Yang  写道:
> 
> Hello Terry
>您好,很高兴您能回复我,我这边使用的是flink 1.12版本,使用的是flink 的 dataset api 
> 的三个数据集进行join,类似于 a join b join c,join的时候没有指定JoinHint策略,但是运行的时候报错了,具体报错信息是:
> Hash join exceeded maximum number of recursions, without reducing partitions 
> enough to be memory resident. Probably cause: Too many duplicate keys.
> 显示的是第二个join报错的,我感觉可能需要指定下join的JoinHint策略,让它强制进行repartition,不晓得是否可行。
>您这边有时间的话,希望您能看一看哈,不管怎样,还是非常感谢您,祝您生活愉快!
> 
> 
> | |
> Eric Yang
> |
> |
> ymj7...@163.com <mailto:ymj7...@163.com>
> |
> 签名由网易邮箱大师定制
> 
> 
> 在2021年07月3日 19:44,Terry Wangmailto:zjuwa...@gmail.com>> 
> 写道:
> Hi Eric~
> 
> 图片打不开,能否提供下具体使用的flink版本,使用方式和报错的异常栈~
> 
> 
> Best,
> Terry Wang
> 
> 
> 
> 2021年7月2日 下午4:02,Eric Yang  写道:
> 
> 
> hello,
> 您好,打扰到大家了,我使用的是flink的dataset的API,多个数据集进行join,比如,a join b join c,最终运行报错了,报错如下:
> 我是需要在join的时候指定JoinHint策略么,求助,真的很感谢!
> 
> 
> 
> 
> 
> Eric Yang
> ymj7...@163.com
> <https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1&name=Eric+Yang&uid=ymj7110%40163.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fwzpmmc%2F0dca575123f8f9b1e003ef5f6019afbd.jpg&items=%5B%22ymj7110%40163.com%22%5D
>  
> <https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1&name=Eric+Yang&uid=ymj7110%40163.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fwzpmmc%2F0dca575123f8f9b1e003ef5f6019afbd.jpg&items=%5B%22ymj7110%40163.com%22%5D>>
> 签名由 网易邮箱大师 <https://mail.163.com/dashi/dlpro.html?from=mail81 
> <https://mail.163.com/dashi/dlpro.html?from=mail81>> 定制



Re: flink dataset api join报错

2021-07-03 文章 Terry Wang
Hi Eric~

图片打不开,能否提供下具体使用的flink版本,使用方式和报错的异常栈~


Best,
Terry Wang



> 2021年7月2日 下午4:02,Eric Yang  写道:
> 
> 
> hello,
> 您好,打扰到大家了,我使用的是flink的dataset的API,多个数据集进行join,比如,a join b join 
> c,最终运行报错了,报错如下:
>  我是需要在join的时候指定JoinHint策略么,求助,真的很感谢!
> 
> 
> 
> 
>   
> Eric Yang
> ymj7...@163.com
>  
> <https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1&name=Eric+Yang&uid=ymj7110%40163.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fwzpmmc%2F0dca575123f8f9b1e003ef5f6019afbd.jpg&items=%5B%22ymj7110%40163.com%22%5D>
> 签名由 网易邮箱大师 <https://mail.163.com/dashi/dlpro.html?from=mail81> 定制
> 
> 



Re: flink 版本视图不触发水印导致流阻塞的问题

2021-07-03 文章 Terry Wang
Hi, 光跃~

 left join的语义下,case3_TOPIC_A有数据就会往外输出的,能否给一点更多的信息来帮助排查问题~

Best,
Terry Wang



> 2021年7月2日 下午3:14,杨光跃  写道:
> 
> select a.card as card,a.cust as cust, b.city as city ,cast(a.ts as TIMESTAMP) 
> ts,c.city
> from case3_TOPIC_A a
> left join cust_data FOR SYSTEM_TIME AS OF a.ts as b on a.cust = b.cust
> left join view_case3_TOPIC_B FOR SYSTEM_TIME AS OF a.ts as c on a.cust = 
> c.cust;
> 
> 
> view_case3_TOPIC_B  是一个版本视图,现在的问题是如果view_case3_TOPIC_B  的数据不更新,
> case3_TOPIC_A 即使有新数据,也不往外输出。  
> 怎么才能做到 case3_TOPIC_A  有数据就会立马触发呢
> 
> 
> | |
> 杨光跃
> |
> |
> yangguangyuem...@163.com
> |
> 签名由网易邮箱大师定制



Re: [ANNOUNCE] Dian Fu becomes a Flink committer

2020-01-16 文章 Terry Wang
Congratulations! 

Best,
Terry Wang



> 2020年1月17日 14:09,Biao Liu  写道:
> 
> Congrats!
> 
> Thanks,
> Biao /'bɪ.aʊ/
> 
> 
> 
> On Fri, 17 Jan 2020 at 13:43, Rui Li  <mailto:lirui.fu...@gmail.com>> wrote:
> Congratulations Dian, well deserved!
> 
> On Thu, Jan 16, 2020 at 5:58 PM jincheng sun  <mailto:sunjincheng...@gmail.com>> wrote:
> Hi everyone,
> 
> I'm very happy to announce that Dian accepted the offer of the Flink PMC to 
> become a committer of the Flink project.
> 
> Dian Fu has been contributing to Flink for many years. Dian Fu played an 
> essential role in PyFlink/CEP/SQL/Table API modules. Dian Fu has contributed 
> several major features, reported and fixed many bugs, spent a lot of time 
> reviewing pull requests and also frequently helping out on the user mailing 
> lists and check/vote the release.
>  
> Please join in me congratulating Dian for becoming a Flink committer !
> 
> Best, 
> Jincheng(on behalf of the Flink PMC)
> 
> 
> -- 
> Best regards!
> Rui Li



Re: flink 创建hbase出错

2020-01-02 文章 Terry Wang
Hi,

flink-hbase_2.11-1.9.0.jar 只包括了flink对hbase读写的封装的类,并没有提供hbase client的类,你需要把hbaes 
client等相关的jar包提供出来放到 lib包里面。

Best,
Terry Wang



> 2020年1月2日 16:54,lucas.wu  写道:
> 
> Hi 大家好
> 有个问题要问问大家,我现在用flink1.9版本创建hbase表
> sql:
> create table hbase_dimention_table(
> id varchar,
> info ROW(xxx)
> )with(
> 'connector.type' = 'hbase',
> 'connector.version' = '1.4.3', 
> 'connector.table-name' = '', 
> 'connector.zookeeper.quorum' = ‘xxx'
> );
> 接着把flink-hbase_2.11-1.9.0.jar 放到了lib目录下,但是在执行的时候出现这种错误
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: SQL validation failed. findAndCreateTableSource failed
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.hadoop.hbase.HBaseConfiguration
> 
> 
> 
> 
> 请问我还需要在那里加上依赖?



Re: -yD Kerberos 认证问题

2019-12-30 文章 Terry Wang
Hi ~
这个问题在最新的代码上已经修复了,在flink 1.9 上应该也是不存在这个问题的,你可以用下看看~
Best,
Terry Wang



> 2019年12月31日 14:18,  
> 写道:
> 
> 大家好
> 
> 我们这里有通过-yd动态的提交Kerberos认证参数的需求,
> 
> 想问下这个jira为啥被标记为了won’t fix,谢谢
> 
> https://issues.apache.org/jira/browse/FLINK-12130
> 



Re: StreamTableEnvironment.registerDatastream() 开放用户自定义的schemaDescriptionh和DeserializationSchema

2019-12-29 文章 Terry Wang
你这种需求的一种解决思路,可以把 
JSONObject类型定义成object类型,然后注册成table之后通过一个UDTF把JSONObject转换成特定的schema。

Best,
Terry Wang



> 2019年12月27日 19:56,aven.wu  写道:
> 
> StreamTableEnvironment.registerDatastream(),目前只支持pojo 或者 
> 是public属性的对象,根据默认字段或者用户指定的字段注册table,但某些场景下没有固定的格式,比如使用JSONObject类型DataStream就无法通过这种方法注册成table,是否可以提供更底层的API来使table注册的灵活性更高。用户传入schema的描述和自定义的解析器DeserializationSchema.
> 
> 
> 发送自 Windows 10 版邮件应用
> 



Re: flink 集群

2019-11-10 文章 Terry Wang
Hi, 李军~

1. 自建集群是哪种方式? 可以考虑通过yarn集群的per job模式来部署作业,集群的监控可以直接通过yarn的资源管理实现
2. 任务需要更新的时候 可以kill job,保留state 文件,重新提交新的jar包,新的jar包可以设置从老作业的checkPoint文件里恢复继续执行

相关资料可以参考 官方文档/flink china 的资料,希望能有所帮助~

Best,
Terry Wang



> 2019年11月11日 13:38,李军  写道:
> 
> 
> 
> 
> 
> 想咨询下,自建flink集群,用什么方式 管理监控集群? 还要job 
> 任务需要更新的时候,jar包以什么方式更新,并且更新任务,什么方式更新任务,能保证老的任务正常关闭,新的衔接上;
> 求解释,有相关文资料吗



Re: Flink SQL :Unknown or invalid SQL statement.

2019-10-08 文章 Terry Wang
SQL client目前还不支持create table语法,你需要在yaml文件里定义使用的表。

Best,
Terry Wang



> 2019年10月9日 上午8:49,Henry  写道:
> 
> 大家好,求助一下。
> FIink SQL> create table kafka_source( 
>> messageKey varbinary, 
>> message  varbinary, 
>> topic varchar, 
> >'partition'  int, 
>> 'offset'  bigint 
>> ) with ( 
>> type='kafka011', 
>> topic='test', 
>> bootstrap. servers='thcathost:9092', 
>> 'group. id'='chf' 
>> ); 
> 
> 
> [ERROR] Unknown or invalid SQL statement.



Re: 关于1.9使用hive中的udf

2019-09-26 文章 Terry Wang


问题1:
default关键词报错是否试过   hive.`default`.xx_udf 方式, 这样转义应该能解决关键词报错的问题。

问题2:
flink 1.10 中会支持modular plugin的方式,使用起来会更方便


Best,
Terry Wang



> 在 2019年9月25日,下午7:21,like  写道:
> 
> 各位大佬好:
>目前我在使用1.9版本中hive的udf碰到如下问题:
>1、hive的udf都是注册在default库中,sql里面带有default关键词,flink程序就会报错
>我通过 tableEnv.useCatalog("hive") 
> 、tableEnv.useDatabase("default")这种方式解决了default关键词的问题
>同时发现如果不使用tableEnv.useDatabase("xx_db"),直接使用  xx_db.fun是找不到函数的
> 
> 
>2、使用上面的方式能使用hive中指定某个库的udf,但是需要使用flink中注册的表会很麻烦
>sql里需要这么写(default_catalog.default_database.xx_table)
> 
> 
> 请问大家有没有好的使用方式和建议?感谢 !  



Re: 关于窗口org.apache.flink.streaming.api.operators.TimerHeapInternalTimer 类实例数一直增大 导致内存溢出的问题

2019-09-25 文章 Terry Wang
会不会是你数据量比较大,然后heapMemory配置的相对较小导致的,是否尝试过调大内存和并发观察是否还有OOM?

Best,
Terry Wang



> 在 2019年9月26日,上午9:25,claylin <1012539...@qq.com> 写道:
> 
> 写了个去重的任务,代码如下:
> 
> StreamQueryConfig queryConfig = tabEnv.queryConfig();
>queryConfig.withIdleStateRetentionTime(Time.seconds(20), 
> Time.minutes(6));
> 
> 
>DataStream source = env.socketTextStream("localhost", 10028)
>.map(new MapFunction() {
>@Override
>public Student map(String value) throws Exception {
>String[] vals = value.split(",");
>if (vals.length < 2) {
>return null;
>}
>Student st = new Student();
>st.stNo = vals[0];
>st.name = vals[1];
>return st;
>}
>}).returns(Student.class);
> 
> 
>Table table = tabEnv.fromDataStream(source, "stNo, name");
> 
> 
>Table distinctTab = table.groupBy("stNo, name").select("stNo, 
> name");//.select("name, name.count as cnt");
> 
> 
>DataStream> distinctStream = 
> tabEnv.toRetractStream(distinctTab, Student.class);
> 
> 
>DataStream distintOutStrem = distinctStream.map(tuple2 -> {
>if (tuple2.f0) {
>return tuple2.f1;
>}
>return null;
>}).filter(Objects::nonNull);
> 
> 
>Table after = tabEnv.fromDataStream(distintOutStrem, "stNo, name, 
> proctime.proctime");
> 
> 
>Table result = 
> after.window(Tumble.over("10.seconds").on("proctime").as("w"))
>.groupBy("name, w")
>.select("name, name.count as cnt, w.start as wStart, w.end as 
> wEnd, w.proctime as wProctime");
> 
> 
>DataStream resultStream = tabEnv.toAppendStream(result, 
> Result.class);
>resultStream.print();
>env.execute(TestState.class.getSimpleName());
> 
> 
> 
> 但是不知道问题出在哪里,随着长时间运行会导致jvm内存有用光,后面dump内存发现org.apache.flink.streaming.api.operators.TimerHeapInternalTimer
>  类实例一直在递增,按理说一个窗口时间到了对应的TimerHeapInternalTimer实例都会随着任务执行而被删掉,但是我这里一直在递增。
> num #instances #bytes  class name
> --
>   1:  5937   44249552  [B
>   2:214238   18291832  [C
>   3:1411995647960  
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry
>   4:2135215124504  java.lang.String
>   5:1187274397272  [Ljava.lang.Object;
>   6:1081383460416  java.util.HashMap$Node
>   7: 194401667688  [Ljava.util.HashMap$Node;
>   8: 942531508048  org.apache.flink.types.Row
>   9: 470661506112  
> org.apache.flink.streaming.api.operators.TimerHeapInternalTimer
>  10: 129241426104  java.lang.Class
>  11:491229592  
> [Lorg.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry;
>  12: 480721153728  java.lang.Long
>  13: 346571109024  java.util.concurrent.ConcurrentHashMap$Node
>  14:  77721078360  [I
>  15: 265911063640  java.util.LinkedHashMap$Entry
>  16: 15301 856856  java.util.LinkedHashMap
>  17: 11771 847512  java.lang.reflect.Field
>  18: 13172 843008  java.nio.DirectByteBuffer
>  19:  8570 754160  java.lang.reflect.Method
>  20:20 655680  [Lscala.concurrent.forkjoin.ForkJoinTask;
>  21: 13402 643296  java.util.HashMap
>  22: 12945 621360  
> org.apache.flink.core.memory.HybridMemorySegment
>  23: 13275 531000  sun.misc.Cleaner
>  24: 15840 506880  com.esotericsoftware.kryo.Registration
>  25:   393 450928  [Ljava.nio.ByteBuffer;
>  26: 13166 421312  java.nio.DirectByteBuffer$Deallocator
>  27: 25852 413632  java.lang.Object
>  28: 14137 339288  java.util.ArrayList
>  29:  6410 307680  
> org.apache.kafka.common.metrics.stats.SampledStat$Sample
>  30:  4572 292608  
> com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField
>  31:   392 288576  
> [Ljava.util.concurrent.ConcurrentHashMap$Node;
>  32:  8412 269184  org.apache.kafka.common.MetricName
>  33:  8412 

Re: Flink ORC 读取问题

2019-09-23 文章 Terry Wang
能否起一个本地程序,设置断点,看看读取数据那块儿逻辑是不是有问题
Best,
Terry Wang



> 在 2019年9月23日,下午5:11,ShuQi  写道:
> 
> Flink版本为1.9.0,基于OrcTableSource进行ORC文件的读取,碰到一个问题,程序没有任何异常,顺利执行完毕,但又部分字段读出来始终为null,但实际是有值得,通过直接读取文件的方式可以读取到全部字段。
> 
> 
> 请问大家是否有什么好的建议,谢谢!
> 



Re: 请教初始化系统缓存的问题

2019-09-23 文章 Terry Wang
你好,可以考虑在open方法里启动一个定时的线程去取mysql里去数据和进行缓存更新。
当有新数据流入到你的系统中时,可以判断定时线程数据加载是否完成,当数据加载完成后再进行数据处理。
希望能有帮助~

Best,
Terry Wang



> 在 2019年9月24日,上午10:45,haoxin...@163.com 写道:
> 
> 大家好,初学flink,版本1.8.1。想请教一个思路问题:在物联网系统中会实时处理一些设备上来的状态数据,但是设备元数据信息或者基础数据会存储在类似MySQL数据库中,当实时流数据上来的时候需要这些基础数据进行计算。但是因为性能问题,不能实时的去数据库获取,所以需要在系统启动的时候缓存起来,然后再开始收数据或者开始处理数据。数据来源kafka。
> 请教2个问题:
> 
>1. 
> 有什么方式能否保证数据开始处理的时候,基础数据已经缓存好了,可以在流处理中获取到(类似valueState),算子的open函数似乎不能写到valueState里面去,只能初始化state。
>2. 当基础数据在数据库中有变化的时候,如何实时的通知到处理流中,更新缓存?(广播流?)
> 
> 谢谢。
> 
> 
> 
> haoxin...@163.com



Re: flink-kafka Trigger 无法触发问题

2019-05-15 文章 Terry Wang
有可能是并行度设置大时,source的部分并发没有数据,导致eventTime未更新。可以排查下是否是这个问题

> 在 2019年5月15日,下午2:18,13341000780 <13341000...@163.com> 写道:
> 
> hi, 各位大牛好!
>   自定义了窗口触发器trigger,在onElement函数中注册了EventTimeTimer。出现了很离奇的问题,当并行度Parallelism 
> 设置的比slots数和CPU核数小时,能成功触发onEventTime函数,当大于slots数或者大于CPU核数时,发现无法触发onEventTime,已确定元素能成功进入窗口,即onElement函数能成功触发。有人遇到过类似的问题吗,求解答。
> 
> 
> 非常感谢.
> 
> 
> 
> 
>