Job graph
Hello folks,

I am trying to get the job graph of a running Flink job, using the Flink libraries. For now, I have the RestClusterClient and the job IDs. Could you please tell me how to get the job graph?

Thank you.
Re: Job graph
Hi Nikolaos,

As far as I know, JobGraph is a relatively low-level concept. We currently don't expose it directly to users, and there is no REST API to fetch it from the JobManager. Why do you need the JobGraph, and what is your actual use case?

Best,
Ron
Re: Job graph
This may or may not help, but you can get the execution plan from inside the client by doing something like this (I printed the plan to stderr):

    ...
    System.err.println(env.getExecutionPlan());
    env.execute("my job");

The result is a JSON-encoded representation of the job graph. For the simple example I just tried, it produced this output:

    {
      "nodes" : [ {
        "id" : 1,
        "type" : "Source: Custom Source",
        "pact" : "Data Source",
        "contents" : "Source: Custom Source",
        "parallelism" : 10
      }, {
        "id" : 3,
        "type" : "Sink: Writer",
        "pact" : "Operator",
        "contents" : "Sink: Writer",
        "parallelism" : 10,
        "predecessors" : [ {
          "id" : 1,
          "ship_strategy" : "FORWARD",
          "side" : "second"
        } ]
      }, {
        "id" : 5,
        "type" : "Sink: Committer",
        "pact" : "Operator",
        "contents" : "Sink: Committer",
        "parallelism" : 10,
        "predecessors" : [ {
          "id" : 3,
          "ship_strategy" : "FORWARD",
          "side" : "second"
        } ]
      } ]
    }
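If you want to post-process a plan string like the one above programmatically, here is a minimal, illustrative sketch using only the JDK. It pulls the node ids and types out with a regex; the class name `PlanNodes` is made up, and for real code a proper JSON library (e.g. Jackson) would be the robust choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PlanNodes {
    // Extracts ("id", "type") pairs from the JSON plan emitted by getExecutionPlan().
    // Quick-and-dirty regex scan: only top-level nodes match, because only they
    // have "type" directly after "id" (predecessor entries do not).
    static List<String> nodeSummaries(String planJson) {
        List<String> out = new ArrayList<>();
        Pattern p = Pattern.compile("\"id\"\\s*:\\s*(\\d+)\\s*,\\s*\"type\"\\s*:\\s*\"([^\"]+)\"");
        Matcher m = p.matcher(planJson);
        while (m.find()) {
            out.add(m.group(1) + " -> " + m.group(2));
        }
        return out;
    }

    public static void main(String[] args) {
        String plan = "{ \"nodes\" : [ { \"id\" : 1, \"type\" : \"Source: Custom Source\", \"parallelism\" : 10 }, "
                + "{ \"id\" : 3, \"type\" : \"Sink: Writer\", \"predecessors\" : [ { \"id\" : 1 } ] } ] }";
        System.out.println(nodeSummaries(plan)); // prints [1 -> Source: Custom Source, 3 -> Sink: Writer]
    }
}
```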
Re: Job graph
Hi Nikolaos,

As Ron said, the JobGraph is a low-level structure in Flink and is not exposed to users at the moment. What you can do is fetch job details from the `RestClusterClient` via the method `getJobDetails(JobID jobId)`; the resulting `JobDetailsInfo` contains all vertices of the job as well as the JSON-format job plan.

Best,
Shammon FY
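Putting the two suggestions together, a rough sketch of fetching the plan over the `RestClusterClient` could look like the following. This is an assumption-laden sketch, not a tested program: it assumes a cluster whose REST endpoint is reachable at localhost:8081, takes the job id as a hex string argument, and the exact getter names on `JobDetailsInfo` may differ slightly between Flink versions, so check against your release.

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;

public class FetchJobPlan {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.ADDRESS, "localhost"); // assumption: local cluster
        conf.setInteger(RestOptions.PORT, 8081);          // assumption: default REST port

        try (RestClusterClient<String> client = new RestClusterClient<>(conf, "standalone")) {
            JobID jobId = JobID.fromHexString(args[0]);
            JobDetailsInfo details = client.getJobDetails(jobId).get();

            // The JSON job plan, same shape as env.getExecutionPlan() output.
            System.out.println(details.getJsonPlan());

            // Plus per-vertex details (id, name, parallelism, ...).
            details.getJobVertexInfos().forEach(v ->
                System.out.println(v.getJobVertexID() + " : " + v.getName()));
        }
    }
}
```

The same information is also available without any Flink dependency via the REST endpoint `GET /jobs/<jobid>/plan`.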
Editing job graph at runtime
Hi Team,

Is it possible to edit the job graph at runtime? Suppose I want to add a new sink to the Flink application at runtime, depending on specific parameters in the incoming events. Can I edit the job graph of a running Flink application?

Thanks
Jessy
failed when job graph change
Hi,

When I add or remove an operator in the job graph (for example a new sink target), using a savepoint, I must cancel the job to be able to run the new graph. This was working in the past. I am using Flink 1.17.1.

1. Is this a known bug? If so, when is it planned to be fixed?
2. Do I need to do something to make it work?

nick
Re:failed when job graph change
Hi, nick.

> using savepoint i must cancel the job to be able run the new graph

Do you mean that in 1.17.1 you need to cancel and start the job with the new job graph, while in the past changes such as a new operator took effect without restarting the job? I think that for a new job graph to take effect, it is necessary to restart the job.

--
Best!
Xuyang
Re: Editing job graph at runtime
Hi Jessy,

to be precise, the JobGraph is not used at runtime; it is translated into an ExecutionGraph. Nevertheless, such patterns are possible, but they require a bit of manual implementation.

Option 1) You stop the job with a savepoint and restart the application with slightly different parameters. If the pipeline has not changed much, the old state can be remapped to the slightly modified job graph. This is the easiest solution, with the downside of maybe a couple of seconds of downtime.

Option 2) You introduce a dedicated control stream (i.e. by using the connect() DataStream API [1]). Either you implement a custom sink in the main stream of the CoProcessFunction, or you enrich every record in the main stream with sink parameters that are read by your custom sink implementation.

I hope this helps.

Regards,
Timo

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/overview/#connect
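Timo's Option 2 can be sketched roughly as follows. All the names here (`DataEvent`, `ControlEvent`, `Tagged`, the broker address, the schema class) are hypothetical, introduced only for illustration; the control stream carries "route this key to topic X" messages, and a single Kafka sink with a topic selector does the dynamic writing.

```java
// Sketch only: DataEvent, ControlEvent, Tagged and TaggedSchema are made-up types.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class ControlStreamRouting {
    public static DataStream<Tagged> route(DataStream<DataEvent> main,
                                           DataStream<ControlEvent> control) {
        return main
            .keyBy(e -> e.key)
            .connect(control.keyBy(c -> c.key))
            .process(new KeyedCoProcessFunction<String, DataEvent, ControlEvent, Tagged>() {
                // Latest routing decision per key, updated by the control stream.
                private transient ValueState<String> targetTopic;

                @Override
                public void open(Configuration parameters) {
                    targetTopic = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("target-topic", String.class));
                }

                @Override
                public void processElement1(DataEvent e, Context ctx,
                                            Collector<Tagged> out) throws Exception {
                    String topic = targetTopic.value();
                    out.collect(new Tagged(topic != null ? topic : "default-topic", e));
                }

                @Override
                public void processElement2(ControlEvent c, Context ctx,
                                            Collector<Tagged> out) throws Exception {
                    targetTopic.update(c.newTopic); // remember where this key should go
                }
            });
    }

    // One KafkaSink can write to a per-record topic via a topic selector, so
    // "adding a sink" becomes writing a new topic name into the control stream.
    public static KafkaSink<Tagged> dynamicSink() {
        return KafkaSink.<Tagged>builder()
            .setBootstrapServers("broker:9092")                  // assumption
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopicSelector((Tagged t) -> t.topic)         // dynamic topic per record
                .setValueSerializationSchema(new TaggedSchema()) // hypothetical schema
                .build())
            .build();
    }
}
```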
Re: Editing job graph at runtime
Hi Timo/Team,

Thanks for the reply. Take the example in the following pseudo code; suppose this is the current application logic:

    firstInputStream = addSource(...)   // Kafka consumer C1
    secondInputStream = addSource(...)  // Kafka consumer C2

    outputStream = firstInputStream.keyBy(a -> a.key)
        .connect(secondInputStream.keyBy(b -> b.key))
        .coProcessFunction()
        // logic determines whether a new sink should be added to the
        // application or not. If not, the event is produced to the existing
        // sink(s). If a new sink is required, produce the events to the
        // existing sinks plus the new one.

    sink1 = addSink(outputStream)  // Kafka producer P1
    ...
    sinkN = addSink(outputStream)  // Kafka producer PN

Questions:
--> Can I add a new sink into the execution graph at runtime, for example a new Kafka producer, without restarting the current application or using option 1?
--> (Option 2) What do you mean by adding a custom sink at the coProcessFunction, and how will it change the execution graph?

Thanks
Jessy
Re: Editing job graph at runtime
Hi Team,

Can you provide your thoughts on this? It would be helpful.

Thanks
Jessy
Re: Editing job graph at runtime
Hi Jessy,

> Can I add a new sink into the execution graph at runtime, for example a new Kafka producer, without restarting the current application or using option 1?

No, there is currently no way to add a sink without a restart. Could you elaborate on why a restart is not an option for you?

You can use option 2, which means that you implement one source and one sink which dynamically read from or write to different topics, possibly by wrapping the existing source and sink. This is a rather complex task that I would not recommend to a new Flink user.

If you have a known set of possible sink topics, another option would be to add all sinks from the start and only route messages dynamically with side outputs. However, I'm not aware that such a pattern exists for sources, although with the new source interface it should be possible to do that.
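The side-output idea above can be sketched roughly like this. The `Event` type, tag names, and sinks are hypothetical; every `OutputTag` (and therefore every sink) must be declared when the job is submitted, which is why the set of possible sinks has to be known up front.

```java
// All candidate sinks are wired in from the start; a ProcessFunction routes
// each record to one of them via side outputs (Event and the sinks are made up).
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

final OutputTag<Event> auditTag  = new OutputTag<Event>("audit")  {};
final OutputTag<Event> alertsTag = new OutputTag<Event>("alerts") {};

SingleOutputStreamOperator<Event> mainFlow = events
    .process(new ProcessFunction<Event, Event>() {
        @Override
        public void processElement(Event e, Context ctx, Collector<Event> out) {
            if (e.isAlert()) {
                ctx.output(alertsTag, e);   // goes only to the alerts sink
            } else if (e.needsAudit()) {
                ctx.output(auditTag, e);    // goes only to the audit sink
            } else {
                out.collect(e);             // default path
            }
        }
    });

mainFlow.sinkTo(defaultSink);                         // Kafka producer P1
mainFlow.getSideOutput(auditTag).sinkTo(auditSink);   // Kafka producer P2
mainFlow.getSideOutput(alertsTag).sinkTo(alertSink);  // Kafka producer P3
```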
Re: Editing job graph at runtime
Thanks Arvid for the reply. Can you please elaborate a little on option 2, if possible?

Thanks
Jessy
Re: Editing job graph at runtime
Thanks Arvid for the reply. Can you please elaborate a little on option 2, if possible? We are looking for a similar option; currently we are proceeding with option 1.

Thanks Jessy for the question
Re: Editing job graph at runtime
Hi Arvid,

Thanks for the reply. I am currently exploring the Flink features, and we have certain use cases where new producers will be added to the system dynamically, and we don't want to restart the application frequently. It would be helpful if you could explain option 2 in detail.

Thanks & Regards
Jessy
Disable job graph in web UI
Hello, I have an auto-generated job that creates too many tasks for the web UI's job graph to handle; the browser pinwheels while the page attempts to load. Is it possible to disable the job graph component in the web UI? For slightly smaller jobs, once the graph loads, the rest of the UI is usable. Thanks, Joshua
Re: failed when job graph change
Hi,

Restarting the job is OK, and I do that, but I must cancel the job and submit a new one, and I don't want to lose the data from the state. I forgot to mention that I use the parameter "-allowNonRestoredState".

My steps:
1. stop the job with a savepoint
2. run the updated job (updated job graph) from the savepoint

I expect it to run; currently the result is that the job fails.

nick
Re:Re: failed when job graph change
Hi,

Can you attach the log with the exception from when the job failed?

--
Best!
Xuyang
Re: Disable job graph in web UI
Upon further inspection, it appears that the web UI redraws each DOM element with every update. So I think removing the graph won’t fix the page performance issue because each task list item is being redrawn on every refresh. > On Sep 7, 2017, at 2:22 PM, Joshua Griffith wrote: > > Hello, I have an auto-generated job that creates too many tasks for web UI’s > job graph to handle. The browser pinwheels while the page attempts to load. > Is it possible to disable the job graph component in the web UI? For slightly > smaller jobs, once the graph loads the rest of the UI is usable. > > Thanks, > > Joshua
Re: Re: failed when job graph change
hi

I didn't find anything in the log, but I found that it happens when I add a new sink operator: because I work with checkpoints, Flink can't finish the transaction (the new Kafka topic was not part of the transaction before I added the new sink operator).

So I must cancel the job to make it work.

How can I solve this issue?

nick

On Mon, Dec 4, 2023 at 10:27, Xuyang <xyzhong...@163.com> wrote:
Re: Re: failed when job graph change
Hi nick,

If you want to modify the sink operator, I think you can change the uid of the operator to avoid restoring state that does not belong to it.

Best,
Feng

On Thu, Jan 25, 2024 at 1:19 AM nick toker wrote:
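Feng's suggestion relates to how Flink matches savepoint state to operators: state is keyed by each operator's uid, which is either set explicitly or auto-generated by hashing the topology. A rough Python sketch of the idea (assumption: the toy hash below is only an illustration, not Flink's actual StreamGraphHasher algorithm):

```python
import hashlib

def auto_ids(operators):
    # Toy model of auto-generated operator ids: each id depends on the
    # operator's position and its upstream id, so changing the topology
    # upstream of an operator changes its generated id.
    ids, prev = [], ""
    for i, name in enumerate(operators):
        h = hashlib.sha256(f"{i}:{prev}".encode()).hexdigest()[:8]
        ids.append(h)
        prev = h
    return ids

v1 = auto_ids(["source", "map", "kafka-sink"])
v2 = auto_ids(["source", "new-filter", "map", "kafka-sink"])

assert v1[0] == v2[0]  # "source" unchanged -> same generated id
assert v1[1] != v2[2]  # "map" shifted -> different generated id, so its
                       # savepoint state can no longer be matched to it

# With explicit uids, state is matched by a stable string instead,
# independent of the surrounding topology:
state = {"map-uid": "..."}   # savepoint state keyed by explicit uid
assert "map-uid" in state
```

In the actual DataStream API this corresponds to calling `.uid("...")` on every stateful operator, which is why changing an operator's uid detaches it from its old state.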
Re: Re: failed when job graph change
Hi,

I am adding a new sink that did not exist in the graph:
1. Stop with a savepoint.
2. Run the new graph with the new sink operator (from the savepoint).

In this case the job is stuck in "initializing" forever because it can't complete the transaction (on the new operator's Kafka topic).

I don't understand how changing the uid will help; it is a new operator with a new uid.

nick

On Wed, Jan 24, 2024 at 19:30, Feng Jin <jinfeng1...@gmail.com> wrote:
Documentation for translation of Job graph to Execution graph
Hi, When troubleshooting a flink job, it is tricky to map the Job graph (application code) to the logs & monitoring REST APIs. So, I am trying to find documentation on how a Job graph is translated to Execution graph. I found this - https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/job_scheduling.html Any detailed documentation on the design and code components will be helpful. Thanks, Abhi
Re: Documentation for translation of Job graph to Execution graph
Hi, I'm afraid there is no documentation besides the link that you posted and this one: https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html . With what parts are you having trouble? Maybe I can help. Cheers, Aljoscha On Thu, 16 Jun 2016 at 19:31 Bajaj, Abhinav wrote: > Hi, > > When troubleshooting a flink job, it is tricky to map the Job graph > (application code) to the logs & monitoring REST APIs. > > So, I am trying to find documentation on how a Job graph is translated to > Execution graph. > I found this - > https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/job_scheduling.html > > Any detailed documentation on the design and code components will be > helpful. > > Thanks, > Abhi >
Re: Documentation for translation of Job graph to Execution graph
Hi,

Thanks for sharing this link. I have not seen it before; maybe it was newly added in the 1.0 docs. I will go through it.

In general, there are two things I am trying to understand and get comfortable with:

1. How a Job graph is translated to an Execution graph. The logs and monitoring APIs refer to the Execution graph, so I need to map them back to the Job graph. I am trying to bridge this gap.
2. The job manager & task manager logs are tricky to decipher, especially when there are multiple jobs running. Is there a way to filter the logs for a single job?

~ Abhi

From: Aljoscha Krettek <aljos...@apache.org>
Reply-To: user@flink.apache.org
Date: Friday, June 17, 2016 at 2:31 AM
To: user@flink.apache.org
Subject: Re: Documentation for translation of Job graph to Execution graph
Re: Documentation for translation of Job graph to Execution graph
Hi,

the link has been added newly, yes.

Regarding Q1, since there is no documentation right now, I have to refer you to our code. In the JobManager.scala class, there is a method "private def submitJob(jobGraph, ...)" where the ExecutionGraph is created. I think that's a good starting point for looking through the code. (I also added Till to the message in case he wants to chime in.)

Q2: Currently, Flink doesn't add the job name to the logs, so it's indeed not very easy to separate the log entries generated by different jobs. In general, we recommend running one JobManager per job (multiple jobs are of course also supported).

On Sat, Jun 18, 2016 at 1:41 AM, Bajaj, Abhinav wrote:
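To make the translation Robert points at concrete: each JobVertex in the JobGraph is expanded into as many parallel ExecutionVertices (subtasks) as its configured parallelism, which is where the "(i/p)" suffixes in logs and the monitoring REST API come from. A minimal sketch (assumption: a simplified model, not Flink's actual classes):

```python
def expand_to_execution_graph(job_vertices):
    """job_vertices: list of (name, parallelism) pairs from the JobGraph.
    Returns subtask names as they appear in logs and the monitoring API."""
    return [f"{name} ({i}/{p})"
            for name, p in job_vertices
            for i in range(1, p + 1)]

plan = [("Source: Custom Source", 2), ("Sink: Writer", 2)]
subtasks = expand_to_execution_graph(plan)

# A log line mentioning "Sink: Writer (2/2)" can now be traced back to the
# JobGraph vertex "Sink: Writer":
assert subtasks == ["Source: Custom Source (1/2)", "Source: Custom Source (2/2)",
                    "Sink: Writer (1/2)", "Sink: Writer (2/2)"]
```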
Re: Documentation for translation of Job graph to Execution graph
Thanks Robert for the helpful reply.

I have a follow-up on Q2: "In general, we recommend running one JobManager per job."

I understand how this can be achieved while running in YARN, i.e. by submitting single Flink jobs. Is there some other way of configuring Flink to run a single JobManager per job?

From: Robert Metzger <rmetz...@apache.org>
Reply-To: user@flink.apache.org
Date: Tuesday, June 21, 2016 at 8:23 AM
To: user@flink.apache.org, Till Rohrmann <trohrm...@apache.org>
Cc: Aljoscha Krettek <aljos...@apache.org>
Subject: Re: Documentation for translation of Job graph to Execution graph
Re: Documentation for translation of Job graph to Execution graph
Hi Robert,

Thanks for the helpful reply. I have a couple of follow-up questions:

"In general, we recommend running one JobManager per job."
I understand how this can be achieved while running in YARN, i.e. by submitting single Flink jobs. Is there some other way of configuring Flink to run a single JobManager per job?

Is there a plan to add the job id or name to the logs?

Thanks,
Abhi

From: Robert Metzger <rmetz...@apache.org>
Reply-To: user@flink.apache.org
Date: Tuesday, June 21, 2016 at 8:23 AM
To: user@flink.apache.org, Till Rohrmann <trohrm...@apache.org>
Cc: Aljoscha Krettek <aljos...@apache.org>
Subject: Re: Documentation for translation of Job graph to Execution graph
Re: Documentation for translation of Job graph to Execution graph
On Wed, Jun 29, 2016 at 9:19 PM, Bajaj, Abhinav wrote: > Is their a plan to add the Job id or name to the logs ? This is now part of the YARN client output and should be part of the 1.1 release. Regarding your other question: in standalone mode, you have to manually make sure to not submit multiple jobs to a single cluster.
Recommendation for dealing with Job Graph incompatibility across varying Flink versions
Hello, We are exploring running multiple Flink clusters within a Kubernetes cluster such that each Flink cluster can run with a specified Flink image version. Since the Flink Job Graph needs to be compatible with the Flink version running in the Flink cluster, this brings a challenge in how we ensure that the SQL job graph or Flink job jars are compatible with the Flink cluster users want to run them on. E.g. if the Flink cluster is running version 1.12.1, the job graph generated from the SQL must be created using compatible 1.12.1 Flink libraries. Otherwise, we see issues with deserialization etc. Is there a recommended way to handle this scenario today? Thanks, Sonam
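One pragmatic guard for the scenario described above (a sketch, under the assumption that the client can query the target cluster first, e.g. the REST endpoint `GET /config` reports a `flink-version` field): refuse to generate/submit a job graph unless the client-side Flink libraries match the cluster's major.minor version:

```python
def compatible(client_version: str, cluster_version: str) -> bool:
    """Require matching major.minor; patch releases are normally
    serialization-compatible, different minors often are not."""
    major_minor = lambda v: tuple(int(x) for x in v.split(".")[:2])
    return major_minor(client_version) == major_minor(cluster_version)

# Client built against 1.12.1 may submit to a 1.12.x cluster...
assert compatible("1.12.1", "1.12.2")
# ...but not to a 1.13.x cluster:
assert not compatible("1.13.0", "1.12.1")
```

The stricter (and safer) alternative is the one the replies below converge on: run one job-graph-generation client per supported Flink version, built against exactly those libraries.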
Job Manager taking long time to upload job graph on remote storage
Hi,

We are currently running Flink 1.9.0. We see a delay of around 20 seconds when starting a job on a session Flink cluster. We start the job using Flink's monitoring REST API, where our jar is already uploaded to the Job Manager. Our jar file is around 200 MB. We are using the memory state backend with GCS as remote storage.

On running the cluster in debug mode, we observed that generating the plan itself takes around 6 seconds and copying the job graph from local to the remote folder takes around 10 seconds.

We were wondering whether this delay is expected or whether it can be reduced by tweaking any configuration?

Thank you. Regards
Prakhar Mathur
Re: Recommendation for dealing with Job Graph incompatibility across varying Flink versions
Hi Sonam,

I am not a long-standing Flink user (3 months only) so perhaps others will have a more authoritative view.

I would say that I am using Flink in k8s, and have had some good success with the Google Flink operator (https://github.com/GoogleCloudPlatform/flink-on-k8s-operator). This includes Custom Resource Definitions (CRDs) so that you can define your Flink clusters in YAML, and deploy using kustomize. The result is:

- A Flink cluster of a job-manager and one-or-more task-managers.
- A Kubernetes job which acts as the Flink "client" to submit the job to the job-manager, the job-submitter, e.g.

flink-example-job-submitter-g4s6g   0/1   Completed   0   6d15h
flink-example-jobmanager-0          1/1   Running     3   6d15h
flink-example-taskmanager-0         1/1   Running     3   6d15h

This all seems in keeping with Flink's "Per-Job Mode" deployment option (https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/overview/#per-job-mode).

Note: I'm only just getting into state persistence and recovery, so still some work to do, but I think this is largely understanding and configuration.

Hope that helps
Paul

> On 17 Jun 2021, at 23:55, Sonam Mandal wrote:
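For reference, a cluster resource for that operator looks roughly like the following. This is a sketch from memory: the `apiVersion`, kind, and field names may differ between operator releases, so check the operator's own CRD documentation before using it.

```yaml
apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: flink-example
spec:
  image:
    name: flink:1.12.1          # pins the Flink version for this cluster
  jobManager:
    resources:
      requests:
        memory: "1Gi"
  taskManager:
    replicas: 2
  job:
    jarFile: /opt/flink-example-job.jar   # run by the job-submitter pod
```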
Re: Recommendation for dealing with Job Graph incompatibility across varying Flink versions
Hi Paul,

Thanks for getting back to me. I did take a look at the Google operator, and it uses the /bin/flink client for job submission. My understanding is that in this scenario users must ensure that their job jar is compatible with the Flink version, and the client just takes care of the submission. Do let me know if I understood this correctly.

Perhaps some context on what we are doing will help. We are writing a small client which takes SQL statements, converts them via the Table environment, and submits the job. The /bin/flink client does not accept SQL out of the box, and we cannot expect users to run a SQL shell to run their production SQL streaming services. Since we are dealing with the job graph generation ourselves, we have run into the issue that our client needs to be compiled with the same version of Flink that we are running; otherwise we run into job graph compatibility issues.

So I wanted to understand if there is a recommendation on how to deal with this conversion in a scenario where different users may run different Flink versions in a given Kubernetes cluster. Any thoughts?

Thanks,
Sonam

From: Paul K Moore
Sent: Friday, June 18, 2021 2:25:52 AM
To: Sonam Mandal
Cc: user@flink.apache.org; Srinivasulu Punuru
Subject: Re: Recommendation for dealing with Job Graph incompatibility across varying Flink versions
Bugs in Streaming job stopping? Weird graceful stop/restart for disjoint job graph
Hi there,

I'm currently analyzing a weird behavior of one of our jobs running on YARN with Flink 1.11.2. I have a kind of special situation here in that I submit a single streaming job with a disjoint job graph, i.e. the job contains two graphs of the same kind but totally independent of each other (one ETL pipeline for source1, another for source2). It's just historical reasons that make this deployment a bit easier.

I had the job running nicely until I wanted to stop it with a savepoint as usual, like so:

flink stop -p ${SAVEPOINT_BASEDIR}/${FLINK_JOB_NAME}/SAVEPOINTS --yarnapplicationId=${FLINK_YARN_APPID} ${ID}

After a minute, I received a timeout exception [1]. Now the interesting parts/possible bugs are the following three:

1. The stop was triggered at 21/01/12 06:15:10. I see that almost all tasks of the entire job switched from RUNNING to FINISHED within seconds, but two tasks hit something that looks like a race condition on shutdown: they threw an IllegalStateException in BufferBuilder.append, where an assertion makes sure that the buffer is not yet finished [2].

2. That failure led to RESTARTING the tasks of that job. The failure occurred 5 seconds after I triggered the stop, and 2 seconds later I see that the pipeline switched its state to RUNNING again. No wonder that the "stop" eventually failed with a timeout, as the job didn't think about shutting down anymore.

3. BUT the major issue for me here is: the entire pipeline of source1 was restarted, but the pipeline of source2 was still FINISHED. As Flink did quite some work on batch/streaming unification and region failover/restart in the last versions, my guess is that, since I am in the special case of a disjoint graph here, only the tasks in the connected graph where the error occurred were restarted properly, and the other graph was left in FINISHED state, even though I am dealing with a streaming job here.

The problem is that the job was left in kind of a broken state: from just looking at the YARN / Flink UI it seemed to be still running and the stop had no effect, but in reality a huge part of the job was shut down.

My workaround of course is as follows:

1. If a "graceful stop" won't succeed, in future I will trigger a hard "yarn application -kill" afterwards, because I can't be certain what state the job is in after a failed attempt to stop.
2. I will enforce stronger isolation in my jobs so that I always have connected graphs as jobs. In my case, I will deploy two independent jobs for the two ETL pipelines and hope that this problem won't arise again (at least the entire job will be either FINISHED or RUNNING).

But I'm curious what you think: are those three bugs, or is (some of) this expected behaviour? Should I open bug ticket(s) for them?

Best regards
Theo

[1] Timeout from flink stop:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job "f23290bf5fb0ecd49a4455e4a65f2eb6".
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495) at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864) at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) Caused by: java.util.concurrent.TimeoutException at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493) ... 9 more [2] Exception in graceful shutdown: 2021-01-12T06:15:15.827877+01:00 WARN org.apache.flink.runtime.taskmanager.Task Source: rawdata_source1 -> validation_source1 -> enrich_source1 -> map_json_source1 -> Sink: write_to_kafka_source1) (3/18) (bc68320cf69dd877782417a3298499d6) switched from RUNNING to FAILED. java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161) at org.apache.fl
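The hypothesis in point 3 above can be sketched as a connected-components computation (assumption: a simplified model of region failover, where the restarted region is the set of tasks reachable from the failed task through job-graph edges):

```python
def failover_region(edges, failed):
    """Return the vertices in the same connected component as `failed`.
    edges: iterable of (src, dst) pairs describing the job graph."""
    adj = {}
    for a, b in edges:
        adj.setdefault(a, set()).add(b)
        adj.setdefault(b, set()).add(a)
    region, stack = set(), [failed]
    while stack:
        v = stack.pop()
        if v not in region:
            region.add(v)
            stack.extend(adj.get(v, ()))
    return region

# Two independent ETL pipelines submitted as one (disjoint) job:
edges = [("source1", "sink1"), ("source2", "sink2")]
restarted = failover_region(edges, "source1")

# Only the component containing the failure restarts; the second
# pipeline stays in whatever state it reached (here: FINISHED).
assert restarted == {"source1", "sink1"}
```

Under this model, splitting the disjoint graph into two jobs (Theo's workaround 2) guarantees that each job is a single region and is either restarted wholesale or not at all.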
Re: Job Manager taking long time to upload job graph on remote storage
Hi Prakhar,

have you enabled HA for your cluster? If yes, then Flink will try to store the job graph to the configured high-availability.storageDir in order to be able to recover it. If this operation takes long, then it is either the filesystem that is slow or storing the pointer in ZooKeeper. If it is the filesystem, then I would suggest checking whether you have some read/write quotas which might slow the operation down.

If you haven't enabled HA, or persisting the job graph is not what takes long, then the next most likely candidate is the recovery from a previous checkpoint. Here again, Flink needs to read from the remote storage (in your case GCS). Depending on the size of the checkpoint and the read bandwidth, this can be faster or slower. The best way to figure out what takes long is to share the logs with us so that we can confirm it.

To sum it up, the job submission is most likely slow because of the interplay of Flink with the external system (most likely your configured filesystem). If the filesystem is somewhat throttled, then Flink cannot do much about it. What you could try is to check whether your jar contains dependencies which are not needed (e.g. Flink dependencies which are usually provided by the system). That way you could decrease the size of the jar a bit.

Cheers,
Till

On Wed, Sep 2, 2020 at 9:48 AM Prakhar Mathur wrote:

> Hi,
>
> We are currently running Flink 1.9.0. We see a delay of around 20 seconds
> in order to start a job on a session Flink cluster. We start the job using
> Flink's monitoring REST API where our jar is already uploaded on the Job
> Manager. Our jar file size is around 200 MB. We are using the memory state
> backend with GCS as remote storage.
>
> On running the cluster in debug mode, we observed that generating the plan
> itself takes around 6 seconds and copying the job graph from local to the
> remote folder takes around 10 seconds.
>
> We were wondering whether this delay is expected or if it can be reduced
> by tweaking any configuration?
>
> Thank you.
>
> Regards,
> Prakhar Mathur
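On Till's jar-size point: a 200 MB job jar often bundles Flink's own runtime classes. A hypothetical pom.xml excerpt (artifact name and version chosen to match Flink 1.9.0 with Scala 2.11 — adjust to your build) that keeps them out of the fat jar:

```xml
<!-- Mark Flink's core artifacts as "provided": the session cluster already
     ships them, so they need not be bundled into the job jar. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.9.0</version>
  <scope>provided</scope>
</dependency>
```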
Re: Job Manager taking long time to upload job graph on remote storage
Hi,

Thanks for the response. Yes, we are running Flink in HA mode. We checked that there are no such quota limits on GCS for us. Please find the logs below; you can see that copying of the blob started at 11:50:39,455 and the JobGraph submission was received at 11:50:46,400.

2020-09-01 11:50:37,061 DEBUG org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - Release TaskExecutor 2e20ee286a3fee5831fefc0ab427ba92 because it exceeded the idle timeout.
2020-09-01 11:50:37,061 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Worker 4e4ae8b90f911787ac112c2847759512 could not be stopped.
2020-09-01 11:50:37,062 DEBUG org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - Release TaskExecutor 032a62eff2a7d8067f25b8fc943f262f because it exceeded the idle timeout.
2020-09-01 11:50:37,062 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Worker 2532ed667a2566e06c3d9e3bc85c6ed6 could not be stopped.
2020-09-01 11:50:37,305 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
2020-09-01 11:50:37,305 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
2020-09-01 11:50:37,354 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
2020-09-01 11:50:37,354 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
2020-09-01 11:50:39,455 DEBUG org.apache.flink.runtime.blob.FileSystemBlobStore - Copying from /tmp/flink-blobs/blobStore-6e468470-ba5f-4ea0-a8fd-cf31af663f11/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426 to gs:///blob/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426.
2020-09-01 11:50:43,904 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x30be3d929102460 after 2ms
2020-09-01 11:50:46,400 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
2020-09-01 11:50:46,403 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
2020-09-01 11:50:46,405 DEBUG org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore - Adding job graph 980d3ff229b7fbfe889e2bc93e526da0 to flink/cluster/jobgraphs/980d3ff229b7fbfe889e2bc93e526da0.
2020-09-01 11:50:47,325 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
2020-09-01 11:50:47,325 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
2020-09-01 11:50:47,325 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
2020-09-01 11:50:47,325 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
2020-09-01 11:50:47,330 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
2020-09-01 11:50:47,331 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
2020-09-01 11:50:52,880 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got notification sessionid:0x30be3d929102460
2020-09-01 11:50:52,880 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/flink/cluster/jobgraphs for sessionid 0x30be3d929102460
2020-09-01 11:50:52,882 INFO org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore - Added SubmittedJobGraph(980d3ff229b7fbfe889e2bc93e526da0) to ZooKeeper.

Thank You.
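The gap Prakhar points to can be read straight off the log timestamps. A small self-contained Java check (timestamps copied from the log excerpt above; the pattern assumes Flink's default `HH:mm:ss,SSS` log4j time format):

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

public class BlobCopyGap {

    /** Milliseconds between two "HH:mm:ss,SSS" log timestamps (same day). */
    static long gapMillis(String from, String to) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("HH:mm:ss,SSS");
        return Duration.between(LocalTime.parse(from, fmt),
                                LocalTime.parse(to, fmt)).toMillis();
    }

    public static void main(String[] args) {
        // Blob copy start vs. JobGraph submission from the log above:
        System.out.println(gapMillis("11:50:39,455", "11:50:46,400") + " ms"); // prints "6945 ms"
    }
}
```

So the blob copy accounts for roughly 7 of the reported ~20 seconds, with the ZooKeeper registration adding another ~6.5 s (11:50:46,405 to 11:50:52,882).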
Re: Job Manager taking long time to upload job graph on remote storage
The logs don't look suspicious. Could you maybe check what the write bandwidth to your GCS bucket is from the machine you are running Flink on? It should be enough to generate a 200 MB file and write it to GCS. Thanks a lot for your help in debugging this matter.

Cheers,
Till
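Till's bandwidth check can be scripted. A rough Java sketch that times writing an N MiB file; it writes to a local temp file as a stand-in, whereas the real test would target the GCS bucket (e.g. a path served by the gs:// filesystem), so treat the destination as an assumption:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class WriteBandwidth {

    /** Writes sizeMb MiB of zeros to target and returns the rate in MiB/s. */
    static double measure(Path target, int sizeMb) throws IOException {
        byte[] chunk = new byte[1024 * 1024];
        long start = System.nanoTime();
        try (OutputStream out = Files.newOutputStream(target)) {
            for (int i = 0; i < sizeMb; i++) {
                out.write(chunk);
            }
        }
        double seconds = (System.nanoTime() - start) / 1e9;
        return sizeMb / seconds;
    }

    public static void main(String[] args) throws IOException {
        // 200 MiB matches the jar size discussed in this thread.
        Path tmp = Files.createTempFile("bandwidth-test", ".bin");
        System.out.printf("%.1f MiB/s%n", measure(tmp, 200));
        Files.delete(tmp);
    }
}
```

Comparing this number against the ~29 MiB/s implied by the 7 s blob copy would show whether the raw link or Flink's blob store path is the bottleneck.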
Re: Job Manager taking long time to upload job graph on remote storage
We tried uploading the same blob from the Job Manager k8s pod directly to GCS using gsutil and it took 2 seconds. The upload speed was 166.8 MiB/s.

Thanks.
Re: Job Manager taking long time to upload job graph on remote storage
Hmm, then it probably rules GCS out. What about ZooKeeper? Have you experienced slow response times from your ZooKeeper cluster?

Cheers,
Till
Re: Job Manager taking long time to upload job graph on remote storage
Yes, I will check that, but any pointers on why Flink is taking more time than the gsutil upload?
Re: Job Manager taking long time to upload job graph on remote storage
From the log snippet it is hard to tell. Flink is not only interacting with GCS but also with ZooKeeper, to store a pointer to the serialized JobGraph. This can also take some time. Then of course, there could be an issue with the GS filesystem implementation you are using. The fs throughput could also change a bit over time. In the logs you shared, uploading the blobs takes at most 7 s; in another run you stated that it would take 10 s.

Have you tried whether the same behaviour is observable with the latest Flink version?

Cheers,
Till
Re: Job Manager taking long time to upload job graph on remote storage
Yes, we can try the same in 1.11. Meanwhile, is there any network- or thread-related config that we can tweak for this?
Re: Job Manager taking long time to upload job graph on remote storage
me.resourcemanager.StandaloneResourceManager - >>>>>>> Trigger heartbeat request. >>>>>>> 2020-09-01 11:50:37,354 DEBUG >>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - >>>>>>> Received heartbeat from 4e4ae8b90f911787ac112c2847759512. >>>>>>> 2020-09-01 11:50:37,354 DEBUG >>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - >>>>>>> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6. >>>>>>> 2020-09-01 11:50:39,455 DEBUG >>>>>>> org.apache.flink.runtime.blob.FileSystemBlobStore - Copying >>>>>>> from >>>>>>> /tmp/flink-blobs/blobStore-6e468470-ba5f-4ea0-a8fd-cf31af663f11/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426 >>>>>>> to >>>>>>> gs:///blob/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426. >>>>>>> 2020-09-01 11:50:43,904 DEBUG >>>>>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got >>>>>>> ping response for sessionid: 0x30be3d929102460 after 2ms >>>>>>> 2020-09-01 11:50:46,400 INFO >>>>>>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher - >>>>>>> Received >>>>>>> JobGraph submission 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001). >>>>>>> 2020-09-01 11:50:46,403 INFO >>>>>>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher - >>>>>>> Submitting >>>>>>> job 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001). >>>>>>> 2020-09-01 11:50:46,405 DEBUG >>>>>>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore - >>>>>>> Adding job graph 980d3ff229b7fbfe889e2bc93e526da0 to >>>>>>> flink/cluster/jobgraphs/980d3ff229b7fbfe889e2bc93e526da0. >>>>>>> 2020-09-01 11:50:47,325 DEBUG >>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - >>>>>>> Trigger heartbeat request. 
>>>>>>> 2020-09-01 11:50:47,325 DEBUG >>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - >>>>>>> Trigger heartbeat request. >>>>>>> 2020-09-01 11:50:47,325 DEBUG >>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - >>>>>>> Trigger heartbeat request. >>>>>>> 2020-09-01 11:50:47,325 DEBUG >>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - >>>>>>> Trigger heartbeat request. >>>>>>> 2020-09-01 11:50:47,330 DEBUG >>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - >>>>>>> Received heartbeat from 4e4ae8b90f911787ac112c2847759512. >>>>>>> 2020-09-01 11:50:47,331 DEBUG >>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - >>>>>>> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6. >>>>>>> 2020-09-01 11:50:52,880 DEBUG >>>>>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got >>>>>>> notification sessionid:0x30be3d929102460 >>>>>>> 2020-09-01 11:50:52,880 DEBUG >>>>>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got >>>>>>> WatchedEvent state:SyncConnected type:NodeChildrenChanged >>>>>>> path:/flink/cluster/jobgraphs for sessionid 0x30be3d929102460 >>>>>>> 2020-09-01 11:50:52,882 INFO >>>>>>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore - >>>>>>> Added SubmittedJobGraph(980d3ff229b7fbfe889e2bc93e526da0) to ZooKeeper. >>>>>>> >>>>>>> Thank You. >>>>>>> >>>>>>> On Wed, Sep 2, 2020 at 2:06 PM Till Rohrmann >>>>>>> wrote: >>>>>>> >>>>>>>> Hi Prakhar, >>>>>>>> >>>>>>>> have you enabled HA for your cluster? If yes, then Flink will try >>>>>>>> to store the job graph to the configured high-availability.storageDir >>>>>>>> in >>>>>>>> order to be able to recover it. If this operation takes long, then it >>>>>>>> is >>>>>>>> either the filesystem which is slow or storing the pointer in >>>>>>>> ZooKeeper. 
If >>>>>>>> it is the filesystem, then I would suggest to check whether you have >>>>>>>> some >>>>>>>> read/write quotas which might slow the operation down. >>>>>>>> >>>>>>>> If you haven't enabled HA or persisting the jobGraph is not what >>>>>>>> takes long, then the next most likely candidate is the recovery from a >>>>>>>> previous checkpoint. Here again, Flink needs to read from the remote >>>>>>>> storage (in your case GCS). Depending on the size of the checkpoint >>>>>>>> and the >>>>>>>> read bandwidth, this can be faster or slower. The best way to figure >>>>>>>> out >>>>>>>> what takes long is to share the logs with us so that we can confirm >>>>>>>> what >>>>>>>> takes long. >>>>>>>> >>>>>>>> To sum it up, the job submission is most likely slow because of the >>>>>>>> interplay of Flink with the external system (most likely your >>>>>>>> configured >>>>>>>> filesystem). If the filesystem is somewhat throttled, then Flink >>>>>>>> cannot do >>>>>>>> much about it. >>>>>>>> >>>>>>>> What you could try to do is to check whether your jar contains >>>>>>>> dependencies which are not needed (e.g. Flink dependencies which are >>>>>>>> usually provided by the system). That way you could decrease the size >>>>>>>> of >>>>>>>> the jar a bit. >>>>>>>> >>>>>>>> Cheers, >>>>>>>> Till >>>>>>>> >>>>>>>> On Wed, Sep 2, 2020 at 9:48 AM Prakhar Mathur >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Hi, >>>>>>>>> >>>>>>>>> We are currently running Flink 1.9.0. We see a delay of around 20 >>>>>>>>> seconds in order to start a job on a session Flink cluster. We start >>>>>>>>> the >>>>>>>>> job using Flink's monitoring REST API where our jar is already >>>>>>>>> uploaded on >>>>>>>>> Job Manager. Our jar file size is around 200 MB. We are using memory >>>>>>>>> state >>>>>>>>> backend having GCS as remote storage. 
>>>>>>>>> >>>>>>>>> On running the cluster in debug mode, we observed that generating >>>>>>>>> the plan itself takes around 6 seconds and copying job graph from >>>>>>>>> local to >>>>>>>>> the remote folder takes around 10 seconds. >>>>>>>>> >>>>>>>>> We were wondering whether this delay is expected or if it can be >>>>>>>>> reduced via tweaking any configuration? >>>>>>>>> >>>>>>>>> Thank you. Regards >>>>>>>>> Prakhar Mathur >>>>>>>>> >>>>>>>>
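Till's suggestion in the thread above (generate a ~200 MB file and measure the write bandwidth) can be scripted. Below is a minimal sketch that measures sequential write throughput to a given path; note it is an illustration, not part of the original thread — a local temp file only measures local disk, so to approximate the blob upload path you would point the target at a GCS-mounted directory (e.g. via gcsfuse) or simply time a `gsutil cp` of the generated file:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class WriteThroughput {

    /** Writes {@code sizeMb} MiB of zeros to {@code target} and returns the measured MiB/s. */
    static double measure(Path target, int sizeMb) throws IOException {
        byte[] chunk = new byte[1024 * 1024]; // 1 MiB buffer
        long start = System.nanoTime();
        try (OutputStream out = Files.newOutputStream(target)) {
            for (int i = 0; i < sizeMb; i++) {
                out.write(chunk);
            }
        }
        double seconds = (System.nanoTime() - start) / 1e9;
        return sizeMb / seconds;
    }

    public static void main(String[] args) throws IOException {
        // Placeholder path: replace with a GCS-mounted directory to test the
        // actual blob storage; 200 MiB matches the jar size from the thread.
        Path target = Files.createTempFile("blob-bandwidth-test", ".bin");
        System.out.printf("wrote 200 MiB at %.1f MiB/s%n", measure(target, 200));
        Files.delete(target);
    }
}
```

Comparing this number against the ~166 MiB/s gsutil figure reported above helps separate filesystem throughput from the ZooKeeper side of the submission.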
Re: Bugs in Streaming job stopping? Weird graceful stop/restart for disjoint job graph
Hi Theo, thank you for reporting. I think all three issues are indeed bugs and it would be awesome if you could create tickets for them. For the restarting issue, could you please add whether the savepoint was created successfully? Best, Arvid On Wed, Jan 13, 2021 at 12:56 PM Theo Diefenthal < theo.diefent...@scoop-software.de> wrote: > Hi there, > > I'm currently analyzing a weird behavior of one of our jobs running on > YARN with Flink 1.11.2. I have a kind of special situation here in that > regard that I submit a single streaming job with a disjoint job graph, i.e. > that job contains two graphs of the same kind but totally independent of > each other (one having an ETL pipeline for source1, another for source2). > It's just for historical reasons that makes deployment a bit easier. > > I had the job running nicely until I wanted to stop it with a savepoint as > usual, like so: > flink stop -p ${SAVEPOINT_BASEDIR}/${FLINK_JOB_NAME}/SAVEPOINTS > --yarnapplicationId=${FLINK_YARN_APPID} ${ID} > > After a minute, I received a timeout exception [1]. > > Now the interesting parts/possible bugs are the following 3: > 1. The stop was triggered at 21/01/12 06:15:10. I see that almost all > tasks of the entire job switched from RUNNING to FINISHED within seconds, > but two tasks hit something that looks like a race condition on shutdown: > they threw an IllegalStateException in BufferBuilder.append, where an > assertion makes sure that the buffer is not yet finished. [2] > 2. That failure led to RESTARTING the tasks of that job. So the failure > occurred 5 seconds after I triggered the stop, and 2 seconds later I > see that the pipeline switched its state to RUNNING again. No wonder that > the "stop" eventually failed with a timeout, as the job no longer intended to > shut down. > 3. BUT the major issue for me here is: the entire pipeline of source1 was > restarted, but the pipeline of source2 was still FINISHED. 
As Flink did > quite some stuff with batch/streaming unification and region > failover/restart in the last versions, my guess is that, as I am in the > special case of a disjoint graph here, only the tasks in the connected > graph where the error occurred restarted properly and the other graph was > left in FINISHED state, even though I am dealing with a streaming job here. > > The problem is that the job was left in kind of a broken state: from just > watching the YARN / Flink UI it seemed to be still running and the stop had > no effect, but in reality, it had shut down a huge part of the job. My > workaround of course is as follows: > 1. If a "graceful stop" won't succeed, in future I will trigger a hard > kill ("yarn application -kill") afterwards, because I can't be certain what > state the job is in after a failed attempt to stop. > 2. I will enforce stronger isolation in my jobs so that I always have > connected graphs as jobs. In my case: I will deploy two independent jobs > for the two ETL pipelines and hope that this problem won't arise again (at > least, the entire job will then be either FINISHED or RUNNING). > > But I'm curious what you think: are those 3 bugs or (some of them) kind of > expected behaviour? Should I open bug ticket(s) for those? > > Best regards > Theo > > > > [1] Timeout from flink stop: > > org.apache.flink.util.FlinkException: Could not stop with a savepoint job > "f23290bf5fb0ecd49a4455e4a65f2eb6". 
>at > org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495) >at > org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864) >at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487) >at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931) >at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) >at java.security.AccessController.doPrivileged(Native Method) >at javax.security.auth.Subject.doAs(Subject.java:422) >at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) >at > org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) >at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) > Caused by: java.util.concurrent.TimeoutException >at > java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) >at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) >at > org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493) >... 9 more > > [2] Exception in graceful s
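Theo's first workaround (verifying what state the job is really in after a failed stop) can be automated by polling the monitoring REST API's GET /jobs/&lt;jobid&gt; endpoint and inspecting the returned state. A minimal sketch, not from the original thread — the endpoint is Flink's standard REST API, but the regex-based JSON extraction here is only to keep the example dependency-free; a real client should use a proper JSON parser:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobStateCheck {

    private static final Pattern STATE = Pattern.compile("\"state\"\\s*:\\s*\"([A-Z]+)\"");

    /** Extracts the job state from the JSON body returned by GET /jobs/&lt;jobid&gt;. */
    static String stateOf(String jobsResponseJson) {
        Matcher m = STATE.matcher(jobsResponseJson);
        return m.find() ? m.group(1) : "UNKNOWN";
    }

    /** Only a globally terminal state means the stop actually took effect. */
    static boolean isTerminal(String state) {
        return state.equals("FINISHED") || state.equals("CANCELED") || state.equals("FAILED");
    }

    public static void main(String[] args) {
        // Sample body resembling what the REST API might return after the failed stop in [1].
        String body = "{\"jid\":\"f23290bf5fb0ecd49a4455e4a65f2eb6\",\"state\":\"RESTARTING\"}";
        String state = stateOf(body);
        System.out.println("state=" + state + ", stop effective: " + isTerminal(state));
    }
}
```

Seeing RUNNING or RESTARTING here after `flink stop` returned a timeout is exactly the broken state described above, and a signal to fall back to the hard `yarn application -kill`.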
Re: Is there a way to control the parallelism of auto-generated Flink operators of the FlinkSQL job graph?
Hi, Elkhan, I think this is an intended behavior. If the parallelism of an operator is not specified, it will be the same as the previous one instead of the default parallelism. Actually, the table planner does most of this work for us. There shouldn't be a way to modify the parallelism of every operator; after all, we don't know what operators will be generated when we write the SQL. Best, Hang Elkhan Dadashov wrote on Fri, Mar 24, 2023 at 14:14: > Checking with the community again, if anyone has explored this before. > > Thanks. > > > On Fri, Mar 17, 2023 at 1:56 PM Elkhan Dadashov > > wrote: > > > Dear Flink developers, > > > > Wanted to check if there is a way to control the parallelism of > > auto-generated Flink operators of the FlinkSQL job graph? > > > > In the Java API, it is possible to have full control of the parallelism of > > each operator. > > > > In FlinkSQL, some source and sink connectors support `source.parallelism` > > and `sink.parallelism`, and the rest can be set via > `parallelism.default`. > > > > In this particular scenario, enhancedEvents gets chained to the > > KafkaSource operator; it can be separated by calling disableChaining() on > > the KafkaSource stream on the Kafka connector side, but even with chaining > > disabled on the source stream, the `enhancedEvents` operator parallelism is > > still set to 5 (same as the Kafka Source operator parallelism) instead of 3 > > (which is the default parallelism): > > > > ```sql > > SET 'parallelism.default' = '3'; > > > > CREATE TABLE input_kafka_table > > ( > > ... > > ts AS TO_TIMESTAMP_LTZ(CAST(`timestamp` AS BIGINT),3), > > WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE > > ) WITH ( > > 'connector' = 'kafka', > > 'source.parallelism' = '5' -- this is supported by a customization of the > > kafka connector > > ... > > ); > > > > CREATE TEMPORARY VIEW enhancedEvents AS ( > > SELECT x, y > > FROM input_kafka_table, LATERAL TABLE(udf.doIt(x, y)) > > ); > > > > CREATE TABLE other_table_source (...) 
WITH(...); > > CREATE TABLE other_table_sink (...) WITH(...); > > > > BEGIN STATEMENT SET; > > INSERT INTO enhancedEventsSink (SELECT * FROM enhancedEvents); > > INSERT INTO other_table_sink (SELECT z FROM other_table_source); > > END; > > ``` > > > > Is there a way to force-override the parallelism of auto-generated operators > > in a FlinkSQL pipeline? > > > > Or is it expected behavior that some operators' parallelism is assigned not > > from the default parallelism but from another operator's parallelism? > > > > Want to understand if this is a bug or intended behavior. > > > > Thank you. > > > > >
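Since FlinkSQL offers no per-operator knob, one practical way to audit what the planner actually assigned — rather than guessing — is to dump the execution plan JSON (via env.getExecutionPlan() or an EXPLAIN statement) and check each node's parallelism, e.g. confirming that the view operator inherited 5 from the source instead of the default 3. A minimal, dependency-free sketch, not part of the original thread; it uses a regex instead of a proper JSON library, so it only suits the simple plan format shown earlier in this archive:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PlanParallelism {

    // Within one plan node, "type" precedes "parallelism" and no '}' sits between them.
    private static final Pattern NODE = Pattern.compile(
            "\"type\"\\s*:\\s*\"([^\"]+)\"[^}]*?\"parallelism\"\\s*:\\s*(\\d+)");

    /** Returns operator name -> parallelism, in plan order (last wins on duplicate names). */
    static Map<String, Integer> parallelismByOperator(String planJson) {
        Map<String, Integer> result = new LinkedHashMap<>();
        Matcher m = NODE.matcher(planJson);
        while (m.find()) {
            result.put(m.group(1), Integer.parseInt(m.group(2)));
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical plan fragment mirroring the scenario in the thread.
        String plan = "{ \"nodes\" : [ { \"id\" : 1, \"type\" : \"Source: Custom Source\","
                + " \"pact\" : \"Data Source\", \"contents\" : \"src\", \"parallelism\" : 5 },"
                + " { \"id\" : 2, \"type\" : \"enhancedEvents\", \"parallelism\" : 5 } ] }";
        parallelismByOperator(plan).forEach((op, p) -> System.out.println(op + " -> " + p));
    }
}
```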