Re: Reserving Kafka offset in Flink after modifying app

2019-04-03 Thread shengjk1
Maybe this page can help you:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/savepoints.html


Best,
Shengjk1




On 03/26/2019 09:51, Son Mai wrote:
Hi Konstantin, 


Thanks for the response. What still concerns me is:
Am I able to recover from checkpoints even if I change my program (for
example, changing Filter and Map functions, data objects, etc.)? I was not able
to recover from savepoints when I changed my program.


On Mon, Mar 25, 2019 at 3:19 PM Konstantin Knauf  
wrote:

Hi Son,



yes, this is possible, but your sink needs to play its part in Flink's 
checkpointing mechanism. Depending on the implementation of the sink you should 
either:



* implement CheckpointedFunction and flush all records to BigQuery in
snapshotState. This way, in case of a failure/restart of the job, all records up
to the last successful checkpoint will have been written to BigQuery and all
other records will be replayed.
* use managed operator state to store all pending records in the sink. Thereby
they will be persisted in snapshotState. This way, in case of a
failure/restart of the job, all records up to the last successful checkpoint
which have not been written to BigQuery will be restored in the sink, and all
other records will be replayed.


In both cases, you might write the same record to BigQuery twice.



If in doubt whether your sink fulfills the criteria above, feel free to share it.


Cheers,



Konstantin







On Mon, Mar 25, 2019 at 7:50 AM Son Mai  wrote:

Hello,


I have a topic in Kafka that Flink reads from. I parse messages from this
topic and write them to BigQuery using streaming inserts, in batches of 500
messages collected with a CountWindow in Flink.


Problem: I want to commit offsets manually, only when a batch has been written
successfully to BigQuery.


Reason:
I saw that the Flink KafkaConsumer does not use offset committing to Kafka but
uses its own checkpointing. I don't know how Flink checkpointing works, and I'm
worried that it does not cover the following situation:
- let's say I have a Flink job running and processing a batch of 500 messages
at Kafka offsets 1000-1500.
- I stop this job before it saves to BigQuery and make some modifications to
the program. Savepoints did not work when I tried, because they require that
the operator code does not change.


What I want is that when I start the modified app, it starts again from
offsets 1000-1500 in Kafka, because these messages have not been written to
BigQuery.


Is there any way to achieve this in Flink? 


Thanks,
SM



Re: Reserving Kafka offset in Flink after modifying app

2019-03-25 Thread Yun Tang
Hi Son

I think it might be because you did not assign operator ids to your Filter and
Map functions; you can refer to [1] for how to assign ids in your application
(a small sketch follows below the links). Moreover, if you have removed some
operators, please consider adding the --allowNonRestoredState option [2].

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#assigning-operator-ids
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state
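
For illustration, a minimal sketch of what this could look like (the topic
name, group id, and the filter/map bodies below are placeholders, not taken
from your job):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint every 60s

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");

        env.addSource(new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props))
           .uid("kafka-source")        // stable id: stored Kafka offsets map back to this source
           .filter(value -> !value.isEmpty())
           .uid("filter-non-empty")    // ids let savepoint state survive changes to the function code
           .map(String::toUpperCase)
           .uid("map-to-upper")
           .print();

        env.execute("uid example");
    }
}

If an operator that held state was removed entirely, resume with
bin/flink run -s <savepointPath> --allowNonRestoredState ..., otherwise the
restore will fail.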

Best
Yun Tang


From: Son Mai 
Sent: Tuesday, March 26, 2019 9:51
To: Konstantin Knauf
Cc: user
Subject: Re: Reserving Kafka offset in Flink after modifying app

Hi Konstantin,

Thanks for the response. What still concerns me is:

  1.  Am I able to recover from checkpoints even if I change my program (for
example, changing Filter and Map functions, data objects, etc.)? I was not able
to recover from savepoints when I changed my program.

On Mon, Mar 25, 2019 at 3:19 PM Konstantin Knauf wrote:
Hi Son,

yes, this is possible, but your sink needs to play its part in Flink's 
checkpointing mechanism. Depending on the implementation of the sink you should 
either:

* implement CheckpointedFunction and flush all records to BigQuery in
snapshotState. This way, in case of a failure/restart of the job, all records up
to the last successful checkpoint will have been written to BigQuery and all
other records will be replayed.
* use managed operator state to store all pending records in the sink. Thereby
they will be persisted in snapshotState. This way, in case of a
failure/restart of the job, all records up to the last successful checkpoint
which have not been written to BigQuery will be restored in the sink, and all
other records will be replayed.

In both cases, you might write the same record to BigQuery twice.

If in doubt whether your sink fulfills the criteria above, feel free to share it.

Cheers,

Konstantin



On Mon, Mar 25, 2019 at 7:50 AM Son Mai wrote:
Hello,

I have a topic in Kafka that Flink reads from. I parse messages from this
topic and write them to BigQuery using streaming inserts, in batches of 500
messages collected with a CountWindow in Flink.

Problem: I want to commit offsets manually, only when a batch has been written
successfully to BigQuery.

Reason:
I saw that the Flink KafkaConsumer does not use offset committing to Kafka but
uses its own checkpointing. I don't know how Flink checkpointing works, and I'm
worried that it does not cover the following situation:
- let's say I have a Flink job running and processing a batch of 500 messages
at Kafka offsets 1000-1500.
- I stop this job before it saves to BigQuery and make some modifications to
the program. Savepoints did not work when I tried, because they require that
the operator code does not change.

What I want is that when I start the modified app, it starts again from
offsets 1000-1500 in Kafka, because these messages have not been written to
BigQuery.

Is there any way to achieve this in Flink?

Thanks,
SM




Re: Reserving Kafka offset in Flink after modifying app

2019-03-25 Thread Son Mai
Hi Konstantin,

Thanks for the response. What still concerns me is:

   1. Am I able to recover from checkpoints even if I change my program
   (for example, changing Filter and Map functions, data objects, etc.)? I was
   not able to recover from savepoints when I changed my program.


On Mon, Mar 25, 2019 at 3:19 PM Konstantin Knauf 
wrote:

> Hi Son,
>
> yes, this is possible, but your sink needs to play its part in Flink's
> checkpointing mechanism. Depending on the implementation of the sink you
> should either:
>
> * implement *CheckpointedFunction* and flush all records to BigQuery in
> *snapshotState*. This way, in case of a failure/restart of the job, all
> records up to the last successful checkpoint will have been written to
> BigQuery and all other records will be replayed.
> * use managed operator state to store all pending records in the sink.
> Thereby they will be persisted in *snapshotState*. This way, in case
> of a failure/restart of the job, all records up to the last successful
> checkpoint which have not been written to BigQuery will be restored in
> the sink, and all other records will be replayed.
>
> In both cases, you might write the same record to BigQuery twice.
>
> If in doubt whether your sink fulfills the criteria above, feel free to
> share it.
>
> Cheers,
>
> Konstantin
>
>
>
> On Mon, Mar 25, 2019 at 7:50 AM Son Mai  wrote:
>
>> Hello,
>>
>> I have a topic in Kafka that Flink reads from. I parse messages from this
>> topic and write them to BigQuery using streaming inserts, in batches of 500
>> messages collected with a CountWindow in Flink.
>>
>> *Problem*: I want to commit offsets manually, only when a batch has been
>> written successfully to BigQuery.
>>
>> *Reason:*
>> I saw that the Flink KafkaConsumer does not use offset committing to Kafka
>> but uses its own checkpointing. I don't know how Flink checkpointing works,
>> and I'm worried that it does not cover the following situation:
>> - let's say I have a Flink job running and processing a batch of 500
>> messages at Kafka offsets 1000-1500.
>> - I stop this job before it saves to BigQuery and make some modifications
>> to the program. Savepoints did not work when I tried, because they require
>> that the operator code does not change.
>>
>> What I want is that when I start the modified app, it starts again from
>> offsets 1000-1500 in Kafka, because these messages have not been written
>> to BigQuery.
>>
>> Is there any way to achieve this in Flink?
>>
>> Thanks,
>> SM
>>
>
>


Re: Reserving Kafka offset in Flink after modifying app

2019-03-25 Thread Konstantin Knauf
Hi Son,

yes, this is possible, but your sink needs to play its part in Flink's
checkpointing mechanism. Depending on the implementation of the sink you
should either:

* implement *CheckpointedFunction* and flush all records to BigQuery in
*snapshotState*. This way, in case of a failure/restart of the job, all
records up to the last successful checkpoint will have been written to
BigQuery and all other records will be replayed.
* use managed operator state to store all pending records in the sink.
Thereby they will be persisted in *snapshotState*. This way, in case of
a failure/restart of the job, all records up to the last successful
checkpoint which have not been written to BigQuery will be restored in
the sink, and all other records will be replayed (a sketch of this variant
follows below).

In both cases, you might write the same record to BigQuery twice.
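
To make the second option concrete, here is a rough sketch of such a sink (the
class and field names are made up and the actual BigQuery streaming-insert call
is left as a placeholder; it only illustrates the pattern and gives
at-least-once, not exactly-once):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BufferingBigQuerySink extends RichSinkFunction<String>
        implements CheckpointedFunction {

    private static final int BATCH_SIZE = 500;

    // records received but not yet written to BigQuery
    private final List<String> pending = new ArrayList<>();
    private transient ListState<String> checkpointedPending;

    @Override
    public void invoke(String record, Context context) throws Exception {
        pending.add(record);
        if (pending.size() >= BATCH_SIZE) {
            flush();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // persist the still-pending batch as managed operator state
        checkpointedPending.clear();
        checkpointedPending.addAll(pending);
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        checkpointedPending = ctx.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("pending-records", String.class));
        if (ctx.isRestored()) {
            // after a failure/restart the unwritten records come back here
            for (String record : checkpointedPending.get()) {
                pending.add(record);
            }
        }
    }

    private void flush() {
        // placeholder: the streaming insert of the batch into BigQuery goes here
        pending.clear();
    }
}

Together with the Kafka offsets stored in the same checkpoint, a restart then
replays everything after the last checkpoint and restores the buffered records,
so nothing is lost; as said above, duplicates are still possible.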

If in doubt whether your sink fulfills the criteria above, feel free to share it.

Cheers,

Konstantin



On Mon, Mar 25, 2019 at 7:50 AM Son Mai  wrote:

> Hello,
>
> I have a topic in Kafka that Flink reads from. I parse messages from this
> topic and write them to BigQuery using streaming inserts, in batches of 500
> messages collected with a CountWindow in Flink.
>
> *Problem*: I want to commit offsets manually, only when a batch has been
> written successfully to BigQuery.
>
> *Reason:*
> I saw that the Flink KafkaConsumer does not use offset committing to Kafka
> but uses its own checkpointing. I don't know how Flink checkpointing works,
> and I'm worried that it does not cover the following situation:
> - let's say I have a Flink job running and processing a batch of 500
> messages at Kafka offsets 1000-1500.
> - I stop this job before it saves to BigQuery and make some modifications
> to the program. Savepoints did not work when I tried, because they require
> that the operator code does not change.
>
> What I want is that when I start the modified app, it starts again from
> offsets 1000-1500 in Kafka, because these messages have not been written
> to BigQuery.
>
> Is there any way to achieve this in Flink?
>
> Thanks,
> SM
>


-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Reserving Kafka offset in Flink after modifying app

2019-03-24 Thread Son Mai
Hello,

I have a topic in Kafka that Flink reads from. I parse messages from this
topic and write them to BigQuery using streaming inserts, in batches of 500
messages collected with a CountWindow in Flink.

*Problem*: I want to commit offsets manually, only when a batch has been
written successfully to BigQuery.

*Reason:*
I saw that the Flink KafkaConsumer does not use offset committing to Kafka but
uses its own checkpointing. I don't know how Flink checkpointing works, and
I'm worried that it does not cover the following situation:
- let's say I have a Flink job running and processing a batch of 500
messages at Kafka offsets 1000-1500.
- I stop this job before it saves to BigQuery and make some modifications
to the program. Savepoints did not work when I tried, because they require
that the operator code does not change.

What I want is that when I start the modified app, it starts again from
offsets 1000-1500 in Kafka, because these messages have not been written
to BigQuery.

Is there any way to achieve this in Flink?

Thanks,
SM