Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1305#issuecomment-153289042
  
    Cool stuff, really! This is very much in line with what I had in mind for a 
SQL backend.
    
    Let me check if I understood everything correctly (and see where my understanding is wrong), because I think we should be able to make an "exactly once" version of this based on that mechanism. I am basically rephrasing what you describe in a different model.
    
    ### Basic Mode
    
    What this is effectively doing is a batched and asynchronous version of 
distributed 2-phase commit transactions. The phases look basically like this:
    
      - **Adding data**: Pipe all modifications into the database, but do not commit the transaction. They are tagged with the timestamp of the upcoming checkpoint (or any coordinated, increasing version counter). This can happen in a background thread, as long as the in-operator cache holds all edits that are not yet in the database.
    
      - **Pre-commit**: This is when the checkpoint is triggered. All pending edits are written into the database and then the transaction is committed. The state handle only includes the timestamp used on the elements. In classical 2-phase transactions, after a task acks the pre-commit, it has to be able to recover to that state, which is given here. The checkpoint is not immediately valid for recovery, though, which means that recovery has to either apply a filter or issue a query that deletes all records with timestamps larger than the version given during recovery. After the pre-commit, the timestamp is locally incremented and work can continue (sketched below, after this list).
    
      - **Full commit**: This happens implicitly when the checkpoint 
coordinator marks the checkpoint as complete.
    
      - **Recovery**: The timestamp (or version counter) of the last successful checkpoint is restored, records that were committed (but whose checkpoint did not succeed as a whole) are deleted, and then records are lazily fetched.
    
    So far, this should give exactly once guarantees, or am I overlooking 
something?
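
    To make that flow concrete, here is a minimal JDBC sketch of the "adding data", "pre-commit", and "recovery" steps. The table layout (`handle_id`, `timestamp`, `value`), the table name, and all class/method names are assumptions for illustration, not the actual code in this PR:

    ```
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.Map;

    // Hypothetical sketch of the versioned-write protocol described above.
    public class SqlStateBackendSketch {

        private final Connection conn;
        private long checkpointTimestamp; // the coordinated, increasing version counter

        public SqlStateBackendSketch(String jdbcUrl, long initialTimestamp) throws SQLException {
            this.conn = DriverManager.getConnection(jdbcUrl);
            this.conn.setAutoCommit(false); // edits stay uncommitted until pre-commit
            this.checkpointTimestamp = initialTimestamp;
        }

        // Adding data: pipe edits into the open transaction, tagged with the
        // timestamp of the upcoming checkpoint. May run in a background thread.
        public void writeEdits(Map<String, byte[]> edits) throws SQLException {
            String sql = "INSERT INTO state_table (handle_id, timestamp, value) VALUES (?, ?, ?)";
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                for (Map.Entry<String, byte[]> edit : edits.entrySet()) {
                    ps.setString(1, edit.getKey());
                    ps.setLong(2, checkpointTimestamp);
                    ps.setBytes(3, edit.getValue());
                    ps.addBatch();
                }
                ps.executeBatch();
            }
        }

        // Pre-commit: flush the remaining edits, commit the transaction, and hand the
        // timestamp to the checkpoint as the state handle; then bump the local version.
        public long preCommit(Map<String, byte[]> remainingEdits) throws SQLException {
            writeEdits(remainingEdits);
            conn.commit();
            long stateHandle = checkpointTimestamp;
            checkpointTimestamp++; // work continues under the next version
            return stateHandle;
        }

        // Recovery: restore the version of the last successful checkpoint and delete
        // records that were committed but whose checkpoint did not succeed as a whole.
        public void recover(long restoredTimestamp) throws SQLException {
            try (PreparedStatement ps =
                    conn.prepareStatement("DELETE FROM state_table WHERE timestamp > ?")) {
                ps.setLong(1, restoredTimestamp);
                ps.executeUpdate();
            }
            conn.commit();
            this.checkpointTimestamp = restoredTimestamp + 1;
        }
    }
    ```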
    
    ### Compacting
    
    Whenever the "checkpoint complete" notification comes (or after every so many changes), you trigger a clean-up query in the background. Provided the SQL database's query planner is not completely terrible, this SQL statement should be reasonably efficient (a single semi-join):
    ```
    DELETE FROM "table name" t1
    WHERE EXISTS
      (SELECT *
         FROM "table name" t2
        WHERE t2.handle_id = t1.handle_id
          AND t2.timestamp > t1.timestamp    -- a newer version exists for the same handle
          AND t2.timestamp <= GLOBAL_VERSION -- and the newer version is globally committed
      )
    ```
    The good thing is that by virtue of having the incrementing global 
versions, we can set the isolation level for the query to "read uncommitted", 
which means that it will not lock anything and thus not compete with any other 
ongoing modifications.
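
    As a usage sketch of that point, the clean-up could be issued over JDBC as below. The connection handling and table name are the same illustrative assumptions as in the sketch above; note also that not every database honors "read uncommitted" (PostgreSQL, for example, runs it as "read committed"), and the DELETE-with-alias syntax here is PostgreSQL-style:

    ```
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    public class CompactionSketch {

        // Runs the clean-up in the background at "read uncommitted", so it does
        // not take locks that compete with ongoing modifications. GLOBAL_VERSION
        // is passed in as a parameter; table/column names are illustrative.
        public static void compact(Connection conn, long globalVersion) throws SQLException {
            conn.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
            String sql =
                "DELETE FROM state_table t1 " +
                "WHERE EXISTS (SELECT * FROM state_table t2 " +
                "   WHERE t2.handle_id = t1.handle_id " +
                "     AND t2.timestamp > t1.timestamp " +  // a newer version exists
                "     AND t2.timestamp <= ?)";             // and it is globally committed
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setLong(1, globalVersion);
                ps.executeUpdate();
            }
            conn.commit(); // assumes auto-commit was disabled on this connection
        }
    }
    ```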


