Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-12 Thread Arvid Heise
Hi Yun, Thank you for starting the discussion. This will solve one of the long-standing issues [1] that confuse users. I'm also a big fan of option 3. It is also a bit closer to Chandy-Lamport again. A couple of comments: 1) You call the tasks that get the barriers injected leaf nodes, which wou

TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

2020-10-12 Thread DONG, Weike
Hi community, Recently we have noticed a strange behavior for Flink jobs on Kubernetes per-job mode: when the parallelism increases, the time it takes for the TaskManagers to register with *JobManager* becomes abnormally long (for a task with parallelism of 50, it could take 60 ~ 120 seconds or ev

Re: Flink Kubernetes Libraries

2020-10-12 Thread superainbower
Hi Till, Could you tell me how to configure HDFS as the state backend when I deploy Flink on k8s? I tried adding the following to flink-conf.yaml: state.backend: rocksdb state.checkpoints.dir: hdfs://slave2:8020/flink/checkpoints state.savepoints.dir: hdfs://slave2:8020/flink/savepoints state.backend.incr

Re: [PyFlink] update udf functions on the fly

2020-10-12 Thread Arvid Heise
Hi Rinat, Which API are you using? If you use the DataStream API, the common way to simulate side inputs (which is what you need) is to use a broadcast. There is an example on SO [1]. [1] https://stackoverflow.com/questions/54667508/how-to-unit-test-broadcastprocessfunction-in-flink-when-processeleme

Re: Additional options to S3 Filesystem: Interest?

2020-10-12 Thread Arvid Heise
Hi Padarn, sounds like a good addition to me. We could wait for more feedback or you could start immediately. The next step would be to create a JIRA and get it assigned to you. Looking forward to your contribution. Arvid On Sun, Oct 11, 2020 at 7:45 AM Padarn Wilson wrote: > Hi Flink Users, >

Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

2020-10-12 Thread DONG, Weike
Hi community, I have uploaded the log files of JobManager and TaskManager-1-1 (one of the 50 TaskManagers) with DEBUG log level and default Flink configuration, and it clearly shows that TaskManager failed to register with JobManager after 10 attempts. Here is the link: JobManager: https://gist.

Re: Flink Kubernetes Libraries

2020-10-12 Thread Till Rohrmann
Hi Superainbower, could you share the complete logs with us? They contain which Flink version you are using and also the classpath you are starting the JVM with. Have you tried whether the same problem occurs with the latest Flink version? Cheers, Till On Mon, Oct 12, 2020 at 10:32 AM superainbo

Re: restoring from externalized incremental rocksdb checkpoint?

2020-10-12 Thread Congxian Qiu
Hi Jeff, Sorry for the late reply. You can only restore from a checkpoint whose chk-xxx directory contains a _metadata file. If there is no _metadata file in the chk-xxx directory, that checkpoint is incomplete and you can't restore from it. Best, Congxian Jeffrey Martin on Tue, Sep 15, 2020,
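The rule in this reply (a chk-xxx directory is restorable only if it contains a _metadata file) can be sketched as a small helper. This is a minimal illustration, not part of Flink: the function name latest_restorable_checkpoint is hypothetical, and it assumes the checkpoint directory is on a local or mounted filesystem (an hdfs:// or s3:// location would need the corresponding client instead of pathlib).

```python
from pathlib import Path
from typing import Optional


def latest_restorable_checkpoint(checkpoint_root: str) -> Optional[Path]:
    """Return the newest chk-<n> directory that contains a _metadata file.

    Hypothetical helper for illustration; directories without _metadata
    are treated as incomplete and skipped.
    """
    candidates = []
    for entry in Path(checkpoint_root).iterdir():
        if not entry.is_dir() or not entry.name.startswith("chk-"):
            continue
        suffix = entry.name[len("chk-"):]
        if not suffix.isdigit():
            continue
        # Only a checkpoint with a _metadata file is complete.
        if (entry / "_metadata").is_file():
            candidates.append((int(suffix), entry))
    if not candidates:
        return None
    # Compare numerically so that chk-10 ranks above chk-9.
    return max(candidates, key=lambda item: item[0])[1]
```

The returned path is the directory one would pass when resuming a job; if it returns None, no complete checkpoint was found under that root.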

Re: [PyFlink] update udf functions on the fly

2020-10-12 Thread Sharipov, Rinat
Hi Arvid, thx for your reply. We are already using the approach with control streams to propagate business rules through our data-pipeline. Because all our models are powered by Python, I'm going to use Table API and register UDF functions, where each UDF is a separate model. So my question is -

Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-12 Thread Timo Walther
Hi Austin, your explanation for the KeyedProcessFunction implementation sounds good to me. Using the time and state primitives for this task will make the implementation more explicit but also more readable. Let me know if you could solve your use case. Regards, Timo On 09.10.20 17:27, Aus

Re: [PyFlink] update udf functions on the fly

2020-10-12 Thread Dian Fu
Hi Rinat, Do you want to replace the UDFs with new ones on the fly or just want to update the model which could be seen as instance variables inside the UDF? For the former case, it's not supported AFAIK. For the latter case, I think you could just update the model in the UDF periodically or ac
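The second option mentioned here (keeping the model as an instance variable and refreshing it periodically) might look roughly like the sketch below. It is plain Python with no Flink dependency; RefreshingModel, load_model, refresh_seconds and clock are hypothetical names chosen for illustration. In a PyFlink job, the same logic would presumably sit inside the UDF's evaluation method.

```python
import time
from typing import Any, Callable


class RefreshingModel:
    """Callable that reloads its model once refresh_seconds have elapsed.

    Illustrative sketch only: load_model is any zero-argument function
    that returns a callable model (for example, one read from a file).
    """

    def __init__(
        self,
        load_model: Callable[[], Callable[[Any], Any]],
        refresh_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._load_model = load_model
        self._refresh_seconds = refresh_seconds
        self._clock = clock
        self._model = load_model()
        self._loaded_at = clock()

    def __call__(self, value: Any) -> Any:
        now = self._clock()
        # Reload lazily on the first call after the interval has passed.
        if now - self._loaded_at >= self._refresh_seconds:
            self._model = self._load_model()
            self._loaded_at = now
        return self._model(value)
```

Passing the clock in as a parameter keeps the refresh logic testable without sleeping; the reload happens on the calling thread, so a slow load_model would delay that one record.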

Re: why we need keyed state and operate state when we already have checkpoint?

2020-10-12 Thread 大森林
Thanks for your replies. When I use no state-relevant code in my program, the checkpoint can be saved and resumed.❶ So then why do we need Keyed State/Operator State/Stateful Function?❷ "the operators are reset to the time of the respective checkpoint." We have already met the requirement: "resume

Re: [PyFlink] update udf functions on the fly

2020-10-12 Thread Sharipov, Rinat
Hi Dian, thx for your reply! I was wondering whether a UDF can be replaced on the fly from Flink. Of course, I'm pretty sure it's possible to implement the update logic directly in Python, thx for the idea. Regards, Rinat On Mon, Oct 12, 2020 at 14:20, Dian Fu wrote: > Hi Rinat, > > Do you want to replace the UDFs with

[DISCUSS] Remove flink-connector-filesystem module.

2020-10-12 Thread Kostas Kloudas
Hi all, As the title suggests, this thread is to discuss the removal of the flink-connector-filesystem module which contains (only) the deprecated BucketingSink. The BucketingSink has been deprecated since Flink 1.9 [1] in favor of the relatively recently introduced StreamingFileSink. For the sake of a

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-12 Thread Chesnay Schepler
Are older versions of the module compatible with 1.12+? On 10/12/2020 4:30 PM, Kostas Kloudas wrote: Hi all, As the title suggests, this thread is to discuss the removal of the flink-connector-filesystem module which contains (only) the deprecated BucketingSink. The BucketingSink is deprecated s

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-12 Thread Kostas Kloudas
Hi Chesnay, Unfortunately not from what I can see in the code. This is the reason why I am opening a discussion. I think that if we supported backwards compatibility, this would have been an easier process. Kostas On Mon, Oct 12, 2020 at 4:32 PM Chesnay Schepler wrote: > > Are older versions of

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-12 Thread Yun Gao
Hi Arvid, Many thanks for the insightful comments! I added my responses below the quoted text: >> 1) You call the tasks that get the barriers injected leaf nodes, which would make the sinks the root nodes. That is very similar to how graphs in relational algebra are labeled. H

PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

2020-10-12 Thread Sharipov, Rinat
Hi mates! I'm very new to PyFlink and am trying to register a custom UDF function using the Python API. I'm currently facing an issue in both the server environment and my local IDE environment. When I try to execute the example below, I get an error message: *The configured Task Off-Heap Memory 0 bytes is less t

Re: [DISCUSS] Remove flink-connector-filesystem module.

2020-10-12 Thread Chesnay Schepler
Is there a way for us to change the module (in a reasonable way) that would allow users to continue using it? Is it an API problem, or one of semantics? On 10/12/2020 4:57 PM, Kostas Kloudas wrote: Hi Chesnay, Unfortunately not from what I can see in the code. This is the reason why I am openi

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-10-12 Thread Arvid Heise
Hi Yun, 4) Yes, the interaction is not trivial and also I have not completely thought it through. But in general, I'm currently at the point where I think that we also need non-checkpoint related events in unaligned checkpoints. So just keep that in mind, that we might converge anyhow at this poin

Re: Additional options to S3 Filesystem: Interest?

2020-10-12 Thread Dan Diephouse
We use the StreamingFileSink. An option to expire files after some time period would certainly be welcome. (I could probably figure out a way to do this from the S3 admin UI too though) On Sat, Oct 10, 2020 at 10:45 PM Padarn Wilson wrote: > Hi Flink Users, > > We need to expose some additional

FW: NPE in disposing flink sql group window when running flink using ./gradlew shadowJar run

2020-10-12 Thread Dcosta, Agnelo (HBO)
Flink application using kafka topics as source and destination. Using javaVersion = '1.11' flinkVersion = '1.11.1' scalaBinaryVersion ='2.11' the application is primarily using Flink SQL apis. We have a StatementSet and add sql inserts to that set using addInsertSql. when there are more insert st

Re: Additional options to S3 Filesystem: Interest?

2020-10-12 Thread Padarn Wilson
Thanks for the feedback. I've created a JIRA here https://issues.apache.org/jira/browse/FLINK-19589. @Dan: This indeed would make it easier to set a lifetime property on objects created by Flink, but actually if you want to apply it to all your objects for a given bucket you can set bucket wide po

Re: PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

2020-10-12 Thread Xingbo Huang
Hi, You can use the API to set the configuration: table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m') The flink-conf.yaml approach only takes effect when the job is submitted through flink run; it does not take effect in minicluster mode (python xxx.py). Best, Xingb

Missing annotation in SimpleJdbcConnectionProvider.java ?

2020-10-12 Thread Kenzyme
Hi, I would like to know if class [SimpleJdbcConnectionProvider](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProvider.java).java should be marked as @Internal or @PublicEvo

Re: Missing annotation in SimpleJdbcConnectionProvider.java ?

2020-10-12 Thread Kenzyme
After careful examination, seems like it should be marked as @Internal since this class is located in package org.apache.flink.connector.jdbc.internal.connection. Here is my PR related to this https://github.com/apache/flink/pull/13603 . Thanks a lot! Kenzyme Le ‐‐‐ Original Message ‐

Dynamic file name prefix - StreamingFileSink

2020-10-12 Thread Vijayendra Yadav
Hi Team, I have tried to assign a dynamic prefix to the file name, which contains datetime components. *The problem is that the job always takes the initial datetime from when the job first starts and never refreshes it later.* *How can I get the dynamic current datetime in the filename at sink time?* *.withPartPrefix (ZonedDateT

RE: state access causing segmentation fault

2020-10-12 Thread Colletta, Edward
Thanks Arvid, I added static to ExecQueue and this did fix the problem. I tested without static on RingBufferExec because it seems that if ExecQueue is static nested, there should be no reference to the MyKeyedProcessFunction object as RingBufferExec is an inner class of ExecQueue. However, I

Re: why we need keyed state and operate state when we already have checkpoint?

2020-10-12 Thread Congxian Qiu
Hi, As others said, state is different from a checkpoint. A checkpoint is just a **snapshot** of the state, and you can restore from the previous checkpoint if the job crashes. State is for stateful computation, and checkpoints are for fault tolerance [1]. The state keeps the information you'l
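The distinction drawn here (state is what the computation reads and writes, while a checkpoint is a snapshot of that state used for recovery) can be illustrated with a toy example. This is a plain-Python sketch, not Flink code; KeyedCounter and its methods are hypothetical names.

```python
import copy
from typing import Dict


class KeyedCounter:
    """Toy stateful operator: counts events per key."""

    def __init__(self) -> None:
        # The state: information the computation needs across events.
        self.state: Dict[str, int] = {}

    def process(self, key: str) -> int:
        self.state[key] = self.state.get(key, 0) + 1
        return self.state[key]

    def snapshot(self) -> Dict[str, int]:
        # A checkpoint: an independent copy of the state at this moment.
        return copy.deepcopy(self.state)

    def restore(self, checkpoint: Dict[str, int]) -> None:
        # Recovery: reset the state to what the checkpoint captured.
        self.state = copy.deepcopy(checkpoint)
```

Without the state dictionary there would be nothing to count with and nothing for the snapshot to capture; without the snapshot, the counts would be lost on a crash. A program with no stateful operators still checkpoints, but the snapshot then holds little beyond source positions.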

Flink Kafka offsets

2020-10-12 Thread Rex Fenley
Hello, I've been trying to configure the offset start position for a Flink Kafka consumer so that, when there is no committed offset, it always starts at the beginning. It seems like the typical way to do this would be setting auto.offset.reset=earliest; however, I don't see that configuration property in t
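The behavior being asked for (use the committed offset when one exists, otherwise fall back according to auto.offset.reset) can be written down as a small decision function. This sketch only models the Kafka consumer semantics of that property as generally documented; resolve_start_offset is a hypothetical name, and the sketch ignores the case where a committed offset is out of range.

```python
from typing import Optional


def resolve_start_offset(
    committed: Optional[int],
    earliest: int,
    latest: int,
    auto_offset_reset: str = "latest",
) -> int:
    """Pick the offset a consumer would start reading from.

    Illustrative model only: a committed offset always wins, and
    auto_offset_reset applies when there is none.
    """
    if committed is not None:
        return committed
    if auto_offset_reset == "earliest":
        return earliest
    if auto_offset_reset == "latest":
        return latest
    # Models the "none" policy: fail instead of guessing a position.
    raise LookupError("no committed offset and no reset policy")
```

With auto_offset_reset set to "earliest", a consumer group that has never committed starts at the beginning of the partition, which is the outcome described in the question.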

Re: PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

2020-10-12 Thread Sharipov, Rinat
Hi Xingbo, thx a lot, it works! But I'm still sure that it's not obvious from a user's point of view that *pyflink-shell.sh* doesn't use the provided flink-conf.yaml. Don't you think that it looks like an issue? Thx! On Tue, Oct 13, 2020 at 05:35, Xingbo Huang wrote: > Hi, > > You can use api to set con