Flink SQL DDL connecting to Kafka: event-time windows are not triggered when using event time

2020-11-13 Thread 李世钰
Flink version: 1.11. Flink SQL connecting to Kafka: create table kafka_table ( log_id string, event_time bigint, process_time as PROCTIME(), ts as TO_TIMESTAMP(FROM_UNIXTIME(event_time)), watermark for ts as ts - interval '1' second ) with ( 'connector' = 'kafka', 'topic' = 'kafka_table',
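The truncated DDL in this preview can be completed as the sketch below; every connector option after 'topic' is an assumption for illustration, not taken from the original message. Two things worth checking in such setups: FROM_UNIXTIME expects epoch seconds (a millisecond `event_time` would produce timestamps far in the future), and an event-time window only fires once the watermark passes its end, which requires fresh records on every Kafka partition.

```sql
-- Sketch of the full table definition (options marked "placeholder" are assumptions):
CREATE TABLE kafka_table (
  log_id STRING,
  event_time BIGINT,                                  -- epoch seconds
  process_time AS PROCTIME(),
  ts AS TO_TIMESTAMP(FROM_UNIXTIME(event_time)),
  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafka_table',
  'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder
  'properties.group.id' = 'test-group',               -- placeholder
  'scan.startup.mode' = 'latest-offset',              -- placeholder
  'format' = 'json'                                   -- placeholder
);
```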

Re: Mailing list unsubscribe

2020-11-13 Thread Shawn Huang
Hi, to unsubscribe you need to send an email to user-zh-unsubscr...@flink.apache.org. See https://flink.apache.org/zh/community.html#section-1 Best, Shawn Huang. wangleigis wrote on Sat, Nov 14, 2020 at 11:55 AM: > Unsubscribe > -- > Wishing you smooth work and all the best!

Mailing list unsubscribe

2020-11-13 Thread wangleigis
Unsubscribe -- Wishing you smooth work and all the best!

Re: flink-1.11.2 checkpoint execution fails

2020-11-13 Thread 史 正超
Thanks for the reply. I looked at my job: the checkpoint timeouts are caused by backpressure, and a timeout itself prints no exception log; on every timeout it prints org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. The failure counter here counts checkpoints. The job does not fail; it just stays backpressured. From: hailongwang <18868816...@163.com> Sent: November 13, 2020
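The "Exceeded checkpoint tolerable failure threshold" error described above is governed by a configurable limit. A minimal configuration sketch (values are illustrative, not from this thread) that raises the timeout and tolerates a few timed-out checkpoints under backpressure:

```yaml
# flink-conf.yaml sketch -- values are examples, tune for your job
execution.checkpointing.interval: 60s
execution.checkpointing.timeout: 10min                   # give barriers time to cross backpressured operators
execution.checkpointing.tolerable-failed-checkpoints: 3  # default 0: the first failed/timed-out checkpoint fails the job
```

The same limit can be set programmatically via `CheckpointConfig#setTolerableCheckpointFailureNumber`; raising it treats the symptom, while resolving the backpressure itself is the durable fix.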

Hi all, I'm having trouble spinning up a native Kubernetes cluster

2020-11-13 Thread Kevin Kwon
Hi guys, I'm trying out the native K8s cluster and having trouble with SSL, I think. I use *k3d* as my local cluster for experiments. Here's how I launch my cluster: k3d cluster create; docker run \ -u flink:flink \ -v /Users/user/.kube:/opt/flink/.kube \ --network host \ --entrypoint /bin/bash \

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-13 Thread Fuyao Li
Hi Matthias, Just to provide more context on this problem. I only have 1 partition per each Kafka Topic at the beginning before the join operation. After reading the doc:

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-13 Thread Fuyao Li
Hi Matthias, One more question regarding Flink table parallelism: is it possible to configure the parallelism for a Table operation at the operator level? It seems we don't have such an API available, right? Thanks! Best, Fuyao On Fri, Nov 13, 2020 at 11:48 AM Fuyao Li wrote: > Hi Matthias, > > Thanks

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-13 Thread Fuyao Li
Hi Matthias, Thanks for your information. I have managed to figure out the first issue you mentioned. Regarding the second issue. I have got some progress on it. I have sent another email with the title 'BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance' using

Re: how to enable metrics in Flink 1.11

2020-11-13 Thread Diwakar Jha
Hi Robert, I'm able to see taskmanager and jobmanager logs after I changed the log4j.properties file (/usr/lib/flink/conf). It seems to be a problem with the EMR 6.1 distribution. The log4j.properties file is different in the Flink package that I downloaded and the one that comes with EMR 6.1. I

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-13 Thread Matthias Pohl
Hi Fuyao, for your first question about the different behavior depending on whether you chain the methods or not: Keep in mind that you have to save the return value of the assignTimestampsAndWatermarks method call if you don't chain the methods together as it is also shown in [1]. At least the
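The pitfall described above (forgetting to save the return value when the calls are not chained) can be modeled in plain Python; this is a conceptual sketch, not the Flink API, and the class and method names are invented for illustration:

```python
# Conceptual model: transformation methods return a NEW stream object;
# discarding the return value leaves the original stream unchanged.

class Stream:
    def __init__(self, watermarked=False):
        self.watermarked = watermarked

    def assign_timestamps_and_watermarks(self):
        # Returns a derived stream; `self` is NOT mutated.
        return Stream(watermarked=True)

source = Stream()
source.assign_timestamps_and_watermarks()            # result discarded: no effect
assert source.watermarked is False

stream = source.assign_timestamps_and_watermarks()   # capture the result
assert stream.watermarked is True
```

Chaining the calls avoids the problem entirely, because each method is invoked on the value the previous one returned.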

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-13 Thread Simone Cavallarin
+user@ From: Simone Cavallarin Sent: 13 November 2020 16:46 To: Aljoscha Krettek Subject: Re: How to use EventTimeSessionWindows.withDynamicGap() Hi Aljoscha, When you said: You could use a stateful operation (like a ProcessFunction) to put a dynamic "gap"

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-13 Thread Matthias Pohl
Hi Si-li, trying to answer your initial question: Theoretically, you could try using the co-location constraints to achieve this. But keep in mind that this might lead to multiple Join operators running in the same JVM reducing the amount of memory each operator can utilize. Best, Matthias On

What happens when a job is rescaled

2020-11-13 Thread Richard Deurwaarder
Hello, I have a question about what actually happens when a job is started from an existing checkpoint, in particular when the parallelism has changed. *Context:* We have a flink 1.11.2 (DataStream API) job running on Kubernetes (GCP) writing its state to GCS. Normally we run with 12 TMs each 3

Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..

2020-11-13 Thread Till Rohrmann
I think in the JavaDocs of the WatermarkStrategy we give an incorrect example. I have created an issue [1] to fix the problem. [1] https://issues.apache.org/jira/browse/FLINK-20156 Cheers, Till On Mon, Nov 9, 2020 at 12:06 PM Aljoscha Krettek wrote: > @Till For instances where we use

Re: CLI help, documentation is confusing...

2020-11-13 Thread Till Rohrmann
Great to hear that you solved the problem! Cheers, Till On Fri, Nov 13, 2020 at 4:56 PM Marco Villalobos wrote: > Hi Till, > > Thank you for following up. > > We were trying to set up s3 file sinks, and rocksdb with s3 checkpointing. > We upgraded to Flink 1.11 and attempt to run the job in

Re: CLI help, documentation is confusing...

2020-11-13 Thread Marco Villalobos
Hi Till, Thank you for following up. We were trying to set up S3 file sinks, and RocksDB with S3 checkpointing. We upgraded to Flink 1.11 and attempted to run the job in EMR. On startup, the logs showed an error that the flink-conf.yaml could not be found. I tried to troubleshoot the command

Re: Assistance configuring access to GoogleCloudStorage for row format streaming file sink

2020-11-13 Thread Matthias Pohl
Hi, thanks for reaching out to the Flink community. The tricky thing here is that the Google Cloud Storage connector is not supported by Flink's plugin system as stated in [1]. There is a blog post on how to get started with Flink on Google's Cloud Platform [2]. In case you haven't seen that one,

Re: CLI help, documentation is confusing...

2020-11-13 Thread Till Rohrmann
Hi Marco, as Klou said, -m yarn-cluster should try to deploy a Yarn per job cluster on your Yarn cluster. Could you maybe share a bit more details about what is going wrong? E.g. the cli logs could be helpful to pinpoint the problem. I've tested that both `bin/flink run -m yarn-cluster

Re: Logs of JobExecutionListener

2020-11-13 Thread Flavio Pompermaier
See inline. On Fri, 13 Nov 2020 at 14:31, Matthias Pohl wrote: > Hi Flavio, > thanks for sharing this with the Flink community. Could you answer the > following questions, please: > - What's the code of your Job's main method? > It's actually very simple... the main class creates a batch

Re: PyFlink Table API and UDF Limitations

2020-11-13 Thread Dian Fu
Hi Niklas, Good to know that this solution may work for you. Regarding the questions you raised, please find my reply inline. Regards, Dian > On Nov 13, 2020, at 8:48 PM, Niklas Wilcke wrote: > > Hi Dian, > > thanks again for your response. In the meantime I tried out your proposal > using the

Re: Flink Job Manager Memory Usage Keeps on growing when enabled checkpoint

2020-11-13 Thread Till Rohrmann
Hi Eleanore, sorry for my late reply. The heap dump you have sent does not look problematic. How do you measure the pod memory usage exactly? If you start the Flink process with -Xms5120m -Xmx5120m then Flink should allocate 5120 MB of heap memory. Hence, this should be exactly what you are

Re: Filter By Value in List

2020-11-13 Thread Matthias Pohl
Hi Rex, after verifying with Timo I created a new issue to address your proposal of introducing a new operator [1]. Feel free to work on that one if you like. Best, Matthias [1] https://issues.apache.org/jira/browse/FLINK-20148 On Thu, Nov 5, 2020 at 6:35 PM Rex Fenley wrote: > Thanks Timo, >

Assistance configuring access to GoogleCloudStorage for row format streaming file sink

2020-11-13 Thread orionemail
Hi, I am running flink 1.10.1, initially on my local development machine, a MacBook Pro. I'm struggling to understand how to write to Google Cloud Storage using the StreamingFileSink (S3 works fine). The error I am seeing: "org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could

Re: Logs of JobExecutionListener

2020-11-13 Thread Matthias Pohl
Hi Flavio, thanks for sharing this with the Flink community. Could you answer the following questions, please: - What's the code of your Job's main method? - What cluster backend and application do you use to execute the job? - Is there anything suspicious you can find in the logs that might be

Re: PyFlink Table API and UDF Limitations

2020-11-13 Thread Niklas Wilcke
Hi Dian, thanks again for your response. In the meantime I tried out your proposal using the UDAF feature of PyFlink 1.12.0-rc1 and it is roughly working, but I am facing some issues, which I would like to address. If this goes too far, please let me know and I will open a new thread for each

Re: Job is still in ACTIVE state after /jobs/:jobid/stop

2020-11-13 Thread Matthias Pohl
Hi Averell, thanks for sharing this with the Flink community. Is there anything suspicious in the logs which you could share? Best, Matthias On Fri, Nov 13, 2020 at 2:27 AM Averell wrote: > I have some updates. Some weird behaviours were found. Please refer to the > attached photo. > > All

Flink CDC multi-table joins have very high latency

2020-11-13 Thread 丁浩浩
I use Flink CDC to run join queries over MySQL tables and found that Flink can only join two tables and then join that result with the next table, which makes the latency of the final sink writes very large. Is there a good optimization to mitigate this problem?

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-13 Thread Pierre Oberholzer
Thanks Dian, but same error when using an explicit return type: class dummyMap() extends ScalarFunction { def eval() : util.Map[java.lang.String,java.lang.String] = { val states = Map("key1" -> "val1", "key2" -> "val2") states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]

Re: Confirm subscription to user-zh@flink.apache.org

2020-11-13 Thread hailongwang
Hi, to subscribe to the Chinese user mailing list, send an email to user-zh-subscr...@flink.apache.org; see [1] for more details. [1] https://flink.apache.org/community.html#mailing-lists Best, hailong At 2020-11-13 17:12:40, "黑色" wrote:

Confirm subscription to user-zh@flink.apache.org

2020-11-13 Thread 黑色

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-13 Thread Aljoscha Krettek
Yes, you're right that Flink can do this with session windows but the assignment will be static. In general, the smaller the session gap (or session timeout) the fewer windows there will be. You're also right that you would have to somehow maintain information about how dense your records are
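The idea discussed above, deriving the session gap from how dense recent records are, can be sketched in plain Python. This is a conceptual model, not Flink's SessionWindowTimeGapExtractor API; the class name, window size, and clamping bounds are all invented for illustration:

```python
# Sketch: propose a session gap proportional to the recent average
# inter-arrival time, clamped to a sane [min_gap, max_gap] range (ms).
from collections import deque

class DensityGapExtractor:
    def __init__(self, window=10, factor=3.0, min_gap=1_000, max_gap=60_000):
        self.times = deque(maxlen=window)  # last N event timestamps (ms)
        self.factor = factor
        self.min_gap = min_gap
        self.max_gap = max_gap

    def extract(self, event_ts_ms):
        self.times.append(event_ts_ms)
        if len(self.times) < 2:
            return self.max_gap  # not enough data yet: be conservative
        ts = list(self.times)
        deltas = [b - a for a, b in zip(ts, ts[1:])]
        avg = sum(deltas) / len(deltas)
        return int(max(self.min_gap, min(self.max_gap, avg * self.factor)))

ext = DensityGapExtractor()
# Dense events 100 ms apart -> proposed gap collapses to the minimum.
gaps = [ext.extract(t) for t in range(0, 1000, 100)]
assert gaps[-1] == 1_000  # 100 ms * 3 would be 300 ms, clamped to min_gap
```

A stateful ProcessFunction could maintain exactly this kind of running density estimate and attach the computed gap to each record for a downstream dynamic-gap session window.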

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-13 Thread Dian Fu
You need to explicitly define the result type of the UDF. You could refer to [1] for more details if you are using Flink 1.11. If you are using another version of Flink, refer to the corresponding documentation. [1]

Re: pyflink 1.11: error when running a PyFlink job

2020-11-13 Thread Dian Fu
Looks like it: JAVA_HOME cannot be found. Try explicitly exporting JAVA_HOME? > On Nov 13, 2020, at 5:06 PM, whh_960101 wrote: > > Hi all, when running a PyFlink job with pyflink 1.11 I get an error: pyflink/pyflink_gateway_server.py line 193 launch_gateway_server_process(), line 217 return Popen(), FileNotFoundError: [Errno 2] > No such file or directory: 'java' :
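The fix suggested above (explicitly exporting JAVA_HOME so the PyFlink gateway launcher can find the `java` binary) looks roughly like this; the JDK path is an example for a typical Linux install, not from the original thread:

```shell
# Sketch: make `java` visible to PyFlink's gateway launcher.
# Substitute the path of your own JDK installation.
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export PATH="$JAVA_HOME/bin:$PATH"
```

Putting these lines in the shell profile (or the job's launch script) makes the setting persistent across sessions.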

pyflink 1.11: error when running a PyFlink job

2020-11-13 Thread whh_960101
Hi all, when running a PyFlink job with pyflink 1.11 I get an error: pyflink/pyflink_gateway_server.py line 193 launch_gateway_server_process(), line 217 return Popen(), FileNotFoundError: [Errno 2] No such file or directory: 'java' : 'java'. It looks like the java path cannot be found, but I don't understand what 'java': 'java' means. How do I fix this?

PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-13 Thread Pierre Oberholzer
Hi, I'm trying to use a Map[String,String] object output of a Scala UDF (scala.collection.immutable.Map) as a valid data type in the Table API, namely via the Java type (java.util.Map) as recommended here,

Re: Table SQL Filesystem CSV recursive directory traversal

2020-11-13 Thread Danny Chan
In the current master code base, all FileInputFormat implementations add files recursively for the given paths by default (e.g. the #addFilesInDir method). So it should be supported by default for SQL. Timo Walther wrote on Mon, Nov 9, 2020 at 11:25 PM: > Hi Ruben, > > by looking at the code, it seems you should be

Re: flink-1.11: jobid issue when using application mode

2020-11-13 Thread zhisheng
看完还是没有解决方案啊 JasonLee <17610775...@163.com> 于2020年11月13日周五 下午4:10写道: > hi > 可以参考一下这篇文章: https://mp.weixin.qq.com/s/S_Spm88eDtbza1QoLKiWlg > > > > - > Best Wishes > JasonLee > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >

Re: Local build of the Flink source fails

2020-11-13 Thread leiyanrui
After switching to this mirror, can the build succeed? Has anyone tested it? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: CREATE TABLE LIKE clause from different catalog or database

2020-11-13 Thread Danny Chan
Hi Dongwon ~ A table from a different catalog/db is supported; you need to specify the full path of the source table: CREATE TABLE Orders_with_watermark ( ... ) WITH ( ... ) LIKE my_catalog.my_db.Orders; Dongwon Kim wrote on Wed, Nov 11, 2020 at 2:53 PM: > Hi, > > Is it disallowed to refer to a table

Re: Data loss exception using hash join in batch mode

2020-11-13 Thread Matthias Pohl
Hi 键, we would need more context on your case (e.g. logs and more details on what you're doing exactly or any other useful information) to help. Best, Matthias On Thu, Nov 12, 2020 at 3:25 PM 键 <1941890...@qq.com> wrote: > Data loss exception using hash join in batch mode >

Re: flink-1.11: jobid issue when using application mode

2020-11-13 Thread JasonLee
Hi, you can refer to this article: https://mp.weixin.qq.com/s/S_Spm88eDtbza1QoLKiWlg - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.2: how to use a blink-planner BatchTableEnvironment

2020-11-13 Thread Asahi Lee
BatchTableEnvironment: table to DataSet; DataSet to table ---- Original message from: "user-zh"