Job graph

2023-08-30 Thread Nikolaos Paraskakis
Hello folks,

I am trying to get the job graph of a running Flink job. I want to use the
Flink libraries. For now, I have the RestClusterClient and the job IDs. Please
tell me how to get the job graph.

Thank you.

Re: Job graph

2023-08-30 Thread liu ron
Hi, Nikolaos

As far as I know, the JobGraph is a relatively low-level concept; currently
we don't expose it directly to users, nor do we provide a direct REST
API to fetch it from the JobManager. Why do you need the JobGraph, and what is
your actual use case?

Best,
Ron



Re: Job graph

2023-09-01 Thread David Anderson
This may or may not help, but you can get the execution plan from
inside the client by doing something like this (I printed the plan to
stderr):

...
System.err.println(env.getExecutionPlan());
env.execute("my job");

The result is a JSON-encoded representation of the job graph, which,
for the simple example I just tried, produced this output:

{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 10
  }, {
    "id" : 3,
    "type" : "Sink: Writer",
    "pact" : "Operator",
    "contents" : "Sink: Writer",
    "parallelism" : 10,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "Sink: Committer",
    "pact" : "Operator",
    "contents" : "Sink: Committer",
    "parallelism" : 10,
    "predecessors" : [ {
      "id" : 3,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
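
For completeness, a minimal, self-contained version of the above (the trivial
pipeline and class name are illustrative, not from the original job) might look
like this:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrintExecutionPlan {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10);

        // a trivial pipeline, just so the plan has something in it
        env.fromElements("a", "b", "c").print();

        // prints the JSON-encoded plan for the pipeline defined so far
        System.err.println(env.getExecutionPlan());

        env.execute("my job");
    }
}

Note that this only shows the plan known to the client; it is not fetched from
a running job.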



Re: Job graph

2023-09-04 Thread Shammon FY
Hi Nikolaos,

As Ron said, the JobGraph is a low-level structure in Flink and is not
exposed to users at the moment. However, you can get job details from the
`RestClusterClient` via the method `getJobDetails(JobID jobId)`; the resulting
`JobDetailsInfo` contains all vertices in the job as well as the JSON-format
job plan.
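
A minimal sketch of that call, assuming a JobManager reachable at
localhost:8081 (the cluster-id string and the getJsonPlan() accessor are
assumptions; the exact accessor for the plan varies slightly across Flink
versions):

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;

public class JobPlanFetcher {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.ADDRESS, "localhost"); // assumed JobManager address
        conf.setInteger(RestOptions.PORT, 8081);

        try (RestClusterClient<String> client = new RestClusterClient<>(conf, "cluster")) {
            JobID jobId = JobID.fromHexString(args[0]);
            // fetch the job details, including the vertices and the JSON job plan
            JobDetailsInfo details = client.getJobDetails(jobId).get();
            System.out.println(details.getJsonPlan()); // assumed accessor name
        }
    }
}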

Best,
Shammon FY



Editing job graph at runtime

2021-03-16 Thread Jessy Ping
Hi Team,
Is it possible to edit the job graph at runtime? Suppose I want to add
a new sink to the Flink application at runtime, depending on
specific parameters in the incoming events. Can I edit the job graph of a
running Flink application?

Thanks
Jessy


failed when job graph change

2023-12-03 Thread nick toker
Hi

When I add or remove an operator in the job graph, using a savepoint, I must
cancel the job to be able to run the new graph,

e.g. by adding or removing an operator (like a new sink target).
It was working in the past.
I am using Flink 1.17.1.

1. Is this a known bug? If so, when is it planned to be fixed?

2. Do I need to do something to make it work?


nick


Re:failed when job graph change

2023-12-03 Thread Xuyang
Hi, nick.


> using a savepoint I must cancel the job to be able to run the new graph


Do you mean that you need to cancel and restart the job using the new Flink job
graph in 1.17.1, and that in the past it was possible to make changes such as a
new operator effective without restarting the job?


I think that for a new job graph to take effect, it is necessary to restart the
job.



--

Best!
Xuyang








Re: Editing job graph at runtime

2021-03-16 Thread Timo Walther

Hi Jessy,

to be precise, the JobGraph is not used at runtime. It is translated 
into an ExecutionGraph.


Nevertheless, such patterns are possible, but they require a bit of manual
implementation.


Option 1) You stop the job with a savepoint and restart the application 
with slightly different parameters. If the pipeline has not changed 
much, the old state can be remapped to the slightly modified job graph. 
This is the easiest solution, but with the downside of maybe a couple of 
seconds of downtime.
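
A minimal client-side sketch of the stop-with-savepoint half of option 1,
assuming a JobManager at localhost:8081; the three-argument stopWithSavepoint
shown here matches the Flink versions of this era (newer versions add a
SavepointFormatType parameter). The same can be done with "bin/flink stop" on
the command line.

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;

public class StopWithSavepoint {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.ADDRESS, "localhost"); // assumed JobManager address
        conf.setInteger(RestOptions.PORT, 8081);

        try (RestClusterClient<String> client = new RestClusterClient<>(conf, "cluster")) {
            JobID jobId = JobID.fromHexString(args[0]);
            // stop the job and write a savepoint; the returned path is then
            // passed to the resubmitted, slightly modified job
            String savepointPath =
                    client.stopWithSavepoint(jobId, false, "file:///tmp/savepoints").get();
            System.out.println("Savepoint written to " + savepointPath);
        }
    }
}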


Option 2) You introduce a dedicated control stream (i.e. by using the 
connect() DataStream API [1]). Then you either implement a custom sink in the 
main stream of the CoProcessFunction, or you enrich every record in the 
main stream with sink parameters that are read by your custom sink 
implementation.
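
As an illustration of the second variant, here is a minimal sketch (the
tuple-based record and control types are made up for the example): a keyed main
stream of (key, payload) pairs is connected to a control stream of
(key, targetTopic) pairs, and each record is enriched with the topic a custom
sink should write it to.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class RoutingFunction extends KeyedCoProcessFunction<
        String, Tuple2<String, String>, Tuple2<String, String>, Tuple2<String, String>> {

    // latest sink parameter (target topic) per key, from the control stream
    private transient ValueState<String> targetTopic;

    @Override
    public void open(Configuration parameters) {
        targetTopic = getRuntimeContext().getState(
                new ValueStateDescriptor<>("target-topic", String.class));
    }

    @Override
    public void processElement1(Tuple2<String, String> record, Context ctx,
                                Collector<Tuple2<String, String>> out) throws Exception {
        // enrich the record with the topic the custom sink should use
        String topic = targetTopic.value();
        out.collect(Tuple2.of(topic != null ? topic : "default-topic", record.f1));
    }

    @Override
    public void processElement2(Tuple2<String, String> control, Context ctx,
                                Collector<Tuple2<String, String>> out) throws Exception {
        targetTopic.update(control.f1); // remember the new sink parameters
    }
}

It would be wired up as
mainStream.keyBy(t -> t.f0).connect(controlStream.keyBy(t -> t.f0)).process(new RoutingFunction()).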


I hope this helps.

Regards,
Timo


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/overview/#connect






Re: Editing job graph at runtime

2021-03-16 Thread Jessy Ping
Hi Timo/Team,
Thanks for the reply.

Just take the example in the following pseudocode.
Suppose this is the current application logic.

firstInputStream = addSource(...)   // Kafka consumer C1
secondInputStream = addSource(...)  // Kafka consumer C2

outputStream = firstInputStream.keyBy(a -> a.key)
    .connect(secondInputStream.keyBy(b -> b.key))
    .coProcessFunction()
    // logic determines whether a new sink should be added to the
    // application or not. If not, the event is produced to the
    // existing sink(s). If a new sink is required, the events are produced
    // to the existing sinks + the new one.

sink1 = addSink(outputStream)  // Kafka producer P1
...
sinkN = addSink(outputStream)  // Kafka producer PN

Questions:
--> Can I add a new sink into the execution graph at runtime, for example
a new Kafka producer, without restarting the current application or using
option 1?

--> (Option 2) What do you mean by adding a custom sink at the
CoProcessFunction, and how will it change the execution graph?

Thanks
Jessy





Re: Editing job graph at runtime

2021-03-16 Thread Jessy Ping
Hi Team,

Could you share your thoughts on this? It would be helpful.

Thanks
Jessy



Re: Editing job graph at runtime

2021-03-22 Thread Arvid Heise
Hi Jessy,

Can I add a new sink into the execution graph at runtime, for example : a
> new Kafka producer , without restarting the current application  or using
> option1 ?
>

No, there is currently no way to add a sink without a restart. Could you
elaborate on why a restart is not an option for you?

You can use option 2, which means that you implement one source and one sink
that dynamically read from or write to different topics, possibly by
wrapping the existing source and sink. This is a rather complex task that I
would not recommend to a new Flink user.

If you have a known set of possible sink topics, another option would be to
add all sinks from the start and only route messages dynamically with
side outputs. However, I'm not aware of such a pattern for
sources, although with the new source interface it should be possible to
do that.
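
A minimal sketch of that side-output pattern, assuming just two known topics
("a" and "b") and using print() as a stand-in for the two pre-registered Kafka
sinks:

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputRouting {

    // the anonymous subclass keeps the element type information
    private static final OutputTag<String> TOPIC_B = new OutputTag<String>("topic-b") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<String> routed = env
                .fromElements("a:1", "b:2", "a:3")
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        if (value.startsWith("b:")) {
                            ctx.output(TOPIC_B, value); // route to the side output
                        } else {
                            out.collect(value);         // main output
                        }
                    }
                });

        // print() stands in for the pre-registered sinks here
        routed.print("sink-for-topic-a");
        routed.getSideOutput(TOPIC_B).print("sink-for-topic-b");

        env.execute("side-output routing sketch");
    }
}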



Re: Editing job graph at runtime

2021-03-22 Thread Ejaskhan S
Thanks Arvid for the reply.

Can you please elaborate a little bit on option 2, if possible?

Thanks
Jessy



Re: Editing job graph at runtime

2021-03-22 Thread Ejaskhan S
Thanks Arvid for the reply.

Can you please elaborate a little bit on option 2, if possible?

We are looking for a similar option. Currently we are proceeding with
option 1.

Thanks Jessy for the question



Re: Editing job graph at runtime

2021-03-23 Thread Jessy Ping
Hi Arvid,

Thanks for the reply.

I am currently exploring Flink's features, and we have certain use cases
where new producers will be added to the system dynamically; we don't want
to restart the application frequently.

It would be helpful if you could explain option 2 in detail.

Thanks & Regards
Jessy



Re: Editing job graph at runtime

2021-03-23 Thread Arvid Heise


Disable job graph in web UI

2017-09-07 Thread Joshua Griffith
Hello, I have an auto-generated job that creates too many tasks for the web 
UI’s job graph to handle. The browser pinwheels while the page attempts to 
load. Is it possible to disable the job graph component in the web UI? For 
slightly smaller jobs, once the graph loads, the rest of the UI is usable.

Thanks,

Joshua

Re: failed when job graph change

2023-12-03 Thread nick toker
Hi

Restarting the job is fine and I do that, but I must cancel the job and
submit a new one, and I don't want the data from the state.
I forgot to mention that I use the parameter "-allowNonRestoredState".

My steps:
1. stop the job with a savepoint
2. run the updated job (updated job graph) from the savepoint

I expect it to run.

Currently, the result is that the job fails.

nick





Re:Re: failed when job graph change

2023-12-04 Thread Xuyang
Hi,
Can you attach the log showing the exception from when the job failed?



--

Best!
Xuyang






Re: Disable job graph in web UI

2017-09-08 Thread Joshua Griffith
Upon further inspection, it appears that the web UI redraws each DOM element 
with every update. So I think removing the graph won’t fix the page performance 
issue because each task list item is being redrawn on every refresh.




Re: Re: failed when job graph change

2024-01-24 Thread nick toker
Hi,

I didn't find anything in the log,
but I found that it happens when I add a new sink operator:
because I work with checkpoints, Flink can't finish the transaction
(the new Kafka topic was not part of the transaction before I added the new
sink operator).

So I must cancel the job to make it work.

How can I solve this issue?


nick

>>


Re: Re: failed when job graph change

2024-01-24 Thread Feng Jin
Hi nick

If you want to modify the sink operator, I think you can change the uid of
the operator to avoid restoring state that does not belong to it.
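
A minimal sketch of this, using print() as a stand-in for the newly added
Kafka sink (the uid string is, of course, made up):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NewSinkUid {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
           .print()                   // stand-in for the new Kafka sink
           .uid("new-kafka-sink-v2")  // explicit, fresh uid for the new operator
           .name("New Kafka sink");

        env.execute("uid sketch");
    }
}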


Best,
Feng




Re: Re: failed when job graph change

2024-01-24 Thread nick toker
Hi,
I am adding a new sink that did not exist in the graph:
1. stop with a savepoint
2. run the new graph with the new sink operator (from the savepoint)

In this case the job gets stuck initializing forever because it can't complete
the transaction (on the new operator's Kafka topic).

I don't understand how changing the uid will help; it's a new operator with a
new uid.

nick



Documentation for translation of Job graph to Execution graph

2016-06-16 Thread Bajaj, Abhinav
Hi,

When troubleshooting a Flink job, it is tricky to map the Job graph 
(application code) to the logs & monitoring REST APIs.

So, I am trying to find documentation on how a Job graph is translated to an 
Execution graph.
I found this - 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/job_scheduling.html

Any detailed documentation on the design and code components would be helpful.

Thanks,
Abhi


Re: Documentation for translation of Job graph to Execution graph

2016-06-17 Thread Aljoscha Krettek
Hi,
I'm afraid there is no documentation besides the link that you posted and
this one:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html

With what parts are you having trouble? Maybe I can help.

Cheers,
Aljoscha



Re: Documentation for translation of Job graph to Execution graph

2016-06-17 Thread Bajaj, Abhinav
Hi,

Thanks for sharing this link. I had not seen it before. Maybe it is newly 
added in the 1.0 docs. I will go through it.

In general, there are two things I am trying to understand and get comfortable 
with:

  1.  How a Job graph is translated to an Execution graph. The logs and 
monitoring APIs are for the Execution graph, so I need to map them to the Job 
graph. I am trying to bridge this gap.
  2.  The job manager & task manager logs are tricky to decipher, especially 
when there are multiple jobs running. Is there a way to filter the logs for a 
single job?

~ Abhi




Re: Documentation for translation of Job graph to Execution graph

2016-06-21 Thread Robert Metzger
Hi,
yes, the link was added recently.

Regarding Q1, since there is no documentation right now, I have to refer
you to our code. In the JobManager.scala class, there is a method "private
def submitJob(jobGraph, ...)" where the ExecutionGraph is created. I think
that's a good starting point for looking through the code. (I have also added
Till to the message in case he wants to chime in.)

Q2: Currently, Flink doesn't add the job name to the logs, so it's indeed
not very easy to separate the log entries generated by different jobs. In
general, we recommend running one JobManager per job (running multiple jobs is
of course also supported).

>


Re: Documentation for translation of Job graph to Execution graph

2016-06-21 Thread Bajaj, Abhinav
Thanks, Robert, for the helpful reply.

I have a follow-up on Q2 - "In general, we recommend running one JobManager 
per job".
I understand how this can be achieved while running on YARN, i.e. by 
submitting single Flink jobs.

Is there some other way of configuring Flink to run a single JobManager per 
job?






Re: Documentation for translation of Job graph to Execution graph

2016-06-29 Thread Bajaj, Abhinav
Hi Robert,

Thanks for the helpful reply.
I have a couple of follow-up questions on your reply - "In general, we 
recommend running one JobManager per job".
I understand how this can be achieved while running on YARN, i.e. by 
submitting single Flink jobs.

Is there some other way of configuring Flink to run a single JobManager per 
job?

Is there a plan to add the job ID or name to the logs?

Thanks,
Abhi




Re: Documentation for translation of Job graph to Execution graph

2016-07-04 Thread Ufuk Celebi
On Wed, Jun 29, 2016 at 9:19 PM, Bajaj, Abhinav  wrote:
> Is there a plan to add the job ID or name to the logs?

This is now part of the YARN client output and should be part of the
1.1 release.


Regarding your other question: in standalone mode, you have to
manually make sure not to submit multiple jobs to a single cluster.


Recommendation for dealing with Job Graph incompatibility across varying Flink versions

2021-06-17 Thread Sonam Mandal
Hello,

We are exploring running multiple Flink clusters within a Kubernetes cluster, 
such that each Flink cluster can run with a specified Flink image version. 
Since the Flink job graph needs to be compatible with the Flink version 
running in the Flink cluster, this raises the challenge of ensuring that the 
SQL job graph or Flink job jars are compatible with the Flink cluster users 
want to run them on.

E.g. if the Flink cluster is running version 1.12.1, the job graph generated 
from the SQL must be created using compatible 1.12.1 Flink libraries; 
otherwise, we see issues with deserialization etc.

Is there a recommended way to handle this scenario today?

Thanks,
Sonam


Job Manager taking long time to upload job graph on remote storage

2020-09-02 Thread Prakhar Mathur
Hi,

We are currently running Flink 1.9.0. We see a delay of around 20 seconds
when starting a job on a session Flink cluster. We start the job using
Flink's monitoring REST API, with our jar already uploaded to the Job
Manager. Our jar file size is around 200 MB. We are using the memory state
backend with GCS as remote storage.

Running the cluster in debug mode, we observed that generating the plan
itself takes around 6 seconds and copying the job graph from local to the
remote folder takes around 10 seconds.

We were wondering whether this delay is expected, or whether it can be
reduced by tweaking any configuration?

Thank you. Regards
Prakhar Mathur


Re: Recommendation for dealing with Job Graph incompatibility across varying Flink versions

2021-06-18 Thread Paul K Moore
Hi Sonam,

I am not a long-standing Flink user (only 3 months), so perhaps others will 
have a more authoritative view.

I would say that I am using Flink on k8s, and have had some good success with 
the Google Flink operator 
(https://github.com/GoogleCloudPlatform/flink-on-k8s-operator). This includes 
Custom Resource Definitions (CRDs), so you can define your Flink clusters in 
YAML and deploy using kustomize.

The result is:

A Flink cluster with a job-manager and one or more task-managers.
A Kubernetes job which acts as the Flink “client” to submit the job to the 
job-manager - the job-submitter.

e.g.

flink-example-job-submitter-g4s6g   0/1 Completed   0  6d15h
flink-example-jobmanager-0  1/1 Running 3  6d15h
flink-example-taskmanager-0 1/1 Running 3  6d15h

This all seems in keeping with Flink’s “Per Job-Mode” deployment option 
(https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/overview/#per-job-mode)

Note: I’m only just getting into state persistence and recovery, so there is 
still some work to do, but I think this is largely a matter of understanding 
and configuration.

Hope that helps

Paul




Re: Recommendation for dealing with Job Graph incompatibility across varying Flink versions

2021-06-18 Thread Sonam Mandal
Hi Paul,

Thanks for getting back to me. I did take a look at the Google Go operator, and 
they use the /bin/flink client for job submission. My understanding is that in 
this scenario users must ensure that their job jar is compatible with the Flink 
version, and the client will just take care of the submission. Do let me know 
if I understood this correctly or not.

Perhaps some context on what we are doing will help. We are writing a small 
client which takes SQL statements, converts them via the Table environment, and 
submits the job. The /bin/flink client does not take SQL directly out of the 
box, and we cannot expect users to run a SQL shell to run their production SQL 
streaming services. Since we are dealing with the job graph generation 
ourselves, we have run into the issue that our client needs to be compiled 
with the same version of Flink that we are running; otherwise we run into job 
graph compatibility issues. So I wanted to understand if there is a 
recommendation on how to deal with this conversion in a scenario where 
different users may run different Flink versions in a given Kubernetes cluster.
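
To make the version coupling concrete, a minimal sketch of such a client,
assuming the statements arrive as program arguments (the class name is
illustrative):

```java
// A minimal sketch of a SQL-submitting client. The crucial point: the
// flink-table libraries on this client's classpath perform the planning and
// job graph generation, so they must match the version of the target cluster.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlSubmitter {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Each statement is planned with the client's Flink version.
        for (String statement : args) {
            tEnv.executeSql(statement);
        }
    }
}
```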

Any thoughts?

Thanks,
Sonam

From: Paul K Moore 
Sent: Friday, June 18, 2021 2:25:52 AM
To: Sonam Mandal 
Cc: user@flink.apache.org ; Srinivasulu Punuru 

Subject: Re: Recommendation for dealing with Job Graph incompatibility across 
varying Flink versions

Hi Sonam,

I am not a long-standing Flink user (3 months only) so perhaps others will have 
a more authoritative view.

I would say that I am using Flink in k8s, and have had some good success with 
the Google Flink operator 
(https://github.com/GoogleCloudPlatform/flink-on-k8s-operator). This includes 
Custom Resource Definitions (CRDs) so that you can define your Flink clusters 
in YAML, and deploy using kustomize.

The result is:

A Flink cluster of a job-manager and one-or-more task-managers.
A Kubernetes job which acts as the Flink “client” to submit the job to the 
job-manager (the “job-submitter”)

e.g.

flink-example-job-submitter-g4s6g   0/1 Completed   0  6d15h
flink-example-jobmanager-0  1/1 Running 3  6d15h
flink-example-taskmanager-0 1/1 Running 3  6d15h

This all seems in keeping with Flink’s “Per Job-Mode” deployment option 
(https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/overview/#per-job-mode)

Note: I’m only just getting into state persistence and recovery, so still 
some work to do, but I think this is largely understanding and configuration.

Hope that helps

Paul

On 17 Jun 2021, at 23:55, Sonam Mandal <soman...@linkedin.com> wrote:

Hello,

We are exploring running multiple Flink clusters within a Kubernetes cluster 
such that each Flink cluster can run with a specified Flink image version. 
Since the Flink Job Graph needs to be compatible with the Flink version running 
in the Flink cluster, this brings a challenge in how we ensure that the SQL job 
graph or Flink job jars are compatible with the Flink cluster users want to run 
them on.

E.g. if the Flink cluster is running version 1.12.1, the job graph generated 
from the SQL must be created using compatible 1.12.1 Flink libraries. 
Otherwise, we see issues with deserialization etc.

Is there a recommended way to handle this scenario today?

Thanks,
Sonam



Bugs in Streaming job stopping? Weird graceful stop/restart for disjoint job graph

2021-01-13 Thread Theo Diefenthal
Hi there, 

I'm currently analyzing a weird behavior of one of our jobs running on YARN 
with Flink 1.11.2. I have a kind of special situation here in that regard that 
I submit a single streaming job with a disjoint job graph, i.e. that job 
contains two graphs of the same kind but totally independent of each other (one 
having an ETL pipeline for source1, another for source2). It's just for 
historical reasons; it makes deployment a bit easier. 
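
For illustration, such a disjoint graph arises from a fragment like the
following (the source, mapper and sink classes are hypothetical placeholders
for the two ETL pipelines described above):

```java
// Two independent pipelines built on one StreamExecutionEnvironment.
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new Source1()).map(new Etl1()).addSink(new Sink1());
env.addSource(new Source2()).map(new Etl2()).addSink(new Sink2());

// A single execute() produces one JobGraph with two unconnected subgraphs.
env.execute("disjoint-etl-job");
```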

I had the job running nicely until I wanted to stop it with a savepoint as 
usual like so: 
flink stop -p ${SAVEPOINT_BASEDIR}/${FLINK_JOB_NAME}/SAVEPOINTS 
--yarnapplicationId=${FLINK_YARN_APPID} ${ID} 

After a minute, I received a timeout exception [1]. 

Now the interesting part/possible bugs are the following 3: 
1. The stop was triggered at 21/01/12 06:15:10. I see that almost all tasks of 
the entire job switched from RUNNING to FINISHED within seconds, but two tasks 
had something that looks like a race condition on shutdown. They threw an 
IllegalStateException in BufferBuilder.append where an assertion makes sure 
that the buffer is not yet finished. [2] 
2. That failure led to RESTARTING the tasks of that job. So the failure 
occurred 5 seconds after I triggered the stop. And 2 seconds later, I see 
that the pipeline switched its state to RUNNING again. No wonder that 
"stop" eventually stopped with a Timeout as the Job didn't think about shutting 
down anymore. 
3. BUT the major issue for me here is: The entire pipeline of source1 was 
restarted, but the pipeline of source2 was still FINISHED. As Flink did quite 
some stuff with Batch/Streaming unification and region failover/restart in the 
last versions, my guess is that as I am in the special case of a disjoint graph 
here, only the tasks in the connected graph where the error occurred restarted 
properly and the other graph was left in FINISHED state, even though I am 
dealing with a streaming job here. 

The problem is that the job was left in kind of a broken state: From just 
looking at the YARN / Flink UI it seemed to be still running and the stop had 
effect, but in reality, it shut down a huge part of the job. My workaround of 
course is as following: 
1. If a "graceful stop" won't succeed, in future I will trigger a hard kill 
"yarn application -kill" afterwards because I can't be certain in what state 
the job is after a failed attempt to stop. 
2. I will enforce stronger isolation in my jobs so that I always have connected 
graphs as jobs. In my case: I will deploy two independent jobs for the two ETL 
pipelines and hope that this problem won't arise again (At least, have the 
entire job either FINISHED or RUNNING). 

But I'm curious what you think: Are those 3 bugs or (some of it) kind of 
expected behaviour? Should I open bug ticket(s) for those? 

Best regards 
Theo 



[1] Timeout from flink stop: 
org.apache.flink.util.FlinkException: Could not stop with a savepoint job 
"f23290bf5fb0ecd49a4455e4a65f2eb6". 
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495) 
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864) 
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487) 
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931) 
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) 
at java.security.AccessController.doPrivileged(Native Method) 
at javax.security.auth.Subject.doAs(Subject.java:422) 
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
 
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) 
Caused by: java.util.concurrent.TimeoutException 
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) 
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) 
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493) 
... 9 more 
[2] Exception in graceful shutdown: 
2021-01-12T06:15:15.827877+01:00 WARN org.apache.flink.runtime.taskmanager.Task 
Source: rawdata_source1 -> validation_source1 -> enrich_source1 -> 
map_json_source1 -> Sink: write_to_kafka_source1) (3/18) 
(bc68320cf69dd877782417a3298499d6) switched from RUNNING to FAILED. 
java.util.concurrent.ExecutionException: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator 
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) 
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
 
at 
org.apache.fl

Re: Job Manager taking long time to upload job graph on remote storage

2020-09-02 Thread Till Rohrmann
Hi Prakhar,

have you enabled HA for your cluster? If yes, then Flink will try to store
the job graph to the configured high-availability.storageDir in order to be
able to recover it. If this operation takes long, then it is either the
filesystem which is slow or storing the pointer in ZooKeeper. If it is the
filesystem, then I would suggest to check whether you have some read/write
quotas which might slow the operation down.
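
For reference, the relevant flink-conf.yaml entries look roughly like this
(the quorum hosts and the bucket are placeholders):

```
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: gs://<your-bucket>/flink/ha
```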

If you haven't enabled HA or persisting the jobGraph is not what takes
long, then the next most likely candidate is the recovery from a previous
checkpoint. Here again, Flink needs to read from the remote storage (in
your case GCS). Depending on the size of the checkpoint and the read
bandwidth, this can be faster or slower. The best way to figure out what
takes long is to share the logs with us so that we can confirm what takes
long.

To sum it up, the job submission is most likely slow because of the
interplay of Flink with the external system (most likely your configured
filesystem). If the filesystem is somewhat throttled, then Flink cannot do
much about it.

What you could try to do is to check whether your jar contains dependencies
which are not needed (e.g. Flink dependencies which are usually provided by
the system). That way you could decrease the size of the jar a bit.

Cheers,
Till

On Wed, Sep 2, 2020 at 9:48 AM Prakhar Mathur  wrote:

> Hi,
>
> We are currently running Flink 1.9.0. We see a delay of around 20 seconds
> in order to start a job on a session Flink cluster. We start the job using
> Flink's monitoring REST API where our jar is already uploaded on Job
> Manager. Our jar file size is around 200 MB. We are using memory state
> backend having GCS as remote storage.
>
> On running the cluster in debug mode, we observed that generating the plan
> itself takes around 6 seconds and copying job graph from local to the
> remote folder takes around 10 seconds.
>
> We were wondering whether this delay is expected or if it can be reduced
> via tweaking any configuration?
>
> Thank you. Regards
> Prakhar Mathur
>


Re: Job Manager taking long time to upload job graph on remote storage

2020-09-02 Thread Prakhar Mathur
Hi,

Thanks for the response. Yes, we are running Flink in HA mode. We checked
there are no such quota limits for GCS for us. Please find the logs below,
here you can see the copying of blob started at 11:50:39,455 and it
got JobGraph submission at 11:50:46,400.

2020-09-01 11:50:37,061 DEBUG
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
Release TaskExecutor 2e20ee286a3fee5831fefc0ab427ba92 because it exceeded
the idle timeout.
2020-09-01 11:50:37,061 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Worker 4e4ae8b90f911787ac112c2847759512 could not be stopped.
2020-09-01 11:50:37,062 DEBUG
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
Release TaskExecutor 032a62eff2a7d8067f25b8fc943f262f because it exceeded
the idle timeout.
2020-09-01 11:50:37,062 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Worker 2532ed667a2566e06c3d9e3bc85c6ed6 could not be stopped.
2020-09-01 11:50:37,305 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Trigger heartbeat request.
2020-09-01 11:50:37,305 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Trigger heartbeat request.
2020-09-01 11:50:37,354 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
2020-09-01 11:50:37,354 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
2020-09-01 11:50:39,455 DEBUG
org.apache.flink.runtime.blob.FileSystemBlobStore - Copying
from
/tmp/flink-blobs/blobStore-6e468470-ba5f-4ea0-a8fd-cf31af663f11/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426
to
gs:///blob/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426.
2020-09-01 11:50:43,904 DEBUG
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
ping response for sessionid: 0x30be3d929102460 after 2ms
2020-09-01 11:50:46,400 INFO
 org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Received
JobGraph submission 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
2020-09-01 11:50:46,403 INFO
 org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Submitting
job 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
2020-09-01 11:50:46,405 DEBUG
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
Adding job graph 980d3ff229b7fbfe889e2bc93e526da0 to
flink/cluster/jobgraphs/980d3ff229b7fbfe889e2bc93e526da0.
2020-09-01 11:50:47,325 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Trigger heartbeat request.
2020-09-01 11:50:47,325 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Trigger heartbeat request.
2020-09-01 11:50:47,325 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Trigger heartbeat request.
2020-09-01 11:50:47,325 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Trigger heartbeat request.
2020-09-01 11:50:47,330 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
2020-09-01 11:50:47,331 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
2020-09-01 11:50:52,880 DEBUG
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
notification sessionid:0x30be3d929102460
2020-09-01 11:50:52,880 DEBUG
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
WatchedEvent state:SyncConnected type:NodeChildrenChanged
path:/flink/cluster/jobgraphs for sessionid 0x30be3d929102460
2020-09-01 11:50:52,882 INFO
 org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
Added SubmittedJobGraph(980d3ff229b7fbfe889e2bc93e526da0) to ZooKeeper.

Thank You.

On Wed, Sep 2, 2020 at 2:06 PM Till Rohrmann  wrote:

> Hi Prakhar,
>
> have you enabled HA for your cluster? If yes, then Flink will try to store
> the job graph to the configured high-availability.storageDir in order to be
> able to recover it. If this operation takes long, then it is either the
> filesystem which is slow or storing the pointer in ZooKeeper. If it is the
> filesystem, then I would suggest to check whether you have some read/write
> quotas which might slow the operation down.
>
> If you haven't enabled HA or persisting the jobGraph is not what takes
> long, then the next most likely candidate is the recovery from a previous
> checkpoint. Here again, Flink needs to read from the remote storage (in
> your case GCS). Depending on the size of the checkpoint and the read
> bandwidth, this can be faster or slower. The best way to figure out what
> takes long is to share 

Re: Job Manager taking long time to upload job graph on remote storage

2020-09-02 Thread Till Rohrmann
The logs don't look suspicious. Could you maybe check what the write
bandwidth to your GCS bucket is from the machine you are running Flink on?
It should be enough to generate a 200 MB file and write it to GCS. Thanks a
lot for your help in debugging this matter.
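
For example, something along these lines should give a rough number (the
bucket name is a placeholder):

```
# Generate a ~200 MB file and time the upload.
dd if=/dev/zero of=/tmp/test-200mb.bin bs=1M count=200
time gsutil cp /tmp/test-200mb.bin gs://<your-bucket>/test-200mb.bin
```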

Cheers,
Till

On Wed, Sep 2, 2020 at 1:04 PM Prakhar Mathur  wrote:

> Hi,
>
> Thanks for the response. Yes, we are running Flink in HA mode. We checked
> there are no such quota limits for GCS for us. Please find the logs below,
> here you can see the copying of blob started at 11:50:39,455 and it
> got JobGraph submission at 11:50:46,400.
>
> 2020-09-01 11:50:37,061 DEBUG
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
> Release TaskExecutor 2e20ee286a3fee5831fefc0ab427ba92 because it exceeded
> the idle timeout.
> 2020-09-01 11:50:37,061 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Worker 4e4ae8b90f911787ac112c2847759512 could not be stopped.
> 2020-09-01 11:50:37,062 DEBUG
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
> Release TaskExecutor 032a62eff2a7d8067f25b8fc943f262f because it exceeded
> the idle timeout.
> 2020-09-01 11:50:37,062 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Worker 2532ed667a2566e06c3d9e3bc85c6ed6 could not be stopped.
> 2020-09-01 11:50:37,305 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Trigger heartbeat request.
> 2020-09-01 11:50:37,305 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Trigger heartbeat request.
> 2020-09-01 11:50:37,354 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
> 2020-09-01 11:50:37,354 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
> 2020-09-01 11:50:39,455 DEBUG
> org.apache.flink.runtime.blob.FileSystemBlobStore - Copying
> from
> /tmp/flink-blobs/blobStore-6e468470-ba5f-4ea0-a8fd-cf31af663f11/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426
> to
> gs:///blob/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426.
> 2020-09-01 11:50:43,904 DEBUG
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
> ping response for sessionid: 0x30be3d929102460 after 2ms
> 2020-09-01 11:50:46,400 INFO
>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Received
> JobGraph submission 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
> 2020-09-01 11:50:46,403 INFO
>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Submitting
> job 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
> 2020-09-01 11:50:46,405 DEBUG
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
> Adding job graph 980d3ff229b7fbfe889e2bc93e526da0 to
> flink/cluster/jobgraphs/980d3ff229b7fbfe889e2bc93e526da0.
> 2020-09-01 11:50:47,325 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Trigger heartbeat request.
> 2020-09-01 11:50:47,325 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Trigger heartbeat request.
> 2020-09-01 11:50:47,325 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Trigger heartbeat request.
> 2020-09-01 11:50:47,325 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Trigger heartbeat request.
> 2020-09-01 11:50:47,330 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
> 2020-09-01 11:50:47,331 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
> 2020-09-01 11:50:52,880 DEBUG
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
> notification sessionid:0x30be3d929102460
> 2020-09-01 11:50:52,880 DEBUG
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
> WatchedEvent state:SyncConnected type:NodeChildrenChanged
> path:/flink/cluster/jobgraphs for sessionid 0x30be3d929102460
> 2020-09-01 11:50:52,882 INFO
>  org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
> Added SubmittedJobGraph(980d3ff229b7fbfe889e2bc93e526da0) to ZooKeeper.
>
> Thank You.
>
> On Wed, Sep 2, 2020 at 2:06 PM Till Rohrmann  wrote:
>
>> Hi Prakhar,
>>
>> have you enabled HA for your cluster? If yes, then Flink will try to
>> store the job gra

Re: Job Manager taking long time to upload job graph on remote storage

2020-09-03 Thread Prakhar Mathur
We tried uploading the same blob from Job Manager k8s pod directly to GCS
using gsutils and it took 2 seconds. The upload speed was 166.8 MiB/s.
Thanks.

On Wed, Sep 2, 2020 at 6:14 PM Till Rohrmann  wrote:

> The logs don't look suspicious. Could you maybe check what the write
> bandwidth to your GCS bucket is from the machine you are running Flink on?
> It should be enough to generate a 200 MB file and write it to GCS. Thanks a
> lot for your help in debugging this matter.
>
> Cheers,
> Till
>
> On Wed, Sep 2, 2020 at 1:04 PM Prakhar Mathur  wrote:
>
>> Hi,
>>
>> Thanks for the response. Yes, we are running Flink in HA mode. We checked
>> there are no such quota limits for GCS for us. Please find the logs below,
>> here you can see the copying of blob started at 11:50:39,455 and it
>> got JobGraph submission at 11:50:46,400.
>>
>> 2020-09-01 11:50:37,061 DEBUG
>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>> Release TaskExecutor 2e20ee286a3fee5831fefc0ab427ba92 because it exceeded
>> the idle timeout.
>> 2020-09-01 11:50:37,061 DEBUG
>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Worker 4e4ae8b90f911787ac112c2847759512 could not be stopped.
>> 2020-09-01 11:50:37,062 DEBUG
>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>> Release TaskExecutor 032a62eff2a7d8067f25b8fc943f262f because it exceeded
>> the idle timeout.
>> 2020-09-01 11:50:37,062 DEBUG
>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Worker 2532ed667a2566e06c3d9e3bc85c6ed6 could not be stopped.
>> 2020-09-01 11:50:37,305 DEBUG
>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Trigger heartbeat request.
>> 2020-09-01 11:50:37,305 DEBUG
>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Trigger heartbeat request.
>> 2020-09-01 11:50:37,354 DEBUG
>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
>> 2020-09-01 11:50:37,354 DEBUG
>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
>> 2020-09-01 11:50:39,455 DEBUG
>> org.apache.flink.runtime.blob.FileSystemBlobStore - Copying
>> from
>> /tmp/flink-blobs/blobStore-6e468470-ba5f-4ea0-a8fd-cf31af663f11/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426
>> to
>> gs:///blob/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426.
>> 2020-09-01 11:50:43,904 DEBUG
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>> ping response for sessionid: 0x30be3d929102460 after 2ms
>> 2020-09-01 11:50:46,400 INFO
>>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Received
>> JobGraph submission 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
>> 2020-09-01 11:50:46,403 INFO
>>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Submitting
>> job 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
>> 2020-09-01 11:50:46,405 DEBUG
>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>> Adding job graph 980d3ff229b7fbfe889e2bc93e526da0 to
>> flink/cluster/jobgraphs/980d3ff229b7fbfe889e2bc93e526da0.
>> 2020-09-01 11:50:47,325 DEBUG
>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Trigger heartbeat request.
>> 2020-09-01 11:50:47,325 DEBUG
>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Trigger heartbeat request.
>> 2020-09-01 11:50:47,325 DEBUG
>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Trigger heartbeat request.
>> 2020-09-01 11:50:47,325 DEBUG
>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Trigger heartbeat request.
>> 2020-09-01 11:50:47,330 DEBUG
>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
>> 2020-09-01 11:50:47,331 DEBUG
>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
>> 2020-09-01 11:50:52,880 DEBUG
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>> notification sessionid:0x30be3d929102460
>> 2020-09-01 11:50:52,880 DEBUG
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCn

Re: Job Manager taking long time to upload job graph on remote storage

2020-09-03 Thread Till Rohrmann
Hmm then it probably rules GCS out. What about ZooKeeper? Have you
experienced slow response times from your ZooKeeper cluster?

Cheers,
Till

On Thu, Sep 3, 2020 at 6:23 PM Prakhar Mathur  wrote:

> We tried uploading the same blob from Job Manager k8s pod directly to GCS
> using gsutils and it took 2 seconds. The upload speed was 166.8 MiB/s.
> Thanks.
>
> On Wed, Sep 2, 2020 at 6:14 PM Till Rohrmann  wrote:
>
>> The logs don't look suspicious. Could you maybe check what the write
>> bandwidth to your GCS bucket is from the machine you are running Flink on?
>> It should be enough to generate a 200 MB file and write it to GCS. Thanks a
>> lot for your help in debugging this matter.
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 2, 2020 at 1:04 PM Prakhar Mathur 
>> wrote:
>>
>>> Hi,
>>>
>>> Thanks for the response. Yes, we are running Flink in HA mode. We
>>> checked there are no such quota limits for GCS for us. Please find the logs
>>> below, here you can see the copying of blob started at 11:50:39,455 and it
>>> got JobGraph submission at 11:50:46,400.
>>>
>>> 2020-09-01 11:50:37,061 DEBUG
>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>>> Release TaskExecutor 2e20ee286a3fee5831fefc0ab427ba92 because it exceeded
>>> the idle timeout.
>>> 2020-09-01 11:50:37,061 DEBUG
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>> Worker 4e4ae8b90f911787ac112c2847759512 could not be stopped.
>>> 2020-09-01 11:50:37,062 DEBUG
>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>>> Release TaskExecutor 032a62eff2a7d8067f25b8fc943f262f because it exceeded
>>> the idle timeout.
>>> 2020-09-01 11:50:37,062 DEBUG
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>> Worker 2532ed667a2566e06c3d9e3bc85c6ed6 could not be stopped.
>>> 2020-09-01 11:50:37,305 DEBUG
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>> Trigger heartbeat request.
>>> 2020-09-01 11:50:37,305 DEBUG
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>> Trigger heartbeat request.
>>> 2020-09-01 11:50:37,354 DEBUG
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>> Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
>>> 2020-09-01 11:50:37,354 DEBUG
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
>>> 2020-09-01 11:50:39,455 DEBUG
>>> org.apache.flink.runtime.blob.FileSystemBlobStore - Copying
>>> from
>>> /tmp/flink-blobs/blobStore-6e468470-ba5f-4ea0-a8fd-cf31af663f11/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426
>>> to
>>> gs:///blob/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426.
>>> 2020-09-01 11:50:43,904 DEBUG
>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>>> ping response for sessionid: 0x30be3d929102460 after 2ms
>>> 2020-09-01 11:50:46,400 INFO
>>>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Received
>>> JobGraph submission 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
>>> 2020-09-01 11:50:46,403 INFO
>>>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Submitting
>>> job 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
>>> 2020-09-01 11:50:46,405 DEBUG
>>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>>> Adding job graph 980d3ff229b7fbfe889e2bc93e526da0 to
>>> flink/cluster/jobgraphs/980d3ff229b7fbfe889e2bc93e526da0.
>>> 2020-09-01 11:50:47,325 DEBUG
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>> Trigger heartbeat request.
>>> 2020-09-01 11:50:47,325 DEBUG
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>> Trigger heartbeat request.
>>> 2020-09-01 11:50:47,325 DEBUG
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>> Trigger heartbeat request.
>>> 2020-09-01 11:50:47,325 DEBUG
>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>> Trigger heartbeat request.
>>> 2020-09-01 11:50:47,330 DEBUG
>>> org.apache.flink.runtime.resourcemanager.Stand

Re: Job Manager taking long time to upload job graph on remote storage

2020-09-03 Thread Prakhar Mathur
Yes, I will check that, but any pointers on why Flink is taking more time
than gsutil upload?

On Thu, Sep 3, 2020 at 10:14 PM Till Rohrmann  wrote:

> Hmm then it probably rules GCS out. What about ZooKeeper? Have you
> experienced slow response times from your ZooKeeper cluster?
>
> Cheers,
> Till
>
> On Thu, Sep 3, 2020 at 6:23 PM Prakhar Mathur  wrote:
>
>> We tried uploading the same blob from Job Manager k8s pod directly to GCS
>> using gsutils and it took 2 seconds. The upload speed was 166.8 MiB/s.
>> Thanks.
>>
>> On Wed, Sep 2, 2020 at 6:14 PM Till Rohrmann 
>> wrote:
>>
>>> The logs don't look suspicious. Could you maybe check what the write
>>> bandwidth to your GCS bucket is from the machine you are running Flink on?
>>> It should be enough to generate a 200 MB file and write it to GCS. Thanks a
>>> lot for your help in debugging this matter.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Sep 2, 2020 at 1:04 PM Prakhar Mathur 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Thanks for the response. Yes, we are running Flink in HA mode. We
>>>> checked there are no such quota limits for GCS for us. Please find the logs
>>>> below, here you can see the copying of blob started at 11:50:39,455 and it
>>>> got JobGraph submission at 11:50:46,400.
>>>>
>>>> 2020-09-01 11:50:37,061 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>>>> Release TaskExecutor 2e20ee286a3fee5831fefc0ab427ba92 because it exceeded
>>>> the idle timeout.
>>>> 2020-09-01 11:50:37,061 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Worker 4e4ae8b90f911787ac112c2847759512 could not be stopped.
>>>> 2020-09-01 11:50:37,062 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>>>> Release TaskExecutor 032a62eff2a7d8067f25b8fc943f262f because it exceeded
>>>> the idle timeout.
>>>> 2020-09-01 11:50:37,062 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Worker 2532ed667a2566e06c3d9e3bc85c6ed6 could not be stopped.
>>>> 2020-09-01 11:50:37,305 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Trigger heartbeat request.
>>>> 2020-09-01 11:50:37,305 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Trigger heartbeat request.
>>>> 2020-09-01 11:50:37,354 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
>>>> 2020-09-01 11:50:37,354 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
>>>> 2020-09-01 11:50:39,455 DEBUG
>>>> org.apache.flink.runtime.blob.FileSystemBlobStore - Copying
>>>> from
>>>> /tmp/flink-blobs/blobStore-6e468470-ba5f-4ea0-a8fd-cf31af663f11/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426
>>>> to
>>>> gs:///blob/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426.
>>>> 2020-09-01 11:50:43,904 DEBUG
>>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>>>> ping response for sessionid: 0x30be3d929102460 after 2ms
>>>> 2020-09-01 11:50:46,400 INFO
>>>>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Received
>>>> JobGraph submission 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
>>>> 2020-09-01 11:50:46,403 INFO
>>>>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Submitting
>>>> job 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
>>>> 2020-09-01 11:50:46,405 DEBUG
>>>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>>>> Adding job graph 980d3ff229b7fbfe889e2bc93e526da0 to
>>>> flink/cluster/jobgraphs/980d3ff229b7fbfe889e2bc93e526da0.
>>>> 2020-09-01 11:50:47,325 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Trigger heartbeat request.
>>>> 2020-09-01 11:50:47,325 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -

Re: Job Manager taking long time to upload job graph on remote storage

2020-09-04 Thread Till Rohrmann
From the log snippet it is hard to tell. Flink is not only interacting with
GCS but also with ZooKeeper to store a pointer to the serialized JobGraph.
This can also take some time. Then of course, there could be an issue with
the GS filesystem implementation you are using. The fs throughput could
also change a bit with time. In the logs you shared, uploading of the blobs
takes at most 7s. In another run you stated that it would take 10s.

Have you tried whether the same behaviour is observable with the latest
Flink version?

Cheers,
Till

On Fri, Sep 4, 2020 at 6:44 AM Prakhar Mathur  wrote:

> Yes, I will check that, but any pointers on why Flink is taking more time
> than gsutil upload?
>
> On Thu, Sep 3, 2020 at 10:14 PM Till Rohrmann 
> wrote:
>
>> Hmm then it probably rules GCS out. What about ZooKeeper? Have you
>> experienced slow response times from your ZooKeeper cluster?
>>
>> Cheers,
>> Till
>>
>> On Thu, Sep 3, 2020 at 6:23 PM Prakhar Mathur 
>> wrote:
>>
>>> We tried uploading the same blob from Job Manager k8s pod directly to
>>> GCS using gsutils and it took 2 seconds. The upload speed was 166.8
>>> MiB/s. Thanks.
>>>
>>> On Wed, Sep 2, 2020 at 6:14 PM Till Rohrmann 
>>> wrote:
>>>
>>>> The logs don't look suspicious. Could you maybe check what the write
>>>> bandwidth to your GCS bucket is from the machine you are running Flink on?
>>>> It should be enough to generate a 200 MB file and write it to GCS. Thanks a
>>>> lot for your help in debugging this matter.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Sep 2, 2020 at 1:04 PM Prakhar Mathur 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thanks for the response. Yes, we are running Flink in HA mode. We
>>>>> checked there are no such quota limits for GCS for us. Please find the 
>>>>> logs
>>>>> below, here you can see the copying of blob started at 11:50:39,455 and it
>>>>> got JobGraph submission at 11:50:46,400.
>>>>>
>>>>> 2020-09-01 11:50:37,061 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>>>>> Release TaskExecutor 2e20ee286a3fee5831fefc0ab427ba92 because it exceeded
>>>>> the idle timeout.
>>>>> 2020-09-01 11:50:37,061 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Worker 4e4ae8b90f911787ac112c2847759512 could not be stopped.
>>>>> 2020-09-01 11:50:37,062 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>>>>> Release TaskExecutor 032a62eff2a7d8067f25b8fc943f262f because it exceeded
>>>>> the idle timeout.
>>>>> 2020-09-01 11:50:37,062 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Worker 2532ed667a2566e06c3d9e3bc85c6ed6 could not be stopped.
>>>>> 2020-09-01 11:50:37,305 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Trigger heartbeat request.
>>>>> 2020-09-01 11:50:37,305 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Trigger heartbeat request.
>>>>> 2020-09-01 11:50:37,354 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
>>>>> 2020-09-01 11:50:37,354 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
>>>>> 2020-09-01 11:50:39,455 DEBUG
>>>>> org.apache.flink.runtime.blob.FileSystemBlobStore - Copying
>>>>> from
>>>>> /tmp/flink-blobs/blobStore-6e468470-ba5f-4ea0-a8fd-cf31af663f11/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426
>>>>> to
>>>>> gs:///blob/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426.
>>>>> 2020-09-01 11:50:43,904 DEBUG
>>>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>>>>> ping response for sessionid: 0x30be3d929102460 after 2ms
>>>>> 2020-09-01 11:50:46,400 INFO
>>>>>  org.apache.flin

Re: Job Manager taking long time to upload job graph on remote storage

2020-09-04 Thread Prakhar Mathur
Yes, we can try the same in 1.11. Meanwhile is there any network or threads
related config that we can tweak for this?

On Fri, Sep 4, 2020 at 12:48 PM Till Rohrmann  wrote:

> From the log snippet it is hard to tell. Flink is not only interacting
> with GCS but also with ZooKeeper to store a pointer to the serialized
> JobGraph. This can also take some time. Then of course, there could be an
> issue with the GS filesystem implementation you are using. The fs
> throughput could also change a bit with time. In the logs you shared,
> uploading of the blobs takes at most 7s. In another run you stated that it
> would take 10s.
>
> Have you tried whether the same behaviour is observable with the latest
> Flink version?
>
> Cheers,
> Till
>
> On Fri, Sep 4, 2020 at 6:44 AM Prakhar Mathur  wrote:
>
>> Yes, I will check that, but any pointers on why Flink is taking more time
>> than gsutil upload?
>>
>> On Thu, Sep 3, 2020 at 10:14 PM Till Rohrmann 
>> wrote:
>>
>>> Hmm then it probably rules GCS out. What about ZooKeeper? Have you
>>> experienced slow response times from your ZooKeeper cluster?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Sep 3, 2020 at 6:23 PM Prakhar Mathur 
>>> wrote:
>>>
>>>> We tried uploading the same blob from Job Manager k8s pod directly to
>>>> GCS using gsutils and it took 2 seconds. The upload speed was 166.8
>>>> MiB/s. Thanks.
>>>>
>>>> On Wed, Sep 2, 2020 at 6:14 PM Till Rohrmann 
>>>> wrote:
>>>>
>>>>> The logs don't look suspicious. Could you maybe check what the write
>>>>> bandwidth to your GCS bucket is from the machine you are running Flink on?
>>>>> It should be enough to generate a 200 MB file and write it to GCS. Thanks 
>>>>> a
>>>>> lot for your help in debugging this matter.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Sep 2, 2020 at 1:04 PM Prakhar Mathur 
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thanks for the response. Yes, we are running Flink in HA mode. We
>>>>>> checked there are no such quota limits for GCS for us. Please find the 
>>>>>> logs
>>>>>> below, here you can see the copying of blob started at 11:50:39,455 and 
>>>>>> it
>>>>>> got JobGraph submission at 11:50:46,400.
>>>>>>
>>>>>> 2020-09-01 11:50:37,061 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>>>>>> Release TaskExecutor 2e20ee286a3fee5831fefc0ab427ba92 because it exceeded
>>>>>> the idle timeout.
>>>>>> 2020-09-01 11:50:37,061 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>> Worker 4e4ae8b90f911787ac112c2847759512 could not be stopped.
>>>>>> 2020-09-01 11:50:37,062 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>>>>>> Release TaskExecutor 032a62eff2a7d8067f25b8fc943f262f because it exceeded
>>>>>> the idle timeout.
>>>>>> 2020-09-01 11:50:37,062 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>> Worker 2532ed667a2566e06c3d9e3bc85c6ed6 could not be stopped.
>>>>>> 2020-09-01 11:50:37,305 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>> Trigger heartbeat request.
>>>>>> 2020-09-01 11:50:37,305 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>> Trigger heartbeat request.
>>>>>> 2020-09-01 11:50:37,354 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>> Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
>>>>>> 2020-09-01 11:50:37,354 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
>>>>>> 2020-09-01 11:50:39,455 DEBUG
>>>>>> org.apache.flink.runtime.blob.FileSystemBlobStore - Copying
>>>>>> from
>>>>>> /tmp/flink-blobs/blobStore-6e4

Re: Job Manager taking long time to upload job graph on remote storage

2020-09-04 Thread Till Rohrmann
me.resourcemanager.StandaloneResourceManager  -
>>>>>>> Trigger heartbeat request.
>>>>>>> 2020-09-01 11:50:37,354 DEBUG
>>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>>> Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
>>>>>>> 2020-09-01 11:50:37,354 DEBUG
>>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>>> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
>>>>>>> 2020-09-01 11:50:39,455 DEBUG
>>>>>>> org.apache.flink.runtime.blob.FileSystemBlobStore - Copying
>>>>>>> from
>>>>>>> /tmp/flink-blobs/blobStore-6e468470-ba5f-4ea0-a8fd-cf31af663f11/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426
>>>>>>> to
>>>>>>> gs:///blob/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426.
>>>>>>> 2020-09-01 11:50:43,904 DEBUG
>>>>>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>>>>>>> ping response for sessionid: 0x30be3d929102460 after 2ms
>>>>>>> 2020-09-01 11:50:46,400 INFO
>>>>>>>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - 
>>>>>>> Received
>>>>>>> JobGraph submission 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
>>>>>>> 2020-09-01 11:50:46,403 INFO
>>>>>>>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - 
>>>>>>> Submitting
>>>>>>> job 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
>>>>>>> 2020-09-01 11:50:46,405 DEBUG
>>>>>>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>>>>>>> Adding job graph 980d3ff229b7fbfe889e2bc93e526da0 to
>>>>>>> flink/cluster/jobgraphs/980d3ff229b7fbfe889e2bc93e526da0.
>>>>>>> 2020-09-01 11:50:47,325 DEBUG
>>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>>> Trigger heartbeat request.
>>>>>>> 2020-09-01 11:50:47,325 DEBUG
>>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>>> Trigger heartbeat request.
>>>>>>> 2020-09-01 11:50:47,325 DEBUG
>>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>>> Trigger heartbeat request.
>>>>>>> 2020-09-01 11:50:47,325 DEBUG
>>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>>> Trigger heartbeat request.
>>>>>>> 2020-09-01 11:50:47,330 DEBUG
>>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>>> Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
>>>>>>> 2020-09-01 11:50:47,331 DEBUG
>>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>>> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
>>>>>>> 2020-09-01 11:50:52,880 DEBUG
>>>>>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>>>>>>> notification sessionid:0x30be3d929102460
>>>>>>> 2020-09-01 11:50:52,880 DEBUG
>>>>>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>>>>>>> WatchedEvent state:SyncConnected type:NodeChildrenChanged
>>>>>>> path:/flink/cluster/jobgraphs for sessionid 0x30be3d929102460
>>>>>>> 2020-09-01 11:50:52,882 INFO
>>>>>>>  org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>>>>>>> Added SubmittedJobGraph(980d3ff229b7fbfe889e2bc93e526da0) to ZooKeeper.
>>>>>>>
>>>>>>> Thank You.
>>>>>>>
>>>>>>> On Wed, Sep 2, 2020 at 2:06 PM Till Rohrmann 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Prakhar,
>>>>>>>>
>>>>>>>> have you enabled HA for your cluster? If yes, then Flink will try
>>>>>>>> to store the job graph to the configured high-availability.storageDir 
>>>>>>>> in
>>>>>>>> order to be able to recover it. If this operation takes long, then it 
>>>>>>>> is
>>>>>>>> either the filesystem which is slow or storing the pointer in 
>>>>>>>> ZooKeeper. If
>>>>>>>> it is the filesystem, then I would suggest to check whether you have 
>>>>>>>> some
>>>>>>>> read/write quotas which might slow the operation down.
>>>>>>>>
>>>>>>>> If you haven't enabled HA or persisting the jobGraph is not what
>>>>>>>> takes long, then the next most likely candidate is the recovery from a
>>>>>>>> previous checkpoint. Here again, Flink needs to read from the remote
>>>>>>>> storage (in your case GCS). Depending on the size of the checkpoint 
>>>>>>>> and the
>>>>>>>> read bandwidth, this can be faster or slower. The best way to figure 
>>>>>>>> out
>>>>>>>> what takes long is to share the logs with us so that we can confirm 
>>>>>>>> what
>>>>>>>> takes long.
>>>>>>>>
>>>>>>>> To sum it up, the job submission is most likely slow because of the
>>>>>>>> interplay of Flink with the external system (most likely your 
>>>>>>>> configured
>>>>>>>> filesystem). If the filesystem is somewhat throttled, then Flink 
>>>>>>>> cannot do
>>>>>>>> much about it.
>>>>>>>>
>>>>>>>> What you could try to do is to check whether your jar contains
>>>>>>>> dependencies which are not needed (e.g. Flink dependencies which are
>>>>>>>> usually provided by the system). That way you could decrease the size 
>>>>>>>> of
>>>>>>>> the jar a bit.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Wed, Sep 2, 2020 at 9:48 AM Prakhar Mathur 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> We are currently running Flink 1.9.0. We see a delay of around 20
>>>>>>>>> seconds in order to start a job on a session Flink cluster. We start 
>>>>>>>>> the
>>>>>>>>> job using Flink's monitoring REST API where our jar is already 
>>>>>>>>> uploaded on
>>>>>>>>> Job Manager. Our jar file size is around 200 MB. We are using memory 
>>>>>>>>> state
>>>>>>>>> backend having GCS as remote storage.
>>>>>>>>>
>>>>>>>>> On running the cluster in debug mode, we observed that generating
>>>>>>>>> the plan itself takes around 6 seconds and copying job graph from 
>>>>>>>>> local to
>>>>>>>>> the remote folder takes around 10 seconds.
>>>>>>>>>
>>>>>>>>> We were wondering whether this delay is expected or if it can be
>>>>>>>>> reduced via tweaking any configuration?
>>>>>>>>>
>>>>>>>>> Thank you. Regards
>>>>>>>>> Prakhar Mathur
>>>>>>>>>
>>>>>>>>


Re: Bugs in Streaming job stopping? Weird graceful stop/restart for disjoint job graph

2021-01-15 Thread Arvid Heise
Hi Theo,

thank you for reporting. I think all three issues are indeed bugs and it
would be awesome if you could create tickets for them. For the restarting
issue, could you please add whether the savepoint created successfully?

Best,

Arvid

On Wed, Jan 13, 2021 at 12:56 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi there,
>
> I'm currently analyzing a weird behavior of one of our jobs running on
> YARN with Flink 1.11.2. I have a kind of special situation here in that
> regard that I submit a single streaming job with a disjoint job graph, i.e.
> that job contains two graphs of the same kind but totally independent of
> each other (one having an ETL pipeline for source1, another for source2).
> It's just for historical reasons that makes deployment a bit easier.
>
> I had the job running nicely until I wanted to stop it with a savepoint as
> usual like so:
>   flink stop -p ${SAVEPOINT_BASEDIR}/${FLINK_JOB_NAME}/SAVEPOINTS
> --yarnapplicationId=${FLINK_YARN_APPID} ${ID}
>
> After a minute, I received a timeout exception [1].
>
> Now the interesting part/possible bugs are the following 3:
> 1. The stop was triggered at 21/01/12 06:15:10. I see that almost all
> tasks of the entire job switched from RUNNING to FINISHED within seconds,
> but two tasks had something that looks like a race condition on shutdown.
> They threw an IllegalStateException in BufferBuilder.append where an
> assertion makes sure that the buffer is not yet finished. [2]
> 2. That failure led to RESTARTING the tasks of that job. So the failure
> occurred 5 seconds after I triggered the stop. And 2 seconds later, I
> see that the pipeline switched its state to RUNNING again. No wonder that
> the "stop" eventually stopped with a Timeout as the Job didn't think about
> shutting down anymore.
> 3. BUT the major issue for me here is: The entire pipeline of source1 was
> restarted, but the pipeline of source2 was still FINISHED. As Flink did
> quite some stuff with Batch/Streaming unification and region
> failover/restart in the last versions,  my guess is that as I am in the
> special case of a disjoint graph here, only the tasks in the connected
> graph where the error occurred restarted properly and the other graph was
> left in FINISHED state, even though I am dealing with a streaming job here.
>
> The problem is that the job was left in kind of a  broken state: From just
> looking at the YARN / Flink UI it seemed to be still running and the stop had
> no effect, but in reality, it shut down a huge part of the job. My
> workaround of course is as following:
> 1. If a "graceful stop" won't succeed, in future I will trigger a hard
> kill "yarn application -kill" afterwards because I can't be certain in what
> state the job is after a failed attempt to stop.
> 2. I will enforce stronger isolation in my jobs so that I always have
> connected graphs as jobs. In my case: I will deploy two independent jobs
> for the two ETL pipelines and hope that this problem won't arise again (At
> least, have the entire job either FINISHED or RUNNING).
>
> But I'm curious what you think: Are those 3 bugs or (some of it) kind of
> expected behaviour? Should I open bug ticket(s) for those?
>
> Best regards
> Theo
>
>
>
> [1] Timeout from flink stop:
>
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job 
> "f23290bf5fb0ecd49a4455e4a65f2eb6".
>at 
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495)
>at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
>at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487)
>at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931)
>at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>at java.security.AccessController.doPrivileged(Native Method)
>at javax.security.auth.Subject.doAs(Subject.java:422)
>at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.util.concurrent.TimeoutException
>at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>at 
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)
>... 9 more
>
> [2] Exception in graceful s

Re: Is there a way to control the parallelism of auto-generated Flink operators of the FlinkSQL job graph?

2023-03-24 Thread Hang Ruan
Hi, Elkhan,

I think this is intended behavior. If the parallelism of an operator is
not specified, it will be the same as the previous operator's instead of the
default parallelism.
Actually, the table planner does most of this work for us, and there is no
way to modify the parallelism of every individual operator. After all, we
don't know which operators the plan will contain when we write the SQL.

Best,
Hang
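
For contrast, a minimal DataStream API fragment where each operator's
parallelism is set explicitly (env is a StreamExecutionEnvironment; Event and
the source, function, and sink classes are hypothetical placeholders):

```java
// Source parallelism set explicitly to 5.
DataStream<Event> events = env
        .addSource(new MyKafkaSource())
        .setParallelism(5);

// Downstream operators get their own parallelism, independent of the source.
events.flatMap(new EnrichFunction())
        .setParallelism(3)
        .addSink(new MySink())
        .setParallelism(3);
```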

Elkhan Dadashov wrote on Fri, Mar 24, 2023 at 14:14:

> Checking with the community again, if anyone explored this before.
>
> Thanks.
>
>
> On Fri, Mar 17, 2023 at 1:56 PM Elkhan Dadashov  >
> wrote:
>
> > Dear Flink developers,
> >
> > Wanted to check, if there is a way to control the parallelism of
> > auto-generated Flink operators of the FlinkSQL job graph?
> >
> > In Java API, it is possible to have full control of the parallelism of
> > each operator.
> >
> > On FlinkSQL some source and sink connectors support `source.parallelism`
> > and `sink.parallelism`, and the rest can be set via
> `default.parallelism`.
> >
> > In this particular scenario, enhancedEvents gets chained to the
> > KafkaSource operator, it can be separated by calling disableChain() on
> > KafkaSource  stream on Kafka connector side, but even with disabled
> > chaining on the source stream, `enhancedEvents` operator parallelism is
> > still set to 5 (same as Kafka Source operator parallelism), instead of 3
> > (which is default parallelism) :
> >
> > ```sql
> > SET 'parallelism.default' = '3';
> >
> > CREATE TABLE input_kafka_table
> > (
> > ...
> > ts AS TO_TIMESTAMP_LTZ(CAST(`timestamp` AS BIGINT),3),
> > WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE
> > ) WITH (
> > 'connector' = 'kafka',
> > 'source.parallelism' = '5' // this is supported by customization of
> > kafka connector
> > ...
> > );
> >
> > CREATE TEMPORARY VIEW enhancedEvents AS (
> >  SELECT x, y
> >  FROM input_kafka_table, LATERAL TABLE(udf.doIt(x, y)
> > );
> >
> > CREATE TABLE other_table_source (...) WITH(...);
> > CREATE TABLE other_table_sink (...) WITH(...);
> >
> > BEGIN STATEMENT SET;
> >  INSERT into enhancedEventsSink (Select * from enhancedEvents);
> >  INSERT into other_table_sink (Select z from other_table_source );
> > END;
> > ```
> >
> > Is there a way to force override parallelism of auto-generated operators
> > for FlinkSQL pipeline?
> >
> > Or is it expected behavior that some operators' parallelism is assigned
> > not from the default parallelism but from another operator's parallelism?
> >
> > Want to understand if this is a bug or intended behavior.
> >
> > Thank you.
> >
> >
>