Repository: ignite
Updated Branches:
  refs/heads/ignite-2.5 4f45f59ed -> 02420518e


IGNITE-8429 Unexpected error during incorrect WAL segment decompression causes node termination

Signed-off-by: Andrey Gura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/02420518
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/02420518
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/02420518

Branch: refs/heads/ignite-2.5
Commit: 02420518ebf2adce7a97d331980b2ac17bb99182
Parents: 4f45f59
Author: Ivan Rakov <[email protected]>
Authored: Tue May 8 15:03:00 2018 +0300
Committer: Andrey Gura <[email protected]>
Committed: Tue May 8 15:18:07 2018 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionSupplier.java |  4 +--
 .../wal/FileWriteAheadLogManager.java           | 32 ++++++++++++--------
 .../wal/FsyncModeFileWriteAheadLogManager.java  | 32 ++++++++++++--------
 3 files changed, 40 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/02420518/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 84e6828..4946d7e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -32,10 +32,10 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.internal.S;

http://git-wip-us.apache.org/repos/asf/ignite/blob/02420518/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 7795344..6ac102f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -33,6 +33,7 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.ClosedByInterruptException;
+import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.sql.Time;
 import java.util.ArrayList;
@@ -2065,11 +2066,11 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
         /** {@inheritDoc} */
         @Override public void run() {
-            Throwable err = null;
-
             while (!Thread.currentThread().isInterrupted() && !stopped) {
+                long segmentToDecompress = -1L;
+
                 try {
-                    long segmentToDecompress = segmentsQueue.take();
+                    segmentToDecompress = segmentsQueue.take();
 
                     if (stopped)
                         break;
@@ -2087,7 +2088,16 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                             io.write(arr, 0, bytesRead);
                     }
 
-                    Files.move(unzipTmp.toPath(), unzip.toPath());
+                    try {
+                        Files.move(unzipTmp.toPath(), unzip.toPath());
+                    }
+                    catch (FileAlreadyExistsException e) {
+                        U.error(log, "Can't rename temporary unzipped segment: 
raw segment is already present " +
+                            "[tmp=" + unzipTmp + ", raw=" + unzip + "]", e);
+
+                        if (!unzipTmp.delete())
+                            U.error(log, "Can't delete temporary unzipped 
segment [tmp=" + unzipTmp + "]");
+                    }
 
                     synchronized (this) {
                         
decompressionFutures.remove(segmentToDecompress).onDone();
@@ -2097,16 +2107,12 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                     Thread.currentThread().interrupt();
                 }
                 catch (Throwable t) {
-                    err = t;
-                }
-                finally {
-                    if (err == null && !stopped)
-                        err = new IllegalStateException("Thread " + getName() 
+ " is terminated unexpectedly");
+                    if (!stopped && segmentToDecompress != -1L) {
+                        IgniteCheckedException e = new 
IgniteCheckedException("Error during WAL segment " +
+                            "decompression [segmentIdx=" + segmentToDecompress 
+ "]", t);
 
-                    if (err instanceof OutOfMemoryError)
-                        cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, err));
-                    else if (err != null)
-                        cctx.kernalContext().failure().process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, err));
+                        
decompressionFutures.remove(segmentToDecompress).onDone(e);
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/02420518/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
index dfb1c41..cf643fd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
@@ -28,6 +28,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.sql.Time;
 import java.util.ArrayList;
@@ -1873,11 +1874,11 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
 
         /** {@inheritDoc} */
         @Override public void run() {
-            Throwable err = null;
-
             while (!Thread.currentThread().isInterrupted() && !stopped) {
+                long segmentToDecompress = -1L;
+
                 try {
-                    long segmentToDecompress = segmentsQueue.take();
+                    segmentToDecompress = segmentsQueue.take();
 
                     if (stopped)
                         break;
@@ -1895,7 +1896,16 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
                             io.write(arr, 0, bytesRead);
                     }
 
-                    Files.move(unzipTmp.toPath(), unzip.toPath());
+                    try {
+                        Files.move(unzipTmp.toPath(), unzip.toPath());
+                    }
+                    catch (FileAlreadyExistsException e) {
+                        U.error(log, "Can't rename temporary unzipped segment: 
raw segment is already present " +
+                            "[tmp=" + unzipTmp + ", raw=" + unzip + ']', e);
+
+                        if (!unzipTmp.delete())
+                            U.error(log, "Can't delete temporary unzipped 
segment [tmp=" + unzipTmp + ']');
+                    }
 
                     synchronized (this) {
                         
decompressionFutures.remove(segmentToDecompress).onDone();
@@ -1905,16 +1915,12 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
                     Thread.currentThread().interrupt();
                 }
                 catch (Throwable t) {
-                    err = t;
-                }
-                finally {
-                    if (err == null && !stopped)
-                        err = new IllegalStateException("Thread " + getName() 
+ " is terminated unexpectedly");
+                    if (!stopped && segmentToDecompress != -1L) {
+                        IgniteCheckedException e = new 
IgniteCheckedException("Error during WAL segment " +
+                            "decompression [segmentIdx=" + segmentToDecompress 
+ ']', t);
 
-                    if (err instanceof OutOfMemoryError)
-                        cctx.kernalContext().failure().process(new 
FailureContext(CRITICAL_ERROR, err));
-                    else if (err != null)
-                        cctx.kernalContext().failure().process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, err));
+                        
decompressionFutures.remove(segmentToDecompress).onDone(e);
+                    }
                 }
             }
         }

Reply via email to