@vinay - Window operators store everything in the state backend: the buffered values as well as the keys.

On Mon, Sep 26, 2016 at 7:34 PM, vinay patil <[email protected]> wrote:
> I am not sure about that. I will run the pipeline on the cluster and share
> the details.
>
> Since window is a stateful operator, it will store only the key part in
> the state backend and not the value, right?
>
> Regards,
> Vinay Patil
>
> On Mon, Sep 26, 2016 at 12:13 PM, Stephan Ewen [via Apache Flink User
> Mailing List archive.] <[hidden email]> wrote:
>
>> @vinay - Is it in your case large state that causes slower checkpoints?
>>
>> On Mon, Sep 26, 2016 at 6:17 PM, vinay patil <[hidden email]> wrote:
>>
>>> Hi,
>>>
>>> I am also facing this issue. In my case the data is flowing continuously
>>> from the Kafka source; when I increase the checkpoint interval to 60000,
>>> the data gets written to the S3 sink.
>>>
>>> Is it because some operator is taking more time for processing? In
>>> my case I am using a time window of 1 sec.
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Mon, Sep 26, 2016 at 10:08 AM, Chakravarthy varaga [via Apache Flink
>>> User Mailing List archive.] <[hidden email]> wrote:
>>>
>>>> Hi Stephan,
>>>>
>>>> Please find my responses below.
>>>>
>>>> - What source are you using for the slow input?
>>>> [CVP] - Both streams, as pointed out in my first mail, are Kafka
>>>> streams.
>>>> - How large is the state that you are checkpointing?
>>>> [CVP] - I have enabled checkpointing on the StreamEnvironment as
>>>> below.
>>>>
>>>>     final StreamExecutionEnvironment streamEnv =
>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>     streamEnv.setStateBackend(new
>>>>         FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>     streamEnv.enableCheckpointing(10000);
>>>>
>>>> In terms of the state stored, the KS1 stream has a payload of 100K
>>>> events/second, while KS2 has about 1 event / 10 minutes...
>>>> basically the operators perform flatmaps on 8 fields of a tuple (all
>>>> fields are primitives). If you look at the states' sizes in the
>>>> dashboard, they are in KB...
>>>> - Can you try to see in the log if actually the state snapshot takes
>>>> that long, or if it simply takes long for the checkpoint barriers to
>>>> travel through the stream due to a lot of backpressure?
>>>> [CVP] - There is no back pressure, at least from the sample
>>>> computation in the Flink dashboard. 100K/second is a low load by Flink's
>>>> benchmarks. I could not quite get the barriers vs. snapshot state
>>>> distinction. I have attached the Task Manager log (DEBUG) in case that
>>>> is of interest.
>>>>
>>>> I have attached the checkpoint times as a .png from the dashboard.
>>>> Basically, if you look at checkpoint IDs 28, 29 and 30, you'd see that
>>>> the checkpoints take more than a minute in each case. Before these
>>>> checkpoints, the KS2 stream did not have any events. As soon as an
>>>> event (should be in bytes) was generated, the checkpoints went slow and
>>>> subsequently a minute more for every checkpoint thereafter.
>>>>
>>>> This log was collected from the standalone Flink cluster with 1 job
>>>> manager & 2 TMs. 1 TM was running this application with checkpointing
>>>> (parallelism=1).
>>>>
>>>> Please let me know if you need further info.
>>>>
>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <[hidden email]> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>> information?
>>>>>
>>>>> - What source are you using for the slow input?
>>>>> - How large is the state that you are checkpointing?
>>>>> - Can you try to see in the log if actually the state snapshot takes
>>>>> that long, or if it simply takes long for the checkpoint barriers to
>>>>> travel through the stream due to a lot of backpressure?
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <[hidden email]> wrote:
>>>>>
>>>>>> Hi CVP,
>>>>>>
>>>>>> I'm not very familiar with the internals of the checkpointing
>>>>>> system, but maybe Stephan (in CC) has an idea what's going on here.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <[hidden email]>:
>>>>>>
>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>
>>>>>>> I have a stream application that has 2 stream sources as below.
>>>>>>>
>>>>>>>     KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>>>>>>     KeyedStream<Tuple2<String, V>, String> ks2 =
>>>>>>>         ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>>
>>>>>>>     ks1.connect(ks2).flatMap(X);
>>>>>>>     // X is a CoFlatMapFunction that inserts and removes elements
>>>>>>>     // from ks2 into a key-value state member. Elements from ks1 are
>>>>>>>     // matched against that state. The CoFlatMapFunction operator
>>>>>>>     // maintains ValueState<Tuple2<Long, Long>>.
>>>>>>>
>>>>>>>     // ks1 is streaming about 100K events/sec from a Kafka topic.
>>>>>>>     // ks2 is streaming about 1 event every 10 minutes. Precisely
>>>>>>>     // when the 1st event is consumed from this stream, the
>>>>>>>     // checkpoint takes 2 minutes straight away.
>>>>>>>
>>>>>>> The version of Flink is 1.1.2.
>>>>>>>
>>>>>>> I tried to checkpoint every 10 secs using an FsStateBackend.
>>>>>>> What I notice is that the checkpoint duration is almost 2 minutes in
>>>>>>> many cases, while in the other cases it frequently varies from 100 ms
>>>>>>> to 1.5 minutes. I'm attaching a snapshot of the dashboard for your
>>>>>>> reference.
>>>>>>>
>>>>>>> Is this an issue with Flink checkpointing?
>>>>>>>
>>>>>>> Best Regards
>>>>>>> CVP
>>>>
>>>> Attachments:
>>>> flink_job_Plan.png (42K)
>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/0/flink_job_Plan.png>
>>>> Flink-Checkpoint-Times.png (65K)
>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/1/Flink-Checkpoint-Times.png>
>>>> flink-qchavar-taskmanager-1-elxa1h67k32.log (442K)
>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/attachment/9176/2/flink-qchavar-taskmanager-1-elxa1h67k32.log>
>>>
>>> ------------------------------
>>> View this message in context: Re: Flink Checkpoint runs slow for low
>>> load stream
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Checkpoint-runs-slow-for-low-load-stream-tp9147p9179.html>
>>> Sent from the Apache Flink User Mailing List archive.
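On the "barriers vs. snapshot" question in the quoted messages, a toy model may help separate the two. The sketch below is plain Java, not Flink code; the class name, method names and numbers are all made up for illustration. It assumes an operator with two inputs that can only snapshot its state once the checkpoint barrier has arrived on both inputs, so a late barrier on one input inflates the reported checkpoint duration even when the state itself is only a few KB.

```java
import java.util.Arrays;

/** Toy model (not Flink code): checkpoint duration seen at a multi-input operator. */
public class BarrierAlignmentSketch {

    /**
     * @param barrierArrivalMs when the checkpoint barrier arrives on each input,
     *                         measured from the moment the checkpoint was triggered
     * @param snapshotMs       time spent writing the operator state once all barriers are in
     * @return duration the checkpoint would show for this operator in this model
     */
    public static long checkpointDurationMs(long[] barrierArrivalMs, long snapshotMs) {
        // The snapshot can only start after the barrier has arrived on every input,
        // so the slowest input decides when it starts.
        long lastBarrier = Arrays.stream(barrierArrivalMs).max().orElse(0L);
        return lastBarrier + snapshotMs;
    }

    /** Time between the first and the last barrier arriving, i.e. time spent waiting. */
    public static long alignmentMs(long[] barrierArrivalMs) {
        long first = Arrays.stream(barrierArrivalMs).min().orElse(0L);
        long last = Arrays.stream(barrierArrivalMs).max().orElse(0L);
        return last - first;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: the barrier reaches input 1 after 50 ms and input 2
        // after 90 s; writing the (small) state takes 20 ms.
        long[] arrivals = {50L, 90_000L};
        System.out.println("alignment ms:  " + alignmentMs(arrivals));
        System.out.println("checkpoint ms: " + checkpointDurationMs(arrivals, 20L));
    }
}
```

With these hypothetical arrivals, almost all of the duration is spent waiting for the second barrier rather than writing state, which is the case Stephan asks to tell apart from a slow snapshot by looking at the logs.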

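To make the reply at the top of this thread concrete, here is a minimal sketch of what "store everything" means for a window that buffers its elements. It is a toy stand-in written in plain Java, not Flink's window operator; `WindowStateSketch` and its methods are hypothetical names. The point it illustrates is that the buffered values, not just the keys, are part of what a checkpoint has to persist until the window fires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for the state of a buffering window operator (not Flink code). */
public class WindowStateSketch {

    // Per-key buffer: the key AND every value that arrived in the open window.
    private final Map<String, List<String>> buffer = new HashMap<>();

    /** Adds one element to the open window of its key. */
    public void add(String key, String value) {
        buffer.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** Number of keys that currently have an open window. */
    public int bufferedKeys() {
        return buffer.size();
    }

    /** Number of values a checkpoint taken right now would have to persist. */
    public int bufferedValues() {
        return buffer.values().stream().mapToInt(List::size).sum();
    }

    /** Emits the window contents for a key and drops them from state. */
    public List<String> fire(String key) {
        List<String> contents = buffer.remove(key);
        return contents == null ? new ArrayList<>() : contents;
    }

    public static void main(String[] args) {
        WindowStateSketch state = new WindowStateSketch();
        state.add("user-1", "a");
        state.add("user-1", "b");
        state.add("user-2", "c");
        System.out.println(state.bufferedKeys() + " keys, " + state.bufferedValues() + " values");
        state.fire("user-1");
        System.out.println(state.bufferedKeys() + " keys, " + state.bufferedValues() + " values");
    }
}
```

In this model the state to checkpoint grows with the number of buffered elements, so a short window over a fast stream can still hold noticeably more than one entry per key at the moment a checkpoint is taken.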