Re: Flink SQL MYSQL schema 特性问题
Hi~ 你需要的应该是flink sql里提供的catalog功能 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/#postgres-database-as-a-catalog 目前PostgresCatalog实现了jdbc catalog,MysqlCatalog没有支持,有一些资料可以参考实现: https://vendanner.github.io/2020/11/25/Flink-SQL-%E4%B9%8B-MySQL-Catalog/ Best, Terry Wang > 2021年7月8日 上午10:42,Roc Marshal 写道: > > Hi, > 请问目前的 Flink SQL 在创建source表的时候支持自动拉取所有的表列信息并解析吗? > > > 谢谢。 > > > Best, Roc.
Re: flinksql 流维join 问题
语句看起来是没有问题的,可以检查下数据是否能关联上 Best, Terry Wang > 2021年7月6日 下午3:24,赵旭晨 写道: > > 流表: > CREATE TABLE flink_doris_source ( > reporttime STRING, > tenantcode STRING, > visitid STRING, > orderid STRING, > orderdetailid STRING, > paymentdetailid STRING, > etltype STRING, > ptime as proctime() > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'demo', > 'properties.bootstrap.servers' = '10.26.255.82:9092', > 'properties.group.id' = 'consumer-55', > 'format' = 'json', > 'scan.startup.mode' = 'latest-offset' > ); > > 维表: > CREATE TABLE people ( > `Id` int, > `Name` String, > `Sex` tinyint, > `Birth` timestamp, > `Etltype` String, > PRIMARY KEY (Id) NOT ENFORCED > ) WITH ( >'connector' = 'jdbc', >'url' = 'jdbc:mysql://10.26.20.122:3306/test1', >'table-name' = 'people', >'username'='root', >'password'='***', >'lookup.cache.ttl'='10s', >'lookup.cache.max-rows'='20' > ); > > mysql ddl: > CREATE TABLE `people` ( > `Id` int(11) NOT NULL AUTO_INCREMENT, > `Name` varchar(40) NOT NULL, > `Sex` tinyint(3) unsigned NOT NULL, > `Birth` datetime DEFAULT NULL, > `Etltype` varchar(40) NOT NULL, > PRIMARY KEY (`Id`) > ) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 > > > --流维join > INSERT INTO flink_doris_sink select > a.reporttime,a.tenantcode,a.visitid,a.orderid,a.orderdetailid,a.paymentdetailid,b.Etltype > from flink_doris_source a left join people FOR SYSTEM_TIME AS OF a.ptime b > on a.tenantcode = b.Name; > > > > 维表数据没有带出来,设置的维表ttl也没效果,是语句问题么? > flink版本:flink-1.12.0-bin-scala_2.11.tgz mysql:5.7.32 mysql驱动:8.0.22 > > > >
Re: 咨询
Hi 附件没有上传成功, btw 请同时提供下使用的flink版本~ Best, Terry Wang > 2021年7月5日 下午7:58,852701714 <852701...@qq.com.INVALID> 写道: > > 你好,我在本地搭建IDEA的FLINKCEP环境。 > 用官方的一些DEMO搭建后,发现执行CEP的逻辑后编译是成功的,但是没有结果输出。 > 例如附件中的demo类,执行CEP逻辑后的loginFailDataStream没有输出。 > debug发现也没有进入pattern的方法中。 > 请问在IDEA本地运行FLINKCEP需要额外的条件吗。(本地引入了CEP的包)
Re: flink dataset api join报错
HI Eric~ 1) 确定下A表和B表的大小关系,如果A表比B表小的话,可以尝试交换A和B的join位置,看看能不能work 2)更改join的策略,可以改成 REPARTITION_SORT_MERGE,join执行过程会比较慢,但应该不会失败,可以参考文档https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/dataset_transformations.html#join-algorithm-hints <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/dataset_transformations.html#join-algorithm-hints> Thx~ Best, Terry Wang > 2021年7月3日 下午8:32,Eric Yang 写道: > > Hello Terry >您好,很高兴您能回复我,我这边使用的是flink 1.12版本,使用的是flink 的 dataset api > 的三个数据集进行join,类似于 a join b join c,join的时候没有指定JoinHint策略,但是运行的时候报错了,具体报错信息是: > Hash join exceeded maximum number of recursions, without reducing partitions > enough to be memory resident. Probably cause: Too many duplicate keys. > 显示的是第二个join报错的,我感觉可能需要指定下join的JoinHint策略,让它强制进行repartition,不晓得是否可行。 >您这边有时间的话,希望您能看一看哈,不管怎样,还是非常感谢您,祝您生活愉快! > > > | | > Eric Yang > | > | > ymj7...@163.com <mailto:ymj7...@163.com> > | > 签名由网易邮箱大师定制 > > > 在2021年07月3日 19:44,Terry Wangmailto:zjuwa...@gmail.com>> > 写道: > Hi Eric~ > > 图片打不开,能否提供下具体使用的flink版本,使用方式和报错的异常栈~ > > > Best, > Terry Wang > > > > 2021年7月2日 下午4:02,Eric Yang 写道: > > > hello, > 您好,打扰到大家了,我使用的是flink的dataset的API,多个数据集进行join,比如,a join b join c,最终运行报错了,报错如下: > 我是需要在join的时候指定JoinHint策略么,求助,真的很感谢! > > > > > > Eric Yang > ymj7...@163.com > <https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1&name=Eric+Yang&uid=ymj7110%40163.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fwzpmmc%2F0dca575123f8f9b1e003ef5f6019afbd.jpg&items=%5B%22ymj7110%40163.com%22%5D > > <https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1&name=Eric+Yang&uid=ymj7110%40163.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fwzpmmc%2F0dca575123f8f9b1e003ef5f6019afbd.jpg&items=%5B%22ymj7110%40163.com%22%5D>> > 签名由 网易邮箱大师 <https://mail.163.com/dashi/dlpro.html?from=mail81 > <https://mail.163.com/dashi/dlpro.html?from=mail81>> 定制
Re: flink dataset api join报错
Hi Eric~ 图片打不开,能否提供下具体使用的flink版本,使用方式和报错的异常栈~ Best, Terry Wang > 2021年7月2日 下午4:02,Eric Yang 写道: > > > hello, > 您好,打扰到大家了,我使用的是flink的dataset的API,多个数据集进行join,比如,a join b join > c,最终运行报错了,报错如下: > 我是需要在join的时候指定JoinHint策略么,求助,真的很感谢! > > > > > > Eric Yang > ymj7...@163.com > > <https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1&name=Eric+Yang&uid=ymj7110%40163.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fwzpmmc%2F0dca575123f8f9b1e003ef5f6019afbd.jpg&items=%5B%22ymj7110%40163.com%22%5D> > 签名由 网易邮箱大师 <https://mail.163.com/dashi/dlpro.html?from=mail81> 定制 > >
Re: flink 版本视图不触发水印导致流阻塞的问题
Hi, 光跃~ left join的语义下,case3_TOPIC_A有数据就会往外输出的,能否给一点更多的信息来帮助排查问题~ Best, Terry Wang > 2021年7月2日 下午3:14,杨光跃 写道: > > select a.card as card,a.cust as cust, b.city as city ,cast(a.ts as TIMESTAMP) > ts,c.city > from case3_TOPIC_A a > left join cust_data FOR SYSTEM_TIME AS OF a.ts as b on a.cust = b.cust > left join view_case3_TOPIC_B FOR SYSTEM_TIME AS OF a.ts as c on a.cust = > c.cust; > > > view_case3_TOPIC_B 是一个版本视图,现在的问题是如果view_case3_TOPIC_B 的数据不更新, > case3_TOPIC_A 即使有新数据,也不往外输出。 > 怎么才能做到 case3_TOPIC_A 有数据就会立马触发呢 > > > | | > 杨光跃 > | > | > yangguangyuem...@163.com > | > 签名由网易邮箱大师定制
Re: [ANNOUNCE] Dian Fu becomes a Flink committer
Congratulations! Best, Terry Wang > 2020年1月17日 14:09,Biao Liu 写道: > > Congrats! > > Thanks, > Biao /'bɪ.aʊ/ > > > > On Fri, 17 Jan 2020 at 13:43, Rui Li <mailto:lirui.fu...@gmail.com>> wrote: > Congratulations Dian, well deserved! > > On Thu, Jan 16, 2020 at 5:58 PM jincheng sun <mailto:sunjincheng...@gmail.com>> wrote: > Hi everyone, > > I'm very happy to announce that Dian accepted the offer of the Flink PMC to > become a committer of the Flink project. > > Dian Fu has been contributing to Flink for many years. Dian Fu played an > essential role in PyFlink/CEP/SQL/Table API modules. Dian Fu has contributed > several major features, reported and fixed many bugs, spent a lot of time > reviewing pull requests and also frequently helping out on the user mailing > lists and check/vote the release. > > Please join in me congratulating Dian for becoming a Flink committer ! > > Best, > Jincheng(on behalf of the Flink PMC) > > > -- > Best regards! > Rui Li
Re: flink 创建hbase出错
Hi, flink-hbase_2.11-1.9.0.jar 只包括了flink对hbase读写的封装的类,并没有提供hbase client的类,你需要把hbaes client等相关的jar包提供出来放到 lib包里面。 Best, Terry Wang > 2020年1月2日 16:54,lucas.wu 写道: > > Hi 大家好 > 有个问题要问问大家,我现在用flink1.9版本创建hbase表 > sql: > create table hbase_dimention_table( > id varchar, > info ROW(xxx) > )with( > 'connector.type' = 'hbase', > 'connector.version' = '1.4.3', > 'connector.table-name' = '', > 'connector.zookeeper.quorum' = ‘xxx' > ); > 接着把flink-hbase_2.11-1.9.0.jar 放到了lib目录下,但是在执行的时候出现这种错误 > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error: SQL validation failed. findAndCreateTableSource failed > Caused by: java.lang.ClassNotFoundException: > org.apache.hadoop.hbase.HBaseConfiguration > > > > > 请问我还需要在那里加上依赖?
Re: -yD Kerberos 认证问题
Hi ~ 这个问题在最新的代码上已经修复了,在flink 1.9 上应该也是不存在这个问题的,你可以用下看看~ Best, Terry Wang > 2019年12月31日 14:18, > 写道: > > 大家好 > > 我们这里有通过-yd动态的提交Kerberos认证参数的需求, > > 想问下这个jira为啥被标记为了won’t fix,谢谢 > > https://issues.apache.org/jira/browse/FLINK-12130 >
Re: StreamTableEnvironment.registerDatastream() 开放用户自定义的schemaDescriptionh和DeserializationSchema
你这种需求的一种解决思路,可以把 JSONObject类型定义成object类型,然后注册成table之后通过一个UDTF把JSONObject转换成特定的schema。 Best, Terry Wang > 2019年12月27日 19:56,aven.wu 写道: > > StreamTableEnvironment.registerDatastream(),目前只支持pojo 或者 > 是public属性的对象,根据默认字段或者用户指定的字段注册table,但某些场景下没有固定的格式,比如使用JSONObject类型DataStream就无法通过这种方法注册成table,是否可以提供更底层的API来使table注册的灵活性更高。用户传入schema的描述和自定义的解析器DeserializationSchema. > > > 发送自 Windows 10 版邮件应用 >
Re: flink 集群
Hi, 李军~ 1. 自建集群是哪种方式? 可以考虑通过yarn集群的per job模式来部署作业,集群的监控可以直接通过yarn的资源管理实现 2. 任务需要更新的时候 可以kill job,保留state 文件,重新提交新的jar包,新的jar包可以设置从老作业的checkPoint文件里恢复继续执行 相关资料可以参考 官方文档/flink china 的资料,希望能有所帮助~ Best, Terry Wang > 2019年11月11日 13:38,李军 写道: > > > > > > 想咨询下,自建flink集群,用什么方式 管理监控集群? 还要job > 任务需要更新的时候,jar包以什么方式更新,并且更新任务,什么方式更新任务,能保证老的任务正常关闭,新的衔接上; > 求解释,有相关文资料吗
Re: Flink SQL :Unknown or invalid SQL statement.
SQL client目前还不支持create table语法,你需要在yaml文件里定义使用的表。 Best, Terry Wang > 2019年10月9日 上午8:49,Henry 写道: > > 大家好,求助一下。 > FIink SQL> create table kafka_source( >> messageKey varbinary, >> message varbinary, >> topic varchar, > >'partition' int, >> 'offset' bigint >> ) with ( >> type='kafka011', >> topic='test', >> bootstrap. servers='thcathost:9092', >> 'group. id'='chf' >> ); > > > [ERROR] Unknown or invalid SQL statement.
Re: 关于1.9使用hive中的udf
问题1: default关键词报错是否试过 hive.`default`.xx_udf 方式, 这样转义应该能解决关键词报错的问题。 问题2: flink 1.10 中会支持modular plugin的方式,使用起来会更方便 Best, Terry Wang > 在 2019年9月25日,下午7:21,like 写道: > > 各位大佬好: >目前我在使用1.9版本中hive的udf碰到如下问题: >1、hive的udf都是注册在default库中,sql里面带有default关键词,flink程序就会报错 >我通过 tableEnv.useCatalog("hive") > 、tableEnv.useDatabase("default")这种方式解决了default关键词的问题 >同时发现如果不使用tableEnv.useDatabase("xx_db"),直接使用 xx_db.fun是找不到函数的 > > >2、使用上面的方式能使用hive中指定某个库的udf,但是需要使用flink中注册的表会很麻烦 >sql里需要这么写(default_catalog.default_database.xx_table) > > > 请问大家有没有好的使用方式和建议?感谢 !
Re: 关于窗口org.apache.flink.streaming.api.operators.TimerHeapInternalTimer 类实例数一直增大 导致内存溢出的问题
会不会是你数据量比较大,然后heapMemory配置的相对较小导致的,是否尝试过调大内存和并发观察是否还有OOM? Best, Terry Wang > 在 2019年9月26日,上午9:25,claylin <1012539...@qq.com> 写道: > > 写了个去重的任务,代码如下: > > StreamQueryConfig queryConfig = tabEnv.queryConfig(); >queryConfig.withIdleStateRetentionTime(Time.seconds(20), > Time.minutes(6)); > > >DataStream source = env.socketTextStream("localhost", 10028) >.map(new MapFunction() { >@Override >public Student map(String value) throws Exception { >String[] vals = value.split(","); >if (vals.length < 2) { >return null; >} >Student st = new Student(); >st.stNo = vals[0]; >st.name = vals[1]; >return st; >} >}).returns(Student.class); > > >Table table = tabEnv.fromDataStream(source, "stNo, name"); > > >Table distinctTab = table.groupBy("stNo, name").select("stNo, > name");//.select("name, name.count as cnt"); > > >DataStream> distinctStream = > tabEnv.toRetractStream(distinctTab, Student.class); > > >DataStream distintOutStrem = distinctStream.map(tuple2 -> { >if (tuple2.f0) { >return tuple2.f1; >} >return null; >}).filter(Objects::nonNull); > > >Table after = tabEnv.fromDataStream(distintOutStrem, "stNo, name, > proctime.proctime"); > > >Table result = > after.window(Tumble.over("10.seconds").on("proctime").as("w")) >.groupBy("name, w") >.select("name, name.count as cnt, w.start as wStart, w.end as > wEnd, w.proctime as wProctime"); > > >DataStream resultStream = tabEnv.toAppendStream(result, > Result.class); >resultStream.print(); >env.execute(TestState.class.getSimpleName()); > > > > 但是不知道问题出在哪里,随着长时间运行会导致jvm内存有用光,后面dump内存发现org.apache.flink.streaming.api.operators.TimerHeapInternalTimer > 类实例一直在递增,按理说一个窗口时间到了对应的TimerHeapInternalTimer实例都会随着任务执行而被删掉,但是我这里一直在递增。 > num #instances #bytes class name > -- > 1: 5937 44249552 [B > 2:214238 18291832 [C > 3:1411995647960 > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry > 4:2135215124504 java.lang.String > 5:1187274397272 [Ljava.lang.Object; > 6:1081383460416 java.util.HashMap$Node > 7: 194401667688 [Ljava.util.HashMap$Node; > 8: 942531508048 org.apache.flink.types.Row > 9: 470661506112 > org.apache.flink.streaming.api.operators.TimerHeapInternalTimer > 10: 129241426104 java.lang.Class > 11:491229592 > [Lorg.apache.flink.runtime.state.heap.CopyOnWriteStateTable$StateTableEntry; > 12: 480721153728 java.lang.Long > 13: 346571109024 java.util.concurrent.ConcurrentHashMap$Node > 14: 77721078360 [I > 15: 265911063640 java.util.LinkedHashMap$Entry > 16: 15301 856856 java.util.LinkedHashMap > 17: 11771 847512 java.lang.reflect.Field > 18: 13172 843008 java.nio.DirectByteBuffer > 19: 8570 754160 java.lang.reflect.Method > 20:20 655680 [Lscala.concurrent.forkjoin.ForkJoinTask; > 21: 13402 643296 java.util.HashMap > 22: 12945 621360 > org.apache.flink.core.memory.HybridMemorySegment > 23: 13275 531000 sun.misc.Cleaner > 24: 15840 506880 com.esotericsoftware.kryo.Registration > 25: 393 450928 [Ljava.nio.ByteBuffer; > 26: 13166 421312 java.nio.DirectByteBuffer$Deallocator > 27: 25852 413632 java.lang.Object > 28: 14137 339288 java.util.ArrayList > 29: 6410 307680 > org.apache.kafka.common.metrics.stats.SampledStat$Sample > 30: 4572 292608 > com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField > 31: 392 288576 > [Ljava.util.concurrent.ConcurrentHashMap$Node; > 32: 8412 269184 org.apache.kafka.common.MetricName > 33: 8412
Re: Flink ORC 读取问题
能否起一个本地程序,设置断点,看看读取数据那块儿逻辑是不是有问题 Best, Terry Wang > 在 2019年9月23日,下午5:11,ShuQi 写道: > > Flink版本为1.9.0,基于OrcTableSource进行ORC文件的读取,碰到一个问题,程序没有任何异常,顺利执行完毕,但又部分字段读出来始终为null,但实际是有值得,通过直接读取文件的方式可以读取到全部字段。 > > > 请问大家是否有什么好的建议,谢谢! >
Re: 请教初始化系统缓存的问题
你好,可以考虑在open方法里启动一个定时的线程去取mysql里去数据和进行缓存更新。 当有新数据流入到你的系统中时,可以判断定时线程数据加载是否完成,当数据加载完成后再进行数据处理。 希望能有帮助~ Best, Terry Wang > 在 2019年9月24日,上午10:45,haoxin...@163.com 写道: > > 大家好,初学flink,版本1.8.1。想请教一个思路问题:在物联网系统中会实时处理一些设备上来的状态数据,但是设备元数据信息或者基础数据会存储在类似MySQL数据库中,当实时流数据上来的时候需要这些基础数据进行计算。但是因为性能问题,不能实时的去数据库获取,所以需要在系统启动的时候缓存起来,然后再开始收数据或者开始处理数据。数据来源kafka。 > 请教2个问题: > >1. > 有什么方式能否保证数据开始处理的时候,基础数据已经缓存好了,可以在流处理中获取到(类似valueState),算子的open函数似乎不能写到valueState里面去,只能初始化state。 >2. 当基础数据在数据库中有变化的时候,如何实时的通知到处理流中,更新缓存?(广播流?) > > 谢谢。 > > > > haoxin...@163.com
Re: flink-kafka Trigger 无法触发问题
有可能是并行度设置大时,source的部分并发没有数据,导致eventTime未更新。可以排查下是否是这个问题 > 在 2019年5月15日,下午2:18,13341000780 <13341000...@163.com> 写道: > > hi, 各位大牛好! > 自定义了窗口触发器trigger,在onElement函数中注册了EventTimeTimer。出现了很离奇的问题,当并行度Parallelism > 设置的比slots数和CPU核数小时,能成功触发onEventTime函数,当大于slots数或者大于CPU核数时,发现无法触发onEventTime,已确定元素能成功进入窗口,即onElement函数能成功触发。有人遇到过类似的问题吗,求解答。 > > > 非常感谢. > > > > >