timoninmaxim commented on code in PR #10292:
URL: https://github.com/apache/ignite/pull/10292#discussion_r1004597248
##########
modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java:
##########
@@ -231,4 +233,46 @@ public WALIterator replay(
* Start automatically releasing segments when reaching {@link
DataStorageConfiguration#getMaxWalArchiveSize()}.
*/
void startAutoReleaseSegments();
+
+ /**
+ * Archiver can be not created, all files will be written to WAL folder,
using absolute segment index.
+ *
+ * @return flag indicating if archiver is disabled.
+ */
+ boolean isArchiverEnabled();
Review Comment:
Just `archiveEnabled()`: internal API methods should not use prefixes such as
"is" or "get" in their names.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java:
##########
@@ -42,6 +45,12 @@
/** Metadata of the full snapshot. */
private final Set<Integer> affectedCacheGrps;
+ /** Current consistent cut WAL pointer. */
+ private final WALPointer lowPtr;
+
+ /** Preivous pointer. */
Review Comment:
It looks like this javadoc belongs to `lowPtr`, and vice versa.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java:
##########
@@ -1767,6 +1814,11 @@ public IgniteFutureImpl<Void> createSnapshot(String
name, @Nullable String snpPa
}
if (incremental) {
+ if (!cctx.wal().isArchiverEnabled()) {
Review Comment:
Actually, when `walArchiveDir` equals `walWorkDir` it is still possible to
create an incremental snapshot: no segments are cleaned, so there is enough
data for the snapshot. The open question is whether compression works in
that case. Let's investigate this in a separate ticket.
##########
modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java:
##########
@@ -231,4 +233,46 @@ public WALIterator replay(
* Start automatically releasing segments when reaching {@link
DataStorageConfiguration#getMaxWalArchiveSize()}.
*/
void startAutoReleaseSegments();
+
+ /**
+ * Archiver can be not created, all files will be written to WAL folder,
using absolute segment index.
+ *
+ * @return flag indicating if archiver is disabled.
Review Comment:
s/disabled/enabled/
##########
modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java:
##########
@@ -231,4 +233,46 @@ public WALIterator replay(
* Start automatically releasing segments when reaching {@link
DataStorageConfiguration#getMaxWalArchiveSize()}.
*/
void startAutoReleaseSegments();
+
+ /**
+ * Archiver can be not created, all files will be written to WAL folder,
using absolute segment index.
+ *
+ * @return flag indicating if archiver is disabled.
+ */
+ boolean isArchiverEnabled();
+
+ /**
+ * Archive directory if any.
+ *
+ * @return Arvhice directory.
Review Comment:
Typo: "Arvhice" should be "Archive".
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java:
##########
@@ -79,10 +79,31 @@ synchronized void onSegmentCompressed(long compressedIdx) {
else
this.lastCompressedIdx = lastMaxCompressedIdx;
+ notifyAll();
+
if (compressedIdx > lastEnqueuedToCompressIdx)
lastEnqueuedToCompressIdx = compressedIdx;
}
+ /**
+ * Method will wait activation of particular WAL segment index.
+ *
+ * @param awaitIdx absolute index {@link #lastCompressedIdx()}} to become
true.
+ * @throws IgniteInterruptedCheckedException if interrupted.
+ */
+ public synchronized void awaitSegmentCompressed(long awaitIdx) throws
IgniteInterruptedCheckedException {
+ while (lastCompressedIdx() < awaitIdx && !interrupted) {
+ try {
+ wait(2000);
Review Comment:
Is it possible to avoid blocking here? I suggest that this method return a
`future`, so that the calling thread just attaches a listener to it.
This is especially important because the blocked thread belongs to the
snapshotExecutorService pool, which is also used by
`SnapshotPartitionsVerifyHandler` to check snapshot consistency. A running
snapshot operation could therefore block an independent task that checks an
existing snapshot. I think we should at least create a ticket for that. WDYT?
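
To make the suggestion concrete, here is a minimal sketch of a non-blocking variant. It is only an illustration under assumptions: `CompressedSegmentTracker` and `compressedFuture` are hypothetical names, `CompletableFuture` stands in for Ignite's internal future type, and the index bookkeeping is simplified compared to `SegmentCompressStorage` (it takes the maximum reported index and ignores gaps).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for SegmentCompressStorage; not Ignite's actual API. */
class CompressedSegmentTracker {
    /** Last compressed absolute segment index; -1 until a segment is compressed. */
    private long lastCompressedIdx = -1;

    /** Pending futures keyed by the segment index they wait for. */
    private final NavigableMap<Long, CompletableFuture<Void>> waiters = new TreeMap<>();

    /** Returns a future that completes once the given segment index is compressed. */
    public synchronized CompletableFuture<Void> compressedFuture(long awaitIdx) {
        if (lastCompressedIdx >= awaitIdx)
            return CompletableFuture.completedFuture(null);

        return waiters.computeIfAbsent(awaitIdx, k -> new CompletableFuture<>());
    }

    /** Records a compressed segment and completes every waiter at or below it. */
    public void onSegmentCompressed(long compressedIdx) {
        List<CompletableFuture<Void>> ready;

        synchronized (this) {
            // Simplification: the real class tracks a contiguous range of indexes.
            if (compressedIdx > lastCompressedIdx)
                lastCompressedIdx = compressedIdx;

            NavigableMap<Long, CompletableFuture<Void>> head = waiters.headMap(lastCompressedIdx, true);

            ready = new ArrayList<>(head.values());

            head.clear();
        }

        // Complete outside the monitor so listeners never run while the lock is held.
        for (CompletableFuture<Void> fut : ready)
            fut.complete(null);
    }
}
```

With something like this, the snapshot task could chain its "link segments" step as a listener (for example via `thenRun`) instead of parking a thread of the shared pool in `wait(2000)`.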
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java:
##########
@@ -102,7 +115,62 @@ public IncrementalSnapshotFutureTask(
return false;
}
- onDone(new IncrementalSnapshotFutureTaskResult());
+
cctx.kernalContext().pools().getSnapshotExecutorService().submit(() -> {
+ try {
+ // First increment must include low segment, because full
snapshot knows nothing about WAL.
+ // All other begins from the next segment because lowPtr
already saved inside previous increment.
+ long lowIdx = lowPtr.index() + (incIdx == 1 ? 0 : 1);
+ long highIdx = highPtr.index();
+
+ boolean compactionEnabled =
+
cctx.gridConfig().getDataStorageConfiguration().isWalCompactionEnabled();
+
+ if (log.isInfoEnabled()) {
+ log.info("Waiting for WAL segments archivation
[lowIdx=" + lowIdx +
+ ", highIdx=" + highIdx +
+ ", compaction=" + compactionEnabled + ']');
+ }
+
+ assert lowIdx <= highIdx;
+
+ cctx.wal().awaitArchived(highPtr.index());
+
+ if (compactionEnabled) {
Review Comment:
Let's require the user to enable compaction in the config in order to be
able to create an incremental snapshot.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java:
##########
@@ -102,7 +115,62 @@ public IncrementalSnapshotFutureTask(
return false;
}
- onDone(new IncrementalSnapshotFutureTaskResult());
+
cctx.kernalContext().pools().getSnapshotExecutorService().submit(() -> {
+ try {
+ // First increment must include low segment, because full
snapshot knows nothing about WAL.
+ // All other begins from the next segment because lowPtr
already saved inside previous increment.
+ long lowIdx = lowPtr.index() + (incIdx == 1 ? 0 : 1);
+ long highIdx = highPtr.index();
+
+ boolean compactionEnabled =
+
cctx.gridConfig().getDataStorageConfiguration().isWalCompactionEnabled();
+
+ if (log.isInfoEnabled()) {
+ log.info("Waiting for WAL segments archivation
[lowIdx=" + lowIdx +
+ ", highIdx=" + highIdx +
+ ", compaction=" + compactionEnabled + ']');
+ }
+
+ assert lowIdx <= highIdx;
+
+ cctx.wal().awaitArchived(highPtr.index());
+
+ if (compactionEnabled) {
+ if (log.isInfoEnabled()) {
+ log.info("Waiting for WAL segments compression
[lowIdx=" + lowIdx +
+ ", highIdx=" + highIdx + ']');
+ }
+
+ cctx.wal().awaitCompressed(highPtr.index());
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info("Linking WAL segments into incremental
snapshot [lowIdx=" + lowIdx +
+ ", highIdx=" + highIdx +
+ ", compaction=" + compactionEnabled + ']');
+ }
+
+ for (; lowIdx <= highIdx; lowIdx++) {
+ File seg = compactionEnabled
+ ? cctx.wal().compressedSegment(lowIdx)
+ : cctx.wal().archiveSegment(lowIdx);
+
+ assert seg.exists();
Review Comment:
This should throw an IgniteCheckedException instead of using an assertion:
the segment could have been cleaned by Ignite. Let's also add a test for that.
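
A rough sketch of what I mean, assuming a small helper. `SegmentChecks`, `requireSegment` and `SegmentMissingException` are hypothetical names used only for illustration; in the PR the exception would be `IgniteCheckedException`. The point is that Java assertions are disabled at runtime unless the JVM is started with `-ea`, so a missing segment would otherwise go unnoticed in production.

```java
import java.io.File;

/** Hypothetical helper; the names are illustrative, not part of Ignite. */
class SegmentChecks {
    /** Stand-in for IgniteCheckedException, to keep the sketch self-contained. */
    static class SegmentMissingException extends Exception {
        SegmentMissingException(String msg) {
            super(msg);
        }
    }

    /**
     * Returns the segment file if it exists, otherwise fails with a checked exception.
     *
     * @param seg Segment file, may be null.
     * @param idx Absolute segment index, used only for the error message.
     */
    static File requireSegment(File seg, long idx) throws SegmentMissingException {
        if (seg == null || !seg.exists()) {
            throw new SegmentMissingException(
                "WAL segment not found, it may have been cleaned [idx=" + idx + ", file=" + seg + ']');
        }

        return seg;
    }
}
```

The loop over `lowIdx..highIdx` would then call the check for every segment and let the exception fail the snapshot future, which is also easy to cover with a test that removes a segment before the increment is created.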
##########
modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java:
##########
@@ -231,4 +233,46 @@ public WALIterator replay(
* Start automatically releasing segments when reaching {@link
DataStorageConfiguration#getMaxWalArchiveSize()}.
*/
void startAutoReleaseSegments();
+
+ /**
+ * Archiver can be not created, all files will be written to WAL folder,
using absolute segment index.
+ *
+ * @return flag indicating if archiver is disabled.
+ */
+ boolean isArchiverEnabled();
+
+ /**
+ * Archive directory if any.
+ *
+ * @return Arvhice directory.
+ */
+ @Nullable File archiveDir();
+
+ /**
+ * @param idx Segment index.
+ * @return Path to compressed archive segment.
+ */
+ @Nullable File archiveSegment(long idx);
+
+ /**
+ * @param idx Segment index.
+ * @return Path to compressed archive segment.
Review Comment:
Not a path, but a file.