tkhurana commented on code in PR #2423:
URL: https://github.com/apache/phoenix/pull/2423#discussion_r3163606874
##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java:
##########
@@ -283,19 +295,54 @@ protected boolean shouldProcessInProgressDirectory() {
   /**
    * Processes all new files for a specific replication round. Continuously processes files until no
-   * new files remain for the round.
+   * new files remain for the round. Files that are younger than the configured minimum age
+   * (roundTime + buffer percentage) are skipped to avoid processing files still being written by
+   * the source cluster. When all remaining files are too young, sleeps for the minimum time needed
+   * until the oldest young file becomes eligible.
    * @param replicationRound - The replication round for which to process new files
    * @throws IOException if there's an error during file processing
    */
   protected void processNewFilesForRound(ReplicationRound replicationRound) throws IOException {
     LOG.info("Starting new files processing for round: {} for haGroup: {}", replicationRound,
       haGroupName);
     long startTime = EnvironmentEdgeManager.currentTime();
-    List<Path> files = replicationLogTracker.getNewFilesForRound(replicationRound);
-    LOG.info("Number of new files for round {} is {}", replicationRound, files.size());
-    while (!files.isEmpty()) {
-      processOneRandomFile(files);
-      files = replicationLogTracker.getNewFilesForRound(replicationRound);
+    List<Path> allFiles = replicationLogTracker.getNewFilesForRound(replicationRound);
+    LOG.info("Number of new files for round {} is {}", replicationRound, allFiles.size());
+    long minAgeMs = getNewFileMinAgeMs();
Review Comment:
This won't change from one round to the next, right? We can avoid
computing it every time.
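
To illustrate the suggestion, here is a minimal sketch of computing the minimum age once and reusing it across rounds. It is not the actual `ReplicationLogDiscovery` code: the class name, the `roundTimeMs` and `bufferPercent` fields, and the formula (round time plus a percentage buffer, following the Javadoc wording) are all assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the discovery class; not the Phoenix implementation.
class MinAgeCacheSketch {
  private final long roundTimeMs;      // assumed config: length of a replication round
  private final double bufferPercent;  // assumed config: extra buffer as a percentage
  private final long newFileMinAgeMs;  // computed once, reused for every round

  // Counts how often the value is derived, only to make the caching observable.
  final AtomicInteger computations = new AtomicInteger();

  MinAgeCacheSketch(long roundTimeMs, double bufferPercent) {
    this.roundTimeMs = roundTimeMs;
    this.bufferPercent = bufferPercent;
    this.newFileMinAgeMs = computeNewFileMinAgeMs();
  }

  // Assumed formula: roundTime + buffer percentage of roundTime.
  private long computeNewFileMinAgeMs() {
    computations.incrementAndGet();
    return roundTimeMs + (long) (roundTimeMs * bufferPercent / 100.0);
  }

  // Per-round callers read the cached field instead of recomputing.
  long getNewFileMinAgeMs() {
    return newFileMinAgeMs;
  }

  public static void main(String[] args) {
    MinAgeCacheSketch sketch = new MinAgeCacheSketch(60_000L, 15.0);
    for (int round = 0; round < 3; round++) {
      System.out.println("round " + round + " minAgeMs=" + sketch.getNewFileMinAgeMs());
    }
  }
}
```

This only holds if the underlying configuration cannot change while the discovery service is running. If it can be reloaded at runtime, the cached value would need to be refreshed on reload.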
##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java:
##########
@@ -283,19 +295,54 @@ protected boolean shouldProcessInProgressDirectory() {
   /**
    * Processes all new files for a specific replication round. Continuously processes files until no
-   * new files remain for the round.
+   * new files remain for the round. Files that are younger than the configured minimum age
+   * (roundTime + buffer percentage) are skipped to avoid processing files still being written by
+   * the source cluster. When all remaining files are too young, sleeps for the minimum time needed
+   * until the oldest young file becomes eligible.
    * @param replicationRound - The replication round for which to process new files
    * @throws IOException if there's an error during file processing
    */
   protected void processNewFilesForRound(ReplicationRound replicationRound) throws IOException {
     LOG.info("Starting new files processing for round: {} for haGroup: {}", replicationRound,
       haGroupName);
     long startTime = EnvironmentEdgeManager.currentTime();
-    List<Path> files = replicationLogTracker.getNewFilesForRound(replicationRound);
-    LOG.info("Number of new files for round {} is {}", replicationRound, files.size());
-    while (!files.isEmpty()) {
-      processOneRandomFile(files);
-      files = replicationLogTracker.getNewFilesForRound(replicationRound);
+    List<Path> allFiles = replicationLogTracker.getNewFilesForRound(replicationRound);
+    LOG.info("Number of new files for round {} is {}", replicationRound, allFiles.size());
+    long minAgeMs = getNewFileMinAgeMs();
+    while (!allFiles.isEmpty()) {
Review Comment:
I highly recommend adding an isRunning check here.
```suggestion
while (!allFiles.isEmpty() && isRunning) {
```
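
As a rough illustration of why the guard matters, the sketch below shows a processing loop that exits promptly once a stop is requested, even though files remain. The class, the `volatile` declaration of `isRunning`, the `stop()` method, and the `stopAfter` parameter are assumptions for the example; the real field and shutdown path in `ReplicationLogDiscovery` may differ.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for the discovery loop; not the Phoenix implementation.
class RunningGuardSketch {
  // Assumed to be volatile so a stop request from another thread is visible here.
  private volatile boolean isRunning = true;
  private final Deque<String> pending;
  private int processed = 0;

  RunningGuardSketch(List<String> files) {
    this.pending = new ArrayDeque<>(files);
  }

  void stop() {
    isRunning = false;
  }

  // Processes files until none remain or a stop is requested.
  // stopAfter simulates a shutdown request arriving mid-round.
  int processAll(int stopAfter) {
    while (!pending.isEmpty() && isRunning) {
      pending.poll();
      processed++;
      if (processed == stopAfter) {
        stop();
      }
    }
    return processed;
  }

  int remaining() {
    return pending.size();
  }

  public static void main(String[] args) {
    RunningGuardSketch sketch = new RunningGuardSketch(Arrays.asList("a", "b", "c", "d"));
    int done = sketch.processAll(2);
    System.out.println("processed=" + done + " remaining=" + sketch.remaining());
  }
}
```

Without the `isRunning` condition, the loop would keep draining files (or sleeping while waiting for young files to age) after shutdown was requested, which delays a clean stop.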