voonhous commented on code in PR #6856:
URL: https://github.com/apache/hudi/pull/6856#discussion_r990894756


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java:
##########
@@ -279,8 +279,9 @@ private FlinkOptions() {
       .defaultValue(false)// default read as batch
       .withDescription("Whether to skip compaction instants for streaming 
read,\n"
           + "there are two cases that this option can be used to avoid reading 
duplicates:\n"
-          + "1) you are definitely sure that the consumer reads faster than 
any compaction instants, "
-          + "usually with delta time compaction strategy that is long enough, 
for e.g, one week;\n"
+          + "1) `hoodie.compaction.preserve.commit.metadata` is set to `false` 
and you are definitely sure that the "
+          + "consumer reads faster than any compaction instants, usually with 
delta time compaction strategy that is "

Review Comment:
   I understand that `hoodie.compaction.preserve.commit.metadata` is `true` by default.
   
   The default values for the configuration keys of interest are as follows:
   ```properties
   hoodie.compaction.preserve.commit.metadata=true
   read.streaming.skip_compaction=false
   ```
   
   The phrasing of the `read.streaming.skip_compaction` configuration's 
description is VERY CONFUSING. 
   
   As of now, the description reads as follows:
   
   ```txt
   Whether to skip compaction instants for streaming read, there are two cases 
that this option can be used to avoid reading duplicates:
   
   1) you are definitely sure that the consumer reads faster than any 
compaction instants, usually with delta time compaction strategy that is long 
enough, for e.g, one week;
   
   2) changelog mode is enabled, this option is a solution to keep data 
integrity
   ```
   
   What I understand from this is:
   You can enable `read.streaming.skip_compaction` (set the configuration value to `true`) to prevent reading duplicates in these two cases:
   
   1) you are definitely sure that the consumer reads faster than any 
compaction instants, usually with delta time compaction strategy that is long 
enough, for e.g, one week;
   
   # Consumer reads FASTER than compaction instants
   The consumer reading faster than any compaction instant means that compaction completes more slowly than the consumer reads.
   
   As such, compaction will complete after reading. A concrete example is shown below. I am not sure if my understanding is correct. Please correct me if I have made any conceptual mistakes.
   
   ## Read iteration 1
   Say the commit timeline looks like this at read iteration 1:
   1. compaction plan at `instant-0` is created but still ongoing
   2. deltacommit at `instant-1` has completed
   
   The read consumer will only read out newly inserted rows in deltacommit @ 
`instant-1`.
   
   The issued instant is updated as follows:
   ```txt
   issuedInstant=1
   ```
   
   **Timeline when performing read iteration 1:**
   
   ```
   0.compaction.requested
   0.compaction.inflight
   
   1.deltacommit.requested
   1.deltacommit.inflight
   1.deltacommit
   ```
   
   ## Read iteration 2
   On read iteration 2:
   1. compaction plan at `instant-0` has completed
   2. deltacommit at `instant-2` has completed
   
   Since `issuedInstant=1`, the InstantRange is as follows:
   ```txt
   InstantRange[type=OPEN_CLOSE, start=1, end=2]
   ```
   
   The read consumer will read out the newly inserted rows in the deltacommit at `instant-2`. At this point, the rows in the base parquet files generated by compaction at `instant-0` will be ignored, since the data file iterators skip rows whose `_hoodie_commit_time` lies outside the InstantRange.
   
   TBH, I am not sure whether the parquet files will even be fetched. Regardless, even if they are fetched, the data file iterators will skip over their rows, since those rows will NEVER fall within the InstantRange of the current read iteration.
   
   **Timeline when performing read iteration 2:**
   
   ```
   0.compaction.requested
   0.compaction.inflight
   0.commit
   
   1.deltacommit.requested
   1.deltacommit.inflight
   1.deltacommit
   
   2.deltacommit.requested
   2.deltacommit.inflight
   2.deltacommit
   ```
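
   To make the skipping described above concrete, here is a minimal, self-contained sketch of the OPEN_CLOSE range check. The class and method names are illustrative only (this is not Hudi's actual `InstantRange` implementation); it just assumes that instant times compare lexicographically:
   
   ```java
   /**
    * Illustrative sketch of an OPEN_CLOSE instant range: start < t <= end.
    * Instant times are fixed-width timestamps, so String comparison suffices here.
    */
   public class OpenCloseInstantRangeSketch {
     private final String start; // exclusive: the issuedInstant, e.g. "1"
     private final String end;   // inclusive: the latest instant, e.g. "2"
   
     public OpenCloseInstantRangeSketch(String issuedInstant, String latestInstant) {
       this.start = issuedInstant;
       this.end = latestInstant;
     }
   
     /** OPEN_CLOSE semantics: emit a row only if start < commitTime <= end. */
     public boolean isInRange(String hoodieCommitTime) {
       return hoodieCommitTime.compareTo(start) > 0 && hoodieCommitTime.compareTo(end) <= 0;
     }
   
     public static void main(String[] args) {
       OpenCloseInstantRangeSketch range = new OpenCloseInstantRangeSketch("1", "2");
       System.out.println(range.isInRange("2")); // true  -> row written by deltacommit instant-2 is emitted
       System.out.println(range.isInRange("1")); // false -> row already consumed in read iteration 1 is skipped
     }
   }
   ```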
   
   ## Configuration description
   Circling back to the configuration description: when a consumer reads faster than a compaction instant, the possibility of duplicated data being read out (due to the same records existing in both the base parquet file and the delta log files) is virtually zero.
   
   `read.streaming.skip_compaction` SHOULD NOT be used to avoid duplicates if the **consumer reads faster than any compaction instants**. This is why I feel the original description is misleading.
   
   ## Proposed change
   That being said, my phrasing in the initial change is pretty misleading too.
   
   What I was trying to say is:
   
   `read.streaming.skip_compaction` should only be enabled when `hoodie.compaction.preserve.commit.metadata` has been changed to its non-default value of `false`, and only if the compaction plan completes before the deltacommit to be read in the next read iteration.
   
   Building upon the previous examples, suppose a read iteration 3 is performed 
with the following configurations:
   ```properties
   hoodie.compaction.preserve.commit.metadata=false (non-default value)
   read.streaming.skip_compaction=false (default value)
   ```
   
   **Timeline when performing read iteration 3:** 
   
   `instant-3` and `instant-4` have completed and will be read in during read iteration 3.
   
   ```txt
   3.compaction.requested
   3.compaction.inflight
   3.commit
   
   4.deltacommit.requested
   4.deltacommit.inflight
   4.deltacommit
   
   issuedInstant=2
   InstantRange[type=OPEN_CLOSE, start=2, end=4]
   ```
   
   At this point, the newly compacted rows (which have already been read during read iteration 2) in the base parquet files generated at `instant-3` will have a `_hoodie_commit_time` of `3` (since commit metadata is not preserved, the original commit time is overwritten).
   
   This falls within the InstantRange of the current read iteration, so the records that were already read in read iteration 2 are read out again, resulting in duplicated data.
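
   A tiny sketch of this failure mode (names are illustrative; it simply applies the same OPEN_CLOSE check to the rewritten commit time, assuming `issuedInstant=2` and latest instant `4` as above):
   
   ```java
   /**
    * Illustrative only: why a row already consumed in read iteration 2 re-enters
    * the range (2, 4] once compaction at instant-3 rewrites _hoodie_commit_time
    * (i.e. hoodie.compaction.preserve.commit.metadata=false).
    */
   public class DuplicateReadSketch {
     // OPEN_CLOSE check: start < t <= end, instant times compared lexicographically.
     static boolean inRange(String t, String start, String end) {
       return t.compareTo(start) > 0 && t.compareTo(end) <= 0;
     }
   
     public static void main(String[] args) {
       String start = "2"; // issuedInstant after read iteration 2
       String end = "4";   // latest instant seen in read iteration 3
   
       // preserve.commit.metadata=true: the compacted base file keeps the original
       // commit time "2", which is outside (2, 4] -> the row is filtered out.
       System.out.println(inRange("2", start, end)); // false
   
       // preserve.commit.metadata=false: the commit time is rewritten to the
       // compaction instant "3", which now falls inside (2, 4] -> read out again.
       System.out.println(inRange("3", start, end)); // true -> duplicate
     }
   }
   ```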
   
   As such, `read.streaming.skip_compaction` should be used to avoid reading duplicates only in this case, i.e. when the user is definitely sure that compaction instants complete before the next deltacommit to be read in.
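
   For completeness, a minimal sketch of the option combination that the above reasoning argues for; the keys are the real option names, but how they are passed to the Flink reader is omitted here:
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   
   /** Sketch of the option combination discussed above; wiring into the Flink reader is omitted. */
   public class SkipCompactionOptionsSketch {
     public static void main(String[] args) {
       Map<String, String> options = new LinkedHashMap<>();
       // Non-default: compaction rewrites _hoodie_commit_time to the compaction instant.
       options.put("hoodie.compaction.preserve.commit.metadata", "false");
       // Skip compaction instants so the rewritten rows are not re-emitted downstream.
       options.put("read.streaming.skip_compaction", "true");
       options.forEach((k, v) -> System.out.println(k + "=" + v));
     }
   }
   ```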
   
   # Disclaimer
   I am ignoring changelog mode's description as I have yet to test this 
configuration under such a use case.
   
   We will also need to sync up the changes here with #6855 


