NSAmelchev commented on code in PR #10766:
URL: https://github.com/apache/ignite/pull/10766#discussion_r1256004639
##########
modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java:
##########
@@ -102,69 +115,138 @@ protected CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool
try {
lock.tryLock(1);
- try (Stream<Path> cdcFiles = Files.list(walCdcDir.toPath())) {
- Set<File> delete = new HashSet<>();
+ Long lostSgmnt = findLastLostSegement();
- AtomicLong lastSgmnt = new AtomicLong(-1);
+ if (lostSgmnt != null)
+ deleteAll(lostSgmnt);
- cdcFiles
- .filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile()))
- .sorted(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex)
- .reversed()) // Sort by segment index.
- .forEach(path -> {
- long idx = FileWriteAheadLogManager.segmentIndex(path);
+ Long cdcDisableSgmnt = findLastCdcDisableSegment();
- if (lastSgmnt.get() == -1 || lastSgmnt.get() - idx == 1) {
- lastSgmnt.set(idx);
+ if (cdcDisableSgmnt != null)
+ deleteAll(cdcDisableSgmnt);
- return;
- }
+ if (lostSgmnt != null || cdcDisableSgmnt != null)
+ resetWalState();
+ else {
Review Comment:
Fixed
##########
modules/core/src/main/java/org/apache/ignite/internal/management/cdc/CdcDeleteLostSegmentsTask.java:
##########
@@ -102,69 +115,138 @@ protected CdcDeleteLostSegmentsJob(CdcDeleteLostSegmentLinksCommandArg arg, bool
try {
lock.tryLock(1);
- try (Stream<Path> cdcFiles = Files.list(walCdcDir.toPath())) {
- Set<File> delete = new HashSet<>();
+ Long lostSgmnt = findLastLostSegement();
- AtomicLong lastSgmnt = new AtomicLong(-1);
+ if (lostSgmnt != null)
+ deleteAll(lostSgmnt);
- cdcFiles
- .filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile()))
- .sorted(Comparator.comparingLong(FileWriteAheadLogManager::segmentIndex)
- .reversed()) // Sort by segment index.
- .forEach(path -> {
- long idx = FileWriteAheadLogManager.segmentIndex(path);
+ Long cdcDisableSgmnt = findLastCdcDisableSegment();
- if (lastSgmnt.get() == -1 || lastSgmnt.get() - idx == 1) {
- lastSgmnt.set(idx);
+ if (cdcDisableSgmnt != null)
+ deleteAll(cdcDisableSgmnt);
- return;
- }
+ if (lostSgmnt != null || cdcDisableSgmnt != null)
+ resetWalState();
+ else {
+ if (log.isInfoEnabled())
+ log.info("Lost segment CDC links or CDC disable record were not found.");
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to delete lost segment CDC links. " +
+ "Unable to acquire lock to lock CDC folder. Make sure a CDC app is shut down " +
+ "[dir=" + walCdcDir.getAbsolutePath() + ", reason=" + e.getMessage() + ']');
+ }
+ finally {
+ U.closeQuiet(lock);
+ }
+
+ return null;
+ }
- delete.add(path.toFile());
- });
+ /** @return The index of the segment previous to the last gap or {@code null} if no gaps were found. */
+ private Long findLastLostSegement() {
+ AtomicReference<Long> lastLostSgmnt = new AtomicReference<>();
+ AtomicLong lastSgmnt = new AtomicLong(-1);
- if (delete.isEmpty()) {
- log.info("Lost segment CDC links were not found.");
+ consumeCdcSegments(segment -> {
+ if (lastLostSgmnt.get() != null)
+ return;
- return null;
- }
+ long idx = FileWriteAheadLogManager.segmentIndex(segment);
- log.info("Found lost segment CDC links. The following links will be deleted: " + delete);
+ if (lastSgmnt.get() == -1 || lastSgmnt.get() - idx == 1) {
+ lastSgmnt.set(idx);
+
+ return;
+ }
+
+ if (log.isInfoEnabled())
+ log.info("Found lost segment CDC links [lastLostSgmntIdx=" + idx + ']');
+
+ lastLostSgmnt.set(idx);
+ });
+
+ return lastLostSgmnt.get();
+ }
- delete.forEach(file -> {
- if (!file.delete()) {
- throw new IgniteException("Failed to delete lost segment CDC link [file=" +
- file.getAbsolutePath() + ']');
- }
+ /** @return The index of the segment that contains the last {@link CdcDisableRecord}. */
+ private Long findLastCdcDisableSegment() {
+ AtomicReference<Long> lastRec = new AtomicReference<>();
- log.info("Segment CDC link deleted [file=" + file.getAbsolutePath() + ']');
- });
+ consumeCdcSegments(segment -> {
+ if (lastRec.get() != null)
+ return;
- Path stateDir = walCdcDir.toPath().resolve(STATE_DIR);
+ if (log.isInfoEnabled())
+ log.info("Processing CDC segment [segment=" + segment + ']');
- if (stateDir.toFile().exists()) {
- File walState = stateDir.resolve(WAL_STATE_FILE_NAME).toFile();
+ IgniteWalIteratorFactory.IteratorParametersBuilder builder =
+ new IgniteWalIteratorFactory.IteratorParametersBuilder()
+ .log(log)
+ .sharedContext(ignite.context().cache().context())
+ .filesOrDirs(segment.toFile())
+ .addFilter((type, ptr) -> type == CDC_DISABLE);
- if (walState.exists() && !walState.delete()) {
- throw new IgniteException("Failed to delete wal state file [file=" +
- walState.getAbsolutePath() + ']');
- }
+ if (ignite.configuration().getDataStorageConfiguration().getPageSize() != 0)
+ builder.pageSize(ignite.configuration().getDataStorageConfiguration().getPageSize());
+
+ try (WALIterator it = new IgniteWalIteratorFactory(log).iterator(builder)) {
+ if (it.hasNext()) {
+ if (log.isInfoEnabled())
+ log.info("Found CDC disable record [ptr=" + it.next().get1() + ']');
+
+ lastRec.set(FileWriteAheadLogManager.segmentIndex(segment));
}
}
- catch (IOException e) {
- throw new RuntimeException("Failed to delete lost segment CDC links.", e);
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to read CDC segment [path=" + segment + ']', e);
}
+ });
+
+ return lastRec.get();
+ }
+
+ /** */
+ private void resetWalState() {
Review Comment:
Fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]