Re: Stopping flink application with /jobs/:jobid/savepoints or /jobs/:jobid/stop

2020-06-15 Thread Robert Metzger
Hi,

as Kostas said, draining just influences the watermarks Flink sends through
your streaming topology. Maybe the watermarks you are sending yourself
through the topology cause the state to be drained?
I also don't know what the Beam API is doing underneath. Maybe it makes
sense to check the Beam code (or ask on the ML) to see if something is
happening there.
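
To illustrate why the watermarks matter here: an event-time window emits its result once the watermark reaches the window's last timestamp, so a MAX_WATERMARK closes every open window no matter how little of the hour has elapsed. The following is a toy Python model of that rule, not Flink or Beam code. `ToyWindowOperator` and its methods are made-up names, and the one-hour window size simply mirrors the setup described in this thread.

```python
# Toy model of event-time fixed windows; illustrative only, not Flink/Beam code.
MAX_WATERMARK = 2**63 - 1      # Flink's MAX_WATERMARK carries Long.MAX_VALUE
WINDOW_MS = 60 * 60 * 1000     # fixed 1-hour windows, as in the thread


def window_end(timestamp_ms):
    """Exclusive end of the fixed window that contains timestamp_ms."""
    return timestamp_ms - (timestamp_ms % WINDOW_MS) + WINDOW_MS


class ToyWindowOperator:
    """Counts elements per window and fires windows as the watermark advances."""

    def __init__(self):
        self.open_windows = {}  # window end (ms) -> element count

    def on_element(self, timestamp_ms):
        end = window_end(timestamp_ms)
        self.open_windows[end] = self.open_windows.get(end, 0) + 1

    def on_watermark(self, watermark_ms):
        """Fire every window whose last timestamp (end - 1) is covered."""
        ready = sorted(e for e in self.open_windows if e - 1 <= watermark_ms)
        return [(end, self.open_windows.pop(end)) for end in ready]


op = ToyWindowOperator()
op.on_element(1_000)
op.on_element(2_000)
print(op.on_watermark(30 * 60 * 1000))  # half an hour in: nothing fires
print(op.on_watermark(MAX_WATERMARK))   # drain-style watermark: window fires
```

If results appear before the hour is up, something in the pipeline has advanced the watermark past the window end; the model above shows only the firing rule, not where that watermark came from.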

Just to understand the problem better: What's your definition of "draining
the state"? How are you measuring / observing that?
How does Flink's behavior deviate from your expectations?

Best,
Robert


On Tue, Jun 9, 2020 at 7:01 PM Deshpande, Omkar wrote:

> I have observed that the state gets drained irrespective of the value of
> "drain".
>
> I am using -
> org.apache.beam:beam-runners-flink-1.9:2.19.0
>
> And I am running a Kafka wordcount app with a fixed window of 1 hour. When
> I stop the app with the stop endpoint
> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-jobid-stop>
> before the hour is up, the records get drained. I have tried both
> {"drain":true} and {"drain":false} in the body of the POST request. The
> drain behavior remains the same.
>
> --
> *From:* Kostas Kloudas 
> *Sent:* Tuesday, June 9, 2020 4:48 AM
> *To:* Deshpande, Omkar 
> *Cc:* user@flink.apache.org; Hwang, Nick <nicholas_hw...@intuit.com>;
> Benenson, Mikhail; LeVeck, Matt; Kathula, Sandeep
> <sandeep_kath...@intuit.com>
> *Subject:* Re: Stopping flink application with /jobs/:jobid/savepoints or
> /jobs/:jobid/stop
>
> Hi Omkar,
>
> For the first part of the question where you set the "drain" to false
> and the state gets drained, this may be an issue on our side. Just to
> clarify, no matter what the value of "drain" is, Flink always
> takes a savepoint. Drain simply means that we also send MAX_WATERMARK
> before taking the savepoint. Is this what you observe? I.e. that you
> have an infinite input stream and even if you set drain to false, you
> still see the MAX_WATERMARK?
>
> For the second part of the question, the cancel-with-savepoint is a
> deprecated command. It has been kept only for backwards compatibility.
> So you can still have a cancel-with-savepoint in the way you
> described. The difference between the deprecated cancel-with-savepoint
> and the recommended stop-with-savepoint is that the
> stop-with-savepoint guarantees that if you are using an exactly-once
> sink, the side-effects are going to be committed to the sink before
> the job exits. This was not the case for cancel-with-savepoint. For
> more details, you can have a look at [1].
>
> Cheers,
> Kostas
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212
>
> On Tue, Jun 9, 2020 at 4:40 AM Deshpande, Omkar wrote:
> >
> > Hello,
> >
> > When I try to stop the job with the /jobs/:jobid/stop REST endpoint, the
> > state gets drained, even if I pass {"drain":false} in the body of the POST
> > request. Is the value of the drain flag true by default? Why is it not
> > being used when I pass {"drain":false}?
> >
> > And I can also stop a job using the /jobs/:jobid/savepoints endpoint
> > with {"cancel-job":"true"} in the body. In this case the state is not
> > drained. What is the difference between these 2 endpoints? Is there a
> > reason to use one over the other?
> >
> > If I want to stop a job with a savepoint but without draining the state,
> > which endpoint should be used?
> >
> > Omkar
>


Re: Stopping flink application with /jobs/:jobid/savepoints or /jobs/:jobid/stop

2020-06-09 Thread Deshpande, Omkar
I have observed that the state gets drained irrespective of the value of
"drain".

I am using -
org.apache.beam:beam-runners-flink-1.9:2.19.0

And I am running a Kafka wordcount app with a fixed window of 1 hour. When I
stop the app with the stop endpoint
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-jobid-stop>
before the hour is up, the records get drained. I have tried both
{"drain":true} and {"drain":false} in the body of the POST request. The drain
behavior remains the same.


From: Kostas Kloudas 
Sent: Tuesday, June 9, 2020 4:48 AM
To: Deshpande, Omkar 
Cc: user@flink.apache.org; Hwang, Nick; Benenson, Mikhail; LeVeck, Matt;
Kathula, Sandeep
Subject: Re: Stopping flink application with /jobs/:jobid/savepoints or 
/jobs/:jobid/stop

Hi Omkar,

For the first part of the question where you set the "drain" to false
and the state gets drained, this may be an issue on our side. Just to
clarify, no matter what the value of "drain" is, Flink always
takes a savepoint. Drain simply means that we also send MAX_WATERMARK
before taking the savepoint. Is this what you observe? I.e. that you
have an infinite input stream and even if you set drain to false, you
still see the MAX_WATERMARK?

For the second part of the question, the cancel-with-savepoint is a
deprecated command. It has been kept only for backwards compatibility.
So you can still have a cancel-with-savepoint in the way you
described. The difference between the deprecated cancel-with-savepoint
and the recommended stop-with-savepoint is that the
stop-with-savepoint guarantees that if you are using an exactly-once
sink, the side-effects are going to be committed to the sink before
the job exits. This was not the case for cancel-with-savepoint. For
more details, you can have a look at [1].

Cheers,
Kostas

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212

On Tue, Jun 9, 2020 at 4:40 AM Deshpande, Omkar wrote:
>
> Hello,
>
> When I try to stop the job with the /jobs/:jobid/stop REST endpoint, the
> state gets drained, even if I pass {"drain":false} in the body of the POST
> request. Is the value of the drain flag true by default? Why is it not
> being used when I pass {"drain":false}?
>
> And I can also stop a job using the /jobs/:jobid/savepoints endpoint with
> {"cancel-job":"true"} in the body. In this case the state is not drained.
> What is the difference between these 2 endpoints? Is there a reason to use
> one over the other?
>
> If I want to stop a job with a savepoint but without draining the state,
> which endpoint should be used?
>
> Omkar


Re: Stopping flink application with /jobs/:jobid/savepoints or /jobs/:jobid/stop

2020-06-09 Thread Kostas Kloudas
Hi Omkar,

For the first part of the question where you set the "drain" to false
and the state gets drained, this may be an issue on our side. Just to
clarify, no matter what the value of "drain" is, Flink always
takes a savepoint. Drain simply means that we also send MAX_WATERMARK
before taking the savepoint. Is this what you observe? I.e. that you
have an infinite input stream and even if you set drain to false, you
still see the MAX_WATERMARK?
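
As a concrete reference for the request being discussed, here is a minimal Python sketch that builds (but does not send) the POST to /jobs/:jobid/stop. The endpoint path and the "drain" flag come from this thread; the `targetDirectory` field name is taken from memory of the Flink 1.9 REST documentation and should be checked there, and `build_stop_request`, the base URL, and the job ID are hypothetical placeholders.

```python
import json
import urllib.request


def build_stop_request(base_url, job_id, drain, target_directory=None):
    """Build the POST for stop-with-savepoint without sending it."""
    body = {"drain": drain}  # JSON boolean; true additionally emits MAX_WATERMARK
    if target_directory is not None:
        # Assumed field name; verify against the REST API docs for your version.
        body["targetDirectory"] = target_directory
    return urllib.request.Request(
        url=f"{base_url}/jobs/{job_id}/stop",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder values; substitute your JobManager address and a real job ID.
request = build_stop_request("http://localhost:8081", "<jobid>", drain=False)
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
# To actually submit it: urllib.request.urlopen(request)
```

Printing the request before sending makes it easy to confirm that "drain" really goes out as the boolean false rather than being dropped or stringified by the client.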

For the second part of the question, the cancel-with-savepoint is a
deprecated command. It has been kept only for backwards compatibility.
So you can still have a cancel-with-savepoint in the way you
described. The difference between the deprecated cancel-with-savepoint
and the recommended stop-with-savepoint is that the
stop-with-savepoint guarantees that if you are using an exactly-once
sink, the side-effects are going to be committed to the sink before
the job exits. This was not the case for cancel-with-savepoint. For
more details, you can have a look at [1].
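
For comparison, the deprecated cancel-with-savepoint call goes to a different endpoint with a different body. The Python sketch below only builds the request. The path and the "cancel-job" key come from this thread; the `target-directory` field name is recalled from the Flink 1.9 REST documentation and should be verified, and `build_cancel_with_savepoint_request`, the base URL, and the savepoint path are hypothetical. Here "cancel-job" is sent as a JSON boolean, whereas the original question passed the string "true".

```python
import json
import urllib.request


def build_cancel_with_savepoint_request(base_url, job_id, target_directory=None):
    """Build the POST for the deprecated cancel-with-savepoint path (not sent)."""
    body = {"cancel-job": True}  # cancel the job once the savepoint completes
    if target_directory is not None:
        # Assumed field name; verify against the REST API docs for your version.
        body["target-directory"] = target_directory
    return urllib.request.Request(
        url=f"{base_url}/jobs/{job_id}/savepoints",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder values; substitute your JobManager address and a real job ID.
request = build_cancel_with_savepoint_request(
    "http://localhost:8081", "<jobid>", "/tmp/savepoints"
)
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
```

Per the explanation above, this path does not guarantee that an exactly-once sink has committed its side effects before the job exits, which is the main reason to prefer /jobs/:jobid/stop.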

Cheers,
Kostas

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212

On Tue, Jun 9, 2020 at 4:40 AM Deshpande, Omkar wrote:
>
> Hello,
>
> When I try to stop the job with the /jobs/:jobid/stop REST endpoint, the
> state gets drained, even if I pass {"drain":false} in the body of the POST
> request. Is the value of the drain flag true by default? Why is it not
> being used when I pass {"drain":false}?
>
> And I can also stop a job using the /jobs/:jobid/savepoints endpoint with
> {"cancel-job":"true"} in the body. In this case the state is not drained.
> What is the difference between these 2 endpoints? Is there a reason to use
> one over the other?
>
> If I want to stop a job with a savepoint but without draining the state,
> which endpoint should be used?
>
> Omkar


Stopping flink application with /jobs/:jobid/savepoints or /jobs/:jobid/stop

2020-06-09 Thread Deshpande, Omkar
Hello,

When I try to stop the job with the /jobs/:jobid/stop REST endpoint, the state
gets drained, even if I pass {"drain":false} in the body of the POST request.
Is the value of the drain flag true by default? Why is it not being used when
I pass {"drain":false}?

And I can also stop a job using the /jobs/:jobid/savepoints endpoint with
{"cancel-job":"true"} in the body. In this case the state is not drained. What
is the difference between these 2 endpoints? Is there a reason to use one over
the other?

If I want to stop a job with a savepoint but without draining the state, which
endpoint should be used?

Omkar