Hi,

Regarding JDBC and the two-phase commit (2PC) protocol: as Fabian mentioned, it 
is not supported by the JDBC standard out of the box. With some workarounds I 
guess you could make it work, for example by following one of these ideas:

1. Write records using JDBC with at-least-once semantics, by flushing the 
records on each checkpoint, and then deduplicate the records, for example by 
defining a primary key in the table. Thanks to that you would get 
effectively-once semantics.

2. Use some kind of staging table. During the writing phase of 2PC, write 
records to the staging table together with a checkpoint/snapshot id. On 
pre-commit in 2PC, make sure those are flushed. During the commit phase of 2PC, 
just re-write the records from the staging table to the target/final one. You 
need the checkpoint/snapshot id column if you want to handle more than one 
ongoing checkpoint.

3. Modify the schema of your target table so that you can identify the 
individual records (for example with a primary key) and add an extra column 
“COMMIT_STATE” of an enum type:
{PENDING, PRE-COMMITTED, COMMITTED, ABORTED}. Use this column both in 2PC 
and when reading from the table (generally, to ensure exactly-once semantics, 
you must only read COMMITTED records and ignore the rest).

4. ???
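
To make idea 2 a bit more concrete, here is a minimal sketch in plain Java. It 
is only an illustration under my own assumptions: the class 
StagingTableSketch, its method names and the two-column record are 
hypothetical, and in-memory maps stand in for the staging and target tables so 
that it runs without a database. It is not Flink or JDBC code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory stand-in for idea 2; the maps play the role of database tables. */
final class StagingTableSketch {

    /** One record with a primary key (hypothetical two-column schema). */
    static final class Row {
        final String key;
        final String value;
        Row(String key, String value) { this.key = key; this.value = value; }
    }

    // Rows written by the client but not yet flushed (think: an open JDBC batch).
    private final Map<Long, List<Row>> unflushed = new HashMap<>();
    // "Staging table": flushed rows, tagged with their checkpoint id.
    private final Map<Long, List<Row>> staging = new HashMap<>();
    // "Target table": keyed by primary key, so applying a row twice is harmless.
    private final Map<String, String> target = new LinkedHashMap<>();

    /** Writing phase: collect a record under the checkpoint it belongs to. */
    void write(long checkpointId, String key, String value) {
        unflushed.computeIfAbsent(checkpointId, id -> new ArrayList<>())
                 .add(new Row(key, value));
    }

    /** Pre-commit: make sure everything for this checkpoint is in the staging table. */
    void preCommit(long checkpointId) {
        List<Row> rows = unflushed.remove(checkpointId);
        if (rows != null) {
            staging.computeIfAbsent(checkpointId, id -> new ArrayList<>()).addAll(rows);
        }
    }

    /**
     * Commit: move the rows of one checkpoint into the target table.
     * With a real database this would be a single local transaction
     * (copy the rows, then delete them from the staging table).
     * Calling it again for the same id is a no-op, so a retried commit is safe.
     */
    void commit(long checkpointId) {
        List<Row> rows = staging.remove(checkpointId);
        if (rows == null) {
            return; // already committed, aborted, or never pre-committed
        }
        for (Row row : rows) {
            target.put(row.key, row.value);
        }
    }

    /** Abort: drop everything that belongs to this checkpoint. */
    void abort(long checkpointId) {
        unflushed.remove(checkpointId);
        staging.remove(checkpointId);
    }

    /** Read-only view of the target table. */
    Map<String, String> target() {
        return Collections.unmodifiableMap(target);
    }

    public static void main(String[] args) {
        StagingTableSketch db = new StagingTableSketch();
        db.write(1L, "a", "v1");
        db.write(1L, "b", "v1");
        db.preCommit(1L);
        db.write(2L, "a", "v2");   // a second checkpoint is already in flight
        db.preCommit(2L);
        db.commit(1L);
        db.commit(1L);             // retried commit changes nothing
        db.abort(2L);              // checkpoint 2 never reaches the target table
        System.out.println(db.target()); // prints {a=v1, b=v1}
    }
}
```

The commit is written so that repeating it is harmless, because after a 
failure the commit of an already pre-committed checkpoint may have to be 
retried. Keying the target by primary key gives the same protection as the 
deduplication in idea 1.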

Piotrek

> On 8 Apr 2019, at 12:16, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Patrick,
> 
> In general, you could also implement the 2PC logic in a regular operator. It 
> does not have to be a sink.
> You would need to add the logic of TwoPhaseCommitSinkFunction to your 
> operator.
> However, the TwoPhaseCommitSinkFunction does not work well with JDBC. The 
> problem is that you would need to recover / reopen a transaction after 
> recovery. I don't think that is possible with JDBC.
> There might be workarounds, such as writing to a separate table and atomically 
> moving all records to the actual table on commit, but this would be a bit 
> of custom code.
> The GenericWriteAheadSink does not need to recover transactions 
> but stores all records that need to be committed in Flink state.
> 
> Regarding your questions:
> 
> > - How do you manage regular cleanup / deletion of old data, so the state 
> > does not grow big?
> With the coming version 1.8, Flink will support State TTL to remove state 
> that was not accessed for a certain period of time. You can also clean it up 
> manually, using timers.
> 
> > - When I have e.g. parallelism 3, and thus 3 instances of my .flatMap() 
> > lookup operator, they will not share the state, right? Thus they cannot 
> > access data which was processed by the other instances? This is going to be 
> > a problem.
> That is correct. State is sharded and cannot be accessed by remote tasks.
> 
> > - The flink application state seems volatile, I have to make 100% sure (by 
> > bash scripts and the like) that the application is never stopped/canceled 
> > without making a savepoint, and the restart must resume from the savepoint. 
> > Otherwise it will result in complete data loss worth weeks/months of data. 
> > E.g. if anyone by accident hits the job “cancel” button in the flink UI, 
> > all data is lost. This seems pretty much like an operational risk, since 
> > this is going to be a 24/7 high availability application.
> > So maybe using only flink state is also not viable.
> You can use externalized checkpoints to ensure that the latest checkpoint is 
> always kept even if the application fails or is explicitly canceled.
> 
> Best,
> Fabian
> 
> On Mon, 1 Apr 2019 at 08:56, Patrick Fial <patrick.f...@id1.de> wrote:
> Hi,
> 
> thanks for your reply and sorry for the late response.
> 
> The problem is, I am unsure how I should implement the two-phase-commit 
> pattern, because my JDBC connection is within a .map()/.flatMap() operator, 
> and it is NOT a data sink. As written in my original question, my stream 
> setup is a simple, one-dimensional pipeline:
> 
> environment.addSource(consumer)     
>   .map(… idempotent transformations ...)
>   .map(new DatabaseFunction)
>   .map(… idempotent transformations ...)
>   .addSink(producer)
> 
> For the state / flink buffering, I am unsure if it would work even with a 
> composite key, since I must be able to do arbitrary database lookups (like 
> select .. from .. where x between y and z order by …). So I am pretty sure I 
> am bound to use a real database connection for the job. 
> So currently, I see no option to use flink states in this situation. Also, 
> since stream operators currently don’t seem to support the two-phase-commit 
> protocol, I would have to do this with a sink, correct?
> What do you advise to do? Currently I am thinking about replacing flink with 
> another technology for this part of the application, since this 
> database-lookup logic does not seem to fit the idea of a streaming 
> application in general, thus flink might not be the best choice.
> 
> Another option maybe: let's assume I was able to redesign my application 
> (somehow, maybe I come up with something). Would you advise that I replace 
> the oracle database *entirely* with a flink managed state? 
> 
> This would raise a couple of questions for me:
> - How do you manage regular cleanup / deletion of old data, so the state does 
> not grow big?
> - When I have e.g. parallelism 3, and thus 3 instances of my .flatMap() 
> lookup operator, they will not share the state, right? Thus they cannot 
> access data which was processed by the other instances? This is going to be a 
> problem.
> - The flink application state seems volatile, I have to make 100% sure (by 
> bash scripts and the like) that the application is never stopped/canceled 
> without making a savepoint, and the restart must resume from the savepoint. 
> Otherwise it will result in complete data loss worth weeks/months of data. 
> E.g. if anyone by accident hits the job “cancel” button in the flink UI, all 
> data is lost. This seems pretty much like an operational risk, since this is 
> going to be a 24/7 high availability application.
> So maybe using only flink state is also not viable.
> 
> regards
> Patrick
> 
> --
> Patrick Fial
> Client Platform Developer
> Information Design One AG
> 
> Phone +49 69 244 502 38
> Web www.id1.de
> 
> 
> Information Design One AG, Baseler Straße 10, 60329 Frankfurt am Main 
> Register entry: Amtsgericht Frankfurt am Main, HRB 52596
> Management board: Robert Peters, Benjamin Walther; Supervisory board: 
> Christian Hecht (chair)
> 
> On 21 March 2019 at 11:25:46, Kostas Kloudas (kklou...@gmail.com) wrote:
> 
>> Hi Patrick,
>> 
>> In order for your DB records to be up-to-date and correct, I think that you 
>> would have to implement a 2-phase-commit sink.
>> Now, for querying multiple keys, why not do the following:
>> 
>> Let's assume for a single result record, you want to join data from K1, K2, 
>> K3. 
>> You can have a function that creates a composite key `K_comp = 
>> createCompositeKey(K1, K2, K3)`.
>> Then you send 3 records out: (K1, K_comp), (K2, K_comp), (K3, K_comp).
>> You keyBy the first field initially, i.e. K1, K2, K3. This will send the 
>> records to the nodes responsible for each key. 
>> The nodes there will either have the data in state, or they can hit the 
>> Oracle DB to fetch the related data. 
>> So now, Flink will pick the relevant state. 
>> And then you can keyBy the K_comp, which will send again all the records to 
>> the same node, where they can be joined together.
>> 
>> Then you can use your 2-phase JDBC connector to push the result to your 
>> Oracle DB when the checkpoint is acknowledged.
>> This solution uses Flink's state as a buffer.
>> 
>> What do you think about this solution?
>> 
>> Cheers,
>> Kostas
>> 
>> 
>> 
>> On Wed, Mar 20, 2019 at 9:38 AM Patrick Fial <patrick.f...@id1.de> wrote:
>> Hi Andrey,
>> 
>> thanks for your feedback. I am not sure if I understand 100% correctly, but 
>> using the flink state to store my stuff (in addition to the oracle database) 
>> is not an option, because to my knowledge flink state does not allow 
>> arbitrary lookup queries, which I need to do, however. Also, given the logic 
>> described in my original post, the database access is never going to be 
>> idempotent, which lies in the nature of the required insert/update logic.
>> 
>> regards
>> Patrick
>> 
>> On 19 March 2019 at 17:59:22, Andrey Zagrebin (and...@ververica.com) wrote:
>> 
>>> Hi Patrick,
>>> 
>>> One approach I would try is to use Flink state and sync it with the database 
>>> in initializeState and CheckpointListener.notifyCheckpointComplete.
>>> Basically, issue only idempotent updates to the database, and only once the 
>>> last checkpoint is securely taken and records before it are not processed 
>>> again. This has the caveat, though, that the database might have stale data 
>>> between checkpoints.
>>> Once the current state is synced with the database, depending on your app, 
>>> it might even be cleared from Flink state.
>>> 
>>> I have also cc'ed Piotr and Kostas; maybe they have more ideas.
>>> 
>>> Best,
>>> Andrey
>>> 
>>> On Tue, Mar 19, 2019 at 10:09 AM Patrick Fial <patrick.f...@id1.de> wrote:
>>> Hello,
>>> 
>>> I am working on a streaming application with apache flink, which shall 
>>> provide end-to-end exactly-once delivery guarantees. The application is 
>>> roughly built like this:
>>> 
>>> environment.addSource(consumer)     
>>>   .map(… idempotent transformations ...)
>>>   .map(new DatabaseFunction)
>>>   .map(… idempotent transformations ...)
>>>   .addSink(producer)
>>> 
>>> Both source and sink are kafka connectors, and thus support exactly-once 
>>> delivery guarantees. 
>>> 
>>> The tricky part comes with the .map() containing the DatabaseFunction. Its 
>>> job is to:
>>> 1) look up the incoming message in some oracle database
>>> 2a) insert it if it is not already stored in the database and publish the 
>>> incoming message
>>> 2b) otherwise combine the incoming update with previous contents from the 
>>> database, and store back the combined update in the database
>>> 3) output the result of 2) to the next operator
>>> 
>>> This logic leads to inconsistent data being published to the sink in case 
>>> of a failure where the DatabaseFunction was already executed, but the 
>>> message is not yet published to the sink. 
>>> 
>>> My understanding is that in such a scenario all operator states would be 
>>> reverted to the last checkpoint. Since the .map() operator is stateless, 
>>> nothing is done here, so only the consumer and producer states are 
>>> reverted. This leads to the message being reprocessed from the beginning 
>>> (source), and thus being processed *again* by the DatabaseFunction. 
>>> However, the DatabaseFunction is not idempotent (because of 1)-3) as 
>>> explained above), and thus leads to a different output than in the first 
>>> run.
>>> 
>>> The question is: how can I ensure transaction safety in this application? 
>>> 
>>> Basically, I would need to use database transactions within the 
>>> DatabaseFunction, and commit those only if the messages are also committed 
>>> to the kafka sink. However, I don’t know how to achieve this.
>>> 
>>> I read about the two phase commit protocol in flink 
>>> (https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html),
>>> but I fail to find examples of how to implement this in detail for stream 
>>> operators (NOT sinks). All documentation I find only refers to using the 
>>> two phase commit protocol for sinks. Should I, in this case, only implement 
>>> the CheckpointedFunction and hook into initializeState/snapshotState to 
>>> rollback/commit my database transactions? Would this already make things 
>>> work? I am a bit confused because there seem to be no hooks for the 
>>> pre-commit/commit/abort signals.
>>> 
>>> Anyway, I am also afraid that this might introduce scaling issues, 
>>> because depending on the message throughput, committing database actions 
>>> only once per checkpoint interval might blow up the temp tablespace in the 
>>> oracle database.
>>> 
>>> Thanks in advance for any help.
>>> 
>>> best regards
>>> Patrick Fial
