yujun777 commented on code in PR #63850:
URL: https://github.com/apache/doris/pull/63850#discussion_r3361319946


##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStream.java:
##########
@@ -169,21 +186,16 @@ public boolean hasData(Partition partition) {
         // if all available visible data has been consumed, return false
         // todo(TsukiokaKogane): change offset from partition version to commit tso
         return  (!partitionOffset.containsKey(partition.getId())
-                || !partitionOffset.get(partition.getId()).equals(partition.getVisibleVersion()))
+                || !partitionOffset.get(partition.getId()).equals(partition.getTso()))
                 && partition.hasData();
     }
 
+    public boolean hasHistoricalData(long partitionId) {
+        return historicalPartitionOffset.containsKey(partitionId);
+    }
+
     public Pair<Long, Long> getStreamUpdate(Long partitionId) {
-        Long next = null;
-        Long prev = null;
-        if (historicalPartitionOffset.containsKey(partitionId)) {
-            next = historicalPartitionOffset.get(partitionId);
-        } else {
-            // todo(TsukiokaKogane): update next version with stepping
-            next = ((OlapTable) baseTable).getPartition(partitionId).getVisibleVersion();
-        }
-        prev = partitionOffset.get(partitionId);
-        return Pair.of(prev, next);
+        return Pair.of(partitionOffset.get(partitionId), historicalPartitionOffset.get(partitionId));

Review Comment:
   For incremental stream partitions this returns `next = null`, so 
`OlapScanNode.addScanRangeLocations()` sends only `start_tso` and no 
`end_tso`. But `OlapScanNode.getStreamUpdate()` later records `next` as the 
partition's current TSO. These two values are not frozen together: the BE can 
scan an open-ended range while the transaction commits an FE-side `next` offset 
that was captured at a different moment, so rows could be skipped or consumed 
twice.
   
   For stream offset correctness, the scan range and the committed update 
should use the same `[prev, next]` snapshot. I think the incremental stream 
scan should also capture the current partition TSO here as `next`, and the scan 
range should send it as `end_tso` before the insert transaction records that 
same value.
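
   A rough sketch of the shape I have in mind, to make the suggestion concrete. 
Everything here is hypothetical and simplified: `StreamSnapshotSketch`, 
`OffsetSnapshot`, `ScanRange`, and the `currentTso` map (a stand-in for 
`partition.getTso()`) are illustration-only names, not the real Doris classes 
or the actual thrift scan-range fields.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; none of these types are the real Doris classes.
class StreamSnapshotSketch {

    /** One frozen [prev, next] pair for a partition. prev is null if nothing was consumed yet. */
    static final class OffsetSnapshot {
        final Long prev;
        final long next;

        OffsetSnapshot(Long prev, long next) {
            this.prev = prev;
            this.next = next;
        }
    }

    /** Stand-in for the TSO bounds that the scan node would send to the BE. */
    static final class ScanRange {
        final Long startTso;
        final long endTso;

        ScanRange(Long startTso, long endTso) {
            this.startTso = startTso;
            this.endTso = endTso;
        }
    }

    private final Map<Long, Long> partitionOffset = new HashMap<>();
    private final Map<Long, Long> historicalPartitionOffset = new HashMap<>();
    // Assumed stand-in for partition.getTso(); it may advance whenever new data commits.
    private final Map<Long, Long> currentTso = new HashMap<>();

    void setCurrentTso(long partitionId, long tso) {
        currentTso.put(partitionId, tso);
    }

    Long consumedOffset(long partitionId) {
        return partitionOffset.get(partitionId);
    }

    /** Capture prev and next once; the incremental case freezes the current TSO here. */
    OffsetSnapshot snapshot(long partitionId) {
        Long next = historicalPartitionOffset.get(partitionId);
        if (next == null) {
            next = currentTso.get(partitionId);
        }
        if (next == null) {
            throw new IllegalStateException("no TSO known for partition " + partitionId);
        }
        return new OffsetSnapshot(partitionOffset.get(partitionId), next);
    }

    /** The scan range is bounded by the snapshot, so it is never open-ended. */
    static ScanRange toScanRange(OffsetSnapshot snap) {
        return new ScanRange(snap.prev, snap.next);
    }

    /** The commit records the same next that bounded the scan. */
    void commit(long partitionId, OffsetSnapshot snap) {
        partitionOffset.put(partitionId, snap.next);
    }
}
```

   The point of the sketch is only that `snapshot()` is called once and its 
result feeds both `toScanRange()` and `commit()`. If the partition TSO advances 
between the scan and the commit, the recorded offset still matches the range 
that was actually scanned.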




