GG-12418 - WAL hangs on any error during segment rollover
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/17d881ba Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/17d881ba Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/17d881ba Branch: refs/heads/master Commit: 17d881ba0122a7f90cac9846c376300a1d001bdd Parents: f1c8e59 Author: Pavel Kovalenko <jokse...@gmail.com> Authored: Mon Jul 10 13:55:47 2017 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Mon Jul 10 13:57:51 2017 +0300 ---------------------------------------------------------------------- .../PersistentStoreConfiguration.java | 24 +++ .../cache/persistence/file/FileIO.java | 154 +++++++++++++++ .../cache/persistence/file/FileIODecorator.java | 98 ++++++++++ .../cache/persistence/file/FileIOFactory.java | 45 +++++ .../cache/persistence/file/FilePageStore.java | 51 +++-- .../persistence/file/FilePageStoreManager.java | 2 + .../persistence/file/RandomAccessFileIO.java | 110 +++++++++++ .../file/RandomAccessFileIOFactory.java | 42 ++++ .../wal/AbstractWalRecordsIterator.java | 22 ++- .../cache/persistence/wal/FileInput.java | 40 ++-- .../wal/FileWriteAheadLogManager.java | 161 ++++++++------- .../wal/reader/IgniteWalIteratorFactory.java | 13 +- .../wal/reader/StandaloneGridKernalContext.java | 15 +- .../reader/StandaloneIgnitePluginProcessor.java | 38 ++++ .../reader/StandaloneWalRecordsIterator.java | 37 ++-- ...gnitePdsRecoveryAfterFileCorruptionTest.java | 11 +- .../db/wal/IgniteWalFlushFailoverTest.java | 195 +++++++++++++++++++ .../db/wal/crc/IgniteDataIntegrityTests.java | 10 +- .../db/wal/reader/IgniteWalReaderTest.java | 9 +- .../db/wal/reader/MockWalIteratorFactory.java | 8 +- .../ignite/testsuites/IgnitePdsTestSuite2.java | 4 + 21 files changed, 919 insertions(+), 170 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java index b531f9d..4792483 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java @@ -16,6 +16,8 @@ */ package org.apache.ignite.configuration; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; import org.apache.ignite.internal.util.typedef.internal.S; import java.io.Serializable; @@ -133,6 +135,9 @@ public class PersistentStoreConfiguration implements Serializable { /** Always write full pages. */ private boolean alwaysWriteFullPages = DFLT_WAL_ALWAYS_WRITE_FULL_PAGES; + /** Factory to provide I/O interface for files */ + private FileIOFactory fileIOFactory = new RandomAccessFileIOFactory(); + /** * Number of sub-intervals the whole {@link #setRateTimeInterval(long)} will be split into to calculate * rate-based metrics. 
@@ -539,6 +544,25 @@ public class PersistentStoreConfiguration implements Serializable { } /** + * Factory to provide implementation of FileIO interface + * which is used for any file read/write operations + * + * @return File I/O factory + */ + public FileIOFactory getFileIOFactory() { + return fileIOFactory; + } + + /** + * @param fileIOFactory File I/O factory + */ + public PersistentStoreConfiguration setFileIOFactory(FileIOFactory fileIOFactory) { + this.fileIOFactory = fileIOFactory; + + return this; + } + + /** * <b>Note:</b> setting this value with {@link WALMode#DEFAULT} may generate file size overhead for WAL segments in case * grid is used rarely. * http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java new file mode 100644 index 0000000..1e81150 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.file; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Interface to perform file I/O operations. + */ +public interface FileIO extends AutoCloseable { + /** + * Returns current file position. + * + * @return Current file position, + * a non-negative integer counting the number of bytes + * from the beginning of the file to the current position. + * + * @throws IOException If some I/O error occurs. + */ + public long position() throws IOException; + + /** + * Sets new current file position. + * + * @param newPosition + * The new position, a non-negative integer counting + * the number of bytes from the beginning of the file. + * + * @throws IOException If some I/O error occurs. + */ + public void position(long newPosition) throws IOException; + + /** + * Reads a sequence of bytes from this file into the {@code destinationBuffer}. + * + * @param destinationBuffer Destination byte buffer. + * + * @return Number of read bytes. + * + * @throws IOException If some I/O error occurs. + */ + public int read(ByteBuffer destinationBuffer) throws IOException; + + /** + * Reads a sequence of bytes from this file into the {@code destinationBuffer} + * starting from specified file {@code position}. + * + * @param destinationBuffer Destination byte buffer. + * @param position Starting position of file. + * + * @return Number of read bytes. + * + * @throws IOException If some I/O error occurs. + */ + public int read(ByteBuffer destinationBuffer, long position) throws IOException; + + /** + * Reads up to {@code length} bytes from this file into the {@code buffer}. + * + * @param buffer Destination byte array. + * @param offset The start offset in array {@code buffer} + * at which the data is written. + * @param length Maximum number of bytes read. + * + * @return Number of read bytes. 
+ * + * @throws IOException If some I/O error occurs. + */ + public int read(byte[] buffer, int offset, int length) throws IOException; + + /** + * Writes a sequence of bytes to this file from the {@code sourceBuffer}. + * + * @param sourceBuffer Source buffer. + * + * @return Number of written bytes. + * + * @throws IOException If some I/O error occurs. + */ + public int write(ByteBuffer sourceBuffer) throws IOException; + + /** + * Writes a sequence of bytes to this file from the {@code sourceBuffer} + * starting from specified file {@code position} + * + * @param sourceBuffer Source buffer. + * @param position Starting file position. + * + * @return Number of written bytes. + * + * @throws IOException If some I/O error occurs. + */ + public int write(ByteBuffer sourceBuffer, long position) throws IOException; + + /** + * Writes {@code length} bytes from the {@code buffer} + * starting at offset {@code offset} to this file. + * + * @param buffer Source byte array. + * @param offset Start offset in the {@code buffer}. + * @param length Number of bytes to write. + * + * @throws IOException If some I/O error occurs. + */ + public void write(byte[] buffer, int offset, int length) throws IOException; + + /** + * Forces any updates of this file to be written to the storage + * device that contains it. + * + * @throws IOException If some I/O error occurs. + */ + public void force() throws IOException; + + /** + * Returns current file size in bytes. + * + * @return File size. + * + * @throws IOException If some I/O error occurs. + */ + public long size() throws IOException; + + /** + * Truncates current file to zero length + * and resets current file position to zero. + * + * @throws IOException If some I/O error occurs. + */ + public void clear() throws IOException; + + /** + * Closes current file. + * + * @throws IOException If some I/O error occurs. 
+ */ + @Override public void close() throws IOException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java new file mode 100644 index 0000000..3e80ef8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.file; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Decorator class for File I/O + */ +public class FileIODecorator implements FileIO { + + /** File I/O delegate */ + private final FileIO delegate; + + /** + * + * @param delegate File I/O delegate + */ + public FileIODecorator(FileIO delegate) { + this.delegate = delegate; + } + + /** {@inheritDoc} */ + @Override public long position() throws IOException { + return delegate.position(); + } + + /** {@inheritDoc} */ + @Override public void position(long newPosition) throws IOException { + delegate.position(newPosition); + } + + /** {@inheritDoc} */ + @Override public int read(ByteBuffer destinationBuffer) throws IOException { + return delegate.read(destinationBuffer); + } + + /** {@inheritDoc} */ + @Override public int read(ByteBuffer destinationBuffer, long position) throws IOException { + return delegate.read(destinationBuffer, position); + } + + /** {@inheritDoc} */ + @Override public int read(byte[] buffer, int offset, int length) throws IOException { + return delegate.read(buffer, offset, length); + } + + /** {@inheritDoc} */ + @Override public int write(ByteBuffer sourceBuffer) throws IOException { + return delegate.write(sourceBuffer); + } + + /** {@inheritDoc} */ + @Override public int write(ByteBuffer sourceBuffer, long position) throws IOException { + return delegate.write(sourceBuffer, position); + } + + /** {@inheritDoc} */ + @Override public void write(byte[] buffer, int offset, int length) throws IOException { + delegate.write(buffer, offset, length); + } + + /** {@inheritDoc} */ + @Override public void force() throws IOException { + delegate.force(); + } + + /** {@inheritDoc} */ + @Override public long size() throws IOException { + return delegate.size(); + } + + /** {@inheritDoc} */ + @Override public void clear() throws IOException { + delegate.clear(); + } + + /** {@inheritDoc} */ + @Override public void 
close() throws IOException { + delegate.close(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java new file mode 100644 index 0000000..0ffc653 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.file; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; + +public interface FileIOFactory extends Serializable { + + /** + * Creates I/O interface for file with default I/O mode + * + * @param file File + * @return File I/O interface + * @throws IOException If I/O interface creation fails + */ + FileIO create(File file) throws IOException; + + /** + * Creates I/O interface for file with specified mode + * + * @param file File + * @param mode I/O mode, for example {@code "r"} or {@code "rw"} + * @return File I/O interface + * @throws IOException If I/O interface creation fails + */ + FileIO create(File file, String mode) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java index 6ddc9fc..c827e96 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java @@ -19,10 +19,8 @@ package org.apache.ignite.internal.processors.cache.persistence.file; import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.nio.channels.FileChannel; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -61,11 +59,11 @@ public class FilePageStore implements PageStore { /** Database configuration. 
*/ private final MemoryConfiguration dbCfg; - /** */ - private RandomAccessFile file; + /** Factory to provide I/O interfaces for read/write operations with files */ + private final FileIOFactory ioFactory; - /** */ - private FileChannel ch; + /** I/O interface for read/write operations with file */ + private FileIO fileIO; /** */ private final AtomicLong allocated; @@ -91,11 +89,12 @@ public class FilePageStore implements PageStore { /** * @param file File. */ - public FilePageStore(byte type, File file, MemoryConfiguration cfg) { + public FilePageStore(byte type, File file, FileIOFactory factory, MemoryConfiguration cfg) { this.type = type; cfgFile = file; dbCfg = cfg; + ioFactory = factory; allocated = new AtomicLong(); @@ -136,7 +135,7 @@ public class FilePageStore implements PageStore { ByteBuffer hdr = header(type, dbCfg.getPageSize()); while (hdr.remaining() > 0) - ch.write(hdr); + fileIO.write(hdr); } catch (IOException e) { throw new IgniteException("Check file failed.", e); @@ -154,7 +153,7 @@ public class FilePageStore implements PageStore { ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); while (hdr.remaining() > 0) - ch.read(hdr); + fileIO.read(hdr); hdr.rewind(); @@ -186,7 +185,7 @@ public class FilePageStore implements PageStore { " [expectedPageSize=" + dbCfg.getPageSize() + ", filePageSize=" + pageSize + "]"); - long fileSize = file.length(); + long fileSize = cfgFile.length(); if (fileSize == HEADER_SIZE) // Every file has a special meta page. 
fileSize = pageSize + HEADER_SIZE; @@ -214,9 +213,9 @@ public class FilePageStore implements PageStore { if (!inited) return; - ch.force(false); + fileIO.force(); - file.close(); + fileIO.close(); if (cleanFile) cfgFile.delete(); @@ -241,9 +240,7 @@ public class FilePageStore implements PageStore { this.tag = tag; - ch.position(0); - - file.setLength(0); + fileIO.clear(); allocated.set(initFile()); } @@ -277,7 +274,7 @@ public class FilePageStore implements PageStore { try { if (inited) - allocated.set(ch.size()); + allocated.set(fileIO.size()); recover = false; } @@ -303,7 +300,7 @@ public class FilePageStore implements PageStore { int len = pageSize; do { - int n = ch.read(pageBuf, off); + int n = fileIO.read(pageBuf, off); // If page was not written yet, nothing to read. if (n < 0) { @@ -330,7 +327,7 @@ public class FilePageStore implements PageStore { if ((savedCrc32 ^ curCrc32) != 0) throw new IgniteDataIntegrityViolationException("Failed to read page (CRC validation failed) " + "[id=" + U.hexLong(pageId) + ", off=" + (off - pageSize) + - ", file=" + cfgFile.getAbsolutePath() + ", fileSize=" + ch.size() + + ", file=" + cfgFile.getAbsolutePath() + ", fileSize=" + fileIO.size() + ", savedCrc=" + U.hexInt(savedCrc32) + ", curCrc=" + U.hexInt(curCrc32) + "]"); } @@ -356,7 +353,7 @@ public class FilePageStore implements PageStore { long off = 0; do { - int n = ch.read(buf, off); + int n = fileIO.read(buf, off); // If page was not written yet, nothing to read. 
if (n < 0) @@ -382,16 +379,14 @@ public class FilePageStore implements PageStore { try { if (!inited) { - RandomAccessFile rndFile = null; + FileIO fileIO = null; IgniteCheckedException err = null; try { - file = rndFile = new RandomAccessFile(cfgFile, "rw"); - - ch = file.getChannel(); + this.fileIO = fileIO = ioFactory.create(cfgFile, "rw"); - if (file.length() == 0) + if (cfgFile.length() == 0) allocated.set(initFile()); else allocated.set(checkFile()); @@ -402,9 +397,9 @@ public class FilePageStore implements PageStore { throw err = new IgniteCheckedException("Can't open file: " + cfgFile.getName(), e); } finally { - if (err != null && rndFile != null) + if (err != null && fileIO != null) try { - rndFile.close(); + fileIO.close(); } catch (IOException e) { err.addSuppressed(e); @@ -447,7 +442,7 @@ public class FilePageStore implements PageStore { int len = pageSize; do { - int n = ch.write(pageBuf, off); + int n = fileIO.write(pageBuf, off); off += n; @@ -478,7 +473,7 @@ public class FilePageStore implements PageStore { try { init(); - ch.force(false); + fileIO.force(); } catch (IOException e) { throw new IgniteCheckedException("Sync error", e); http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index 6aa2243..4a56ec7 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -367,6 +367,7 @@ public class FilePageStoreManager extends 
GridCacheSharedManagerAdapter implemen FilePageStore idxStore = new FilePageStore( PageMemory.FLAG_IDX, idxFile, + pstCfg.getFileIOFactory(), cctx.kernalContext().config().getMemoryConfiguration()); FilePageStore[] partStores = new FilePageStore[grpDesc.config().getAffinity().partitions()]; @@ -375,6 +376,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen FilePageStore partStore = new FilePageStore( PageMemory.FLAG_DATA, new File(cacheWorkDir, String.format(PART_FILE_TEMPLATE, partId)), + pstCfg.getFileIOFactory(), cctx.kernalContext().config().getMemoryConfiguration() ); http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java new file mode 100644 index 0000000..73a560a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.file; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +/** + * File I/O implementation based on {@code java.io.RandomAccessFile}. + */ +public class RandomAccessFileIO implements FileIO { + + /** + * Random access file associated with this I/O + */ + private final RandomAccessFile file; + + /** + * File channel associated with {@code file} + */ + private final FileChannel channel; + + /** + * Creates I/O implementation for specified {@code file} + * + * @param file Random access file + */ + public RandomAccessFileIO(RandomAccessFile file) { + this.file = file; + this.channel = file.getChannel(); + } + + /** {@inheritDoc} */ + @Override public long position() throws IOException { + return channel.position(); + } + + /** {@inheritDoc} */ + @Override public void position(long newPosition) throws IOException { + channel.position(newPosition); + } + + /** {@inheritDoc} */ + @Override public int read(ByteBuffer destinationBuffer) throws IOException { + return channel.read(destinationBuffer); + } + + /** {@inheritDoc} */ + @Override public int read(ByteBuffer destinationBuffer, long position) throws IOException { + return channel.read(destinationBuffer, position); + } + + /** {@inheritDoc} */ + @Override public int read(byte[] buffer, int offset, int length) throws IOException { + return file.read(buffer, offset, length); + } + + /** {@inheritDoc} */ + @Override public int write(ByteBuffer sourceBuffer) throws IOException { + return channel.write(sourceBuffer); + } + + /** {@inheritDoc} */ + @Override public int write(ByteBuffer sourceBuffer, long position) throws IOException { + return channel.write(sourceBuffer, position); + } + + /** {@inheritDoc} */ + @Override public void write(byte[] buffer, int offset, 
int length) throws IOException { + file.write(buffer, offset, length); + } + + /** {@inheritDoc} */ + @Override public void force() throws IOException { + channel.force(false); + } + + /** {@inheritDoc} */ + @Override public long size() throws IOException { + return channel.size(); + } + + /** {@inheritDoc} */ + @Override public void clear() throws IOException { + channel.position(0); + file.setLength(0); + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + file.close(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java new file mode 100644 index 0000000..6b731f2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIOFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.file; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; + +/** + * File I/O factory which provides RandomAccessFileIO implementation of FileIO. + */ +public class RandomAccessFileIOFactory implements FileIOFactory { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public FileIO create(File file) throws IOException { + return create(file, "rw"); + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file, String mode) throws IOException { + RandomAccessFile rf = new RandomAccessFile(file, mode); + + return new RandomAccessFileIO(rf); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java index f4bace1..beed90b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java @@ -21,15 +21,15 @@ import java.io.EOFException; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.ByteOrder; -import java.nio.channels.FileChannel; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.pagemem.wal.WALIterator; import 
org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.lang.IgniteBiTuple; @@ -71,6 +71,9 @@ public abstract class AbstractWalRecordsIterator /** Serializer of current version to read headers. */ @NotNull private final RecordSerializer serializer; + /** Factory to provide I/O interfaces for read/write operations with files */ + @NotNull protected final FileIOFactory ioFactory; + /** Utility buffer for reading records */ private final ByteBufferExpander buf; @@ -84,11 +87,13 @@ public abstract class AbstractWalRecordsIterator @NotNull final IgniteLogger log, @NotNull final GridCacheSharedContext sharedCtx, @NotNull final RecordSerializer serializer, + @NotNull final FileIOFactory ioFactory, final int bufSize ) { this.log = log; this.sharedCtx = sharedCtx; this.serializer = serializer; + this.ioFactory = ioFactory; // Do not allocate direct buffer for iterator. buf = new ByteBufferExpander(bufSize, ByteOrder.nativeOrder()); @@ -229,15 +234,14 @@ public abstract class AbstractWalRecordsIterator @Nullable final FileWALPointer start) throws IgniteCheckedException, FileNotFoundException { try { - RandomAccessFile rf = new RandomAccessFile(desc.file, "r"); + FileIO fileIO = ioFactory.create(desc.file, "r"); try { - FileChannel ch = rf.getChannel(); - FileInput in = new FileInput(ch, buf); + FileInput in = new FileInput(fileIO, buf); // Header record must be agnostic to the serializer version. 
WALRecord rec = serializer.readRecord(in, - new FileWALPointer(desc.idx, (int)ch.position(), 0)); + new FileWALPointer(desc.idx, (int)fileIO.position(), 0)); if (rec == null) return null; @@ -252,11 +256,11 @@ public abstract class AbstractWalRecordsIterator if (start != null && desc.idx == start.index()) in.seek(start.fileOffset()); - return new FileWriteAheadLogManager.ReadFileHandle(rf, desc.idx, sharedCtx.igniteInstanceName(), ser, in); + return new FileWriteAheadLogManager.ReadFileHandle(fileIO, desc.idx, sharedCtx.igniteInstanceName(), ser, in); } catch (SegmentEofException | EOFException ignore) { try { - rf.close(); + fileIO.close(); } catch (IOException ce) { throw new IgniteCheckedException(ce); @@ -266,7 +270,7 @@ public abstract class AbstractWalRecordsIterator } catch (IOException | IgniteCheckedException e) { try { - rf.close(); + fileIO.close(); } catch (IOException ce) { e.addSuppressed(ce); http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java index 00c7c02..6443a7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.persistence.wal; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; import org.jetbrains.annotations.NotNull; @@ -36,8 +36,8 @@ public final class FileInput implements ByteBufferBackedDataInput { */ private ByteBuffer buf; - /** File channel to read chunks from */ - private FileChannel ch; + /** I/O interface for read/write operations with file */ + private FileIO io; /** */ private long pos; @@ -46,28 +46,20 @@ public final class FileInput implements ByteBufferBackedDataInput { private ByteBufferExpander expBuf; /** - * @param ch Channel to read from - * @param buf Buffer for reading blocks of data into + * @param io FileIO to read from. + * @param buf Buffer for reading blocks of data into. */ - public FileInput(FileChannel ch, ByteBuffer buf) throws IOException { - assert ch != null; + public FileInput(FileIO io, ByteBufferExpander buf) throws IOException { + assert io != null; - this.ch = ch; - this.buf = buf; + this.io = io; + this.buf = buf.buffer(); - pos = ch.position(); + expBuf = buf; - clearBuffer(); - } - - /** - * @param ch Channel to read from - * @param expBuf ByteBufferWrapper with ability expand buffer dynamically. - */ - public FileInput(FileChannel ch, ByteBufferExpander expBuf) throws IOException { - this(ch, expBuf.buffer()); + pos = io.position(); - this.expBuf = expBuf; + clearBuffer(); } /** @@ -84,10 +76,10 @@ public final class FileInput implements ByteBufferBackedDataInput { * @param pos Position in bytes from file begin. 
*/ public void seek(long pos) throws IOException { - if (pos > ch.size()) + if (pos > io.size()) throw new EOFException(); - ch.position(pos); + io.position(pos); this.pos = pos; @@ -118,10 +110,10 @@ public final class FileInput implements ByteBufferBackedDataInput { buf.compact(); do { - int read = ch.read(buf); + int read = io.read(buf); if (read == -1) - throw new EOFException("EOF at position [" + ch.position() + "] expected to read [" + requested + "] bytes"); + throw new EOFException("EOF at position [" + io.position() + "] expected to read [" + requested + "] bytes"); available += read; http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 162f43d..5c112fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -22,10 +22,8 @@ import java.io.File; import java.io.FileFilter; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.nio.channels.FileChannel; import java.nio.file.Files; import java.sql.Time; import java.util.Arrays; @@ -48,6 +46,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.PersistentStoreConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.events.EventType; +import 
org.apache.ignite.events.WalSegmentArchivedEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; @@ -61,7 +60,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.PersistenceMetricsImpl; -import org.apache.ignite.events.WalSegmentArchivedEvent; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; @@ -153,6 +153,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** */ private volatile long oldestArchiveSegmentIdx; + /** Factory to provide I/O interfaces for read/write operations with files */ + private final FileIOFactory ioFactory; + /** Updater for {@link #currentHnd}, used for verify there are no concurrent update for current log segment handle */ private static final AtomicReferenceFieldUpdater<FileWriteAheadLogManager, FileWriteHandle> currentHndUpd = AtomicReferenceFieldUpdater.newUpdater(FileWriteAheadLogManager.class, FileWriteHandle.class, "currentHnd"); @@ -181,6 +184,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** Current log segment handle */ private volatile FileWriteHandle currentHnd; + /** Environment failure. 
*/ + private volatile Throwable envFailed; + /** * Positive (non-0) value indicates WAL can be archived even if not complete<br> * See {@link PersistentStoreConfiguration#setWalAutoArchiveAfterInactivity(long)}<br> @@ -225,6 +231,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl flushFreq = psCfg.getWalFlushFrequency(); fsyncDelay = psCfg.getWalFsyncDelay(); alwaysWriteFullPages = psCfg.isAlwaysWriteFullPages(); + ioFactory = psCfg.getFileIOFactory(); walAutoArchiveAfterInactivity = psCfg.getWalAutoArchiveAfterInactivity(); evt = ctx.event(); } @@ -322,7 +329,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl archiver.shutdown(); } catch (Exception e) { - U.error(log, "Failed to gracefully close WAL segment: " + currHnd.file, e); + U.error(log, "Failed to gracefully close WAL segment: " + currentHnd.fileIO, e); } } @@ -493,6 +500,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl return ptr; } + checkEnvironment(); + if (isStopping()) throw new IgniteCheckedException("Stopping."); } @@ -549,6 +558,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl end, psCfg, serializer, + ioFactory, archiver, log, tlbSize @@ -800,13 +810,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl int len = lastReadPtr == null ? 0 : lastReadPtr.length(); try { - RandomAccessFile file = new RandomAccessFile(curFile, "rw"); + FileIO fileIO = ioFactory.create(curFile); try { // readSerializerVersion will change the channel position. // This is fine because the FileWriteHandle consitructor will move it // to offset + len anyways. 
- int serVer = readSerializerVersion(file, curFile, absIdx); + int serVer = readSerializerVersion(fileIO, curFile, absIdx); RecordSerializer ser = forVersion(cctx, serVer); @@ -815,7 +825,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl ", offset=" + offset + ", ver=" + serVer + ']'); FileWriteHandle hnd = new FileWriteHandle( - file, + fileIO, absIdx, cctx.igniteInstanceName(), offset + len, @@ -835,7 +845,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl return hnd; } catch (IgniteCheckedException | IOException e) { - file.close(); + fileIO.close(); throw e; } @@ -862,10 +872,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (log.isDebugEnabled()) log.debug("Switching to a new WAL segment: " + nextFile.getAbsolutePath()); - RandomAccessFile file = new RandomAccessFile(nextFile, "rw"); + FileIO fileIO = ioFactory.create(nextFile); FileWriteHandle hnd = new FileWriteHandle( - file, + fileIO, curIdx + 1, cctx.igniteInstanceName(), 0, @@ -929,22 +939,22 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (log.isDebugEnabled()) log.debug("Formatting file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']'); - try (RandomAccessFile rnd = new RandomAccessFile(file, "rw")) { + try (FileIO fileIO = ioFactory.create(file, "rw")) { int left = psCfg.getWalSegmentSize(); if (mode == WALMode.DEFAULT) { while (left > 0) { int toWrite = Math.min(FILL_BUF.length, left); - rnd.write(FILL_BUF, 0, toWrite); + fileIO.write(FILL_BUF, 0, toWrite); left -= toWrite; } - rnd.getChannel().force(false); + fileIO.force(); } else - rnd.setLength(0); + fileIO.clear(); } catch (IOException e) { throw new IgniteCheckedException("Failed to format WAL segment file: " + file.getAbsolutePath(), e); @@ -1033,6 +1043,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** + * @throws StorageException If 
environment is no longer valid and we missed a WAL write. + */ + private void checkEnvironment() throws StorageException { + if (envFailed != null) + throw new StorageException("Failed to flush WAL buffer (environment was invalidated by a " + + "previous error)", envFailed); + } + + /** * File archiver operates on absolute segment indexes. For any given absolute segment index N we can calculate * the work WAL segment: S(N) = N % psCfg.walSegments. * When a work segment is finished, it is given to the archiver. If the absolute index of last archived segment @@ -1337,8 +1356,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl Files.move(dstTmpFile.toPath(), dstFile.toPath()); if (mode == WALMode.DEFAULT) { - try (RandomAccessFile f0 = new RandomAccessFile(dstFile, "rw")) { - f0.getChannel().force(false); + try (FileIO f0 = ioFactory.create(dstFile, "rw")) { + f0.force(); } } } @@ -1402,20 +1421,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * @param rf Random access file. + * @param io I/O interface for file. * @param file File object. * @param idx File index to read. * @return Serializer version stored in the file. * @throws IOException If failed to read serializer version. * @throws IgniteCheckedException If failed to read serializer version. */ - private int readSerializerVersion(RandomAccessFile rf, File file, long idx) + private int readSerializerVersion(FileIO io, File file, long idx) throws IOException, IgniteCheckedException { try { ByteBuffer buf = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE); buf.order(ByteOrder.nativeOrder()); - FileInput in = new FileInput(rf.getChannel(), buf); + FileInput in = new FileInput(io, + new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder())); // Header record must be agnostic to the serializer version. 
WALRecord rec = serializer.readRecord(in, new FileWALPointer(idx, 0, 0)); @@ -1541,11 +1561,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * */ private abstract static class FileHandle { - /** */ - protected RandomAccessFile file; - - /** */ - protected FileChannel ch; + /** I/O interface for read/write operations with file */ + protected FileIO fileIO; /** Absolute WAL segment file index (incremental counter) */ protected final long idx; @@ -1554,17 +1571,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl protected String gridName; /** - * @param file File. + * @param fileIO I/O interface for read/write operations of FileHandle. * @param idx Absolute WAL segment file index (incremental counter). */ - private FileHandle(RandomAccessFile file, long idx, String gridName) { - this.file = file; + private FileHandle(FileIO fileIO, long idx, String gridName) { + this.fileIO = fileIO; this.idx = idx; this.gridName = gridName; - - ch = file.getChannel(); - - assert ch != null; } } @@ -1585,19 +1598,19 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private boolean workDir; /** - * @param file File to read. + * @param fileIO I/O interface for read/write operations of FileHandle. * @param idx Absolute WAL segment file index (incremental counter). * @param ser Entry serializer. * @param in File input. 
*/ ReadFileHandle( - RandomAccessFile file, - long idx, - String gridName, - RecordSerializer ser, - FileInput in + FileIO fileIO, + long idx, + String gridName, + RecordSerializer ser, + FileInput in ) { - super(file, idx, gridName); + super(fileIO, idx, gridName); this.ser = ser; this.in = in; @@ -1608,7 +1621,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl */ public void close() throws IgniteCheckedException { try { - file.close(); + fileIO.close(); } catch (IOException e) { throw new IgniteCheckedException(e); @@ -1644,10 +1657,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** */ private volatile long lastFsyncPos; - /** Environment failure. */ - private volatile Throwable envFailed; - - /** Stop guard to provide warranty that only one thread will be successful in calling {@link #close(boolean)} */ + /** Stop guard to provide warranty that only one thread will be successful in calling {@link #close(boolean)}*/ private final AtomicBoolean stop = new AtomicBoolean(false); /** */ @@ -1661,12 +1671,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * Next segment available condition. - * Protection from "spurious wakeup" is provided by predicate {@link #ch}=<code>null</code> + * Protection from "spurious wakeup" is provided by predicate {@link #fileIO}=<code>null</code> */ private final Condition nextSegment = lock.newCondition(); /** - * @param file Mapped file to use. + * @param fileIO I/O file interface to use * @param idx Absolute WAL segment file index for easy access. * @param pos Position. * @param maxSegmentSize Max segment size. @@ -1674,18 +1684,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @throws IOException If failed. 
*/ private FileWriteHandle( - RandomAccessFile file, + FileIO fileIO, long idx, String gridName, long pos, long maxSegmentSize, RecordSerializer serializer ) throws IOException { - super(file, idx, gridName); + super(fileIO, idx, gridName); assert serializer != null; - ch.position(pos); + fileIO.position(pos); this.maxSegmentSize = maxSegmentSize; this.serializer = serializer; @@ -1887,6 +1897,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl catch (Throwable e) { invalidateEnvironment(e); + // All workers waiting for a next segment must be woken up and stopped + signalNextAvailable(); + throw e; } } @@ -1990,7 +2003,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl long start = metricsEnabled ? System.nanoTime() : 0; try { - ch.force(false); + fileIO.force(); } catch (IOException e) { throw new StorageException(e); @@ -2027,20 +2040,24 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl try { int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE; + if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) { //it is expected there is sufficient space for this record because rollover should run early final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize); buf.put((byte)(WALRecord.RecordType.SWITCH_SEGMENT_RECORD.ordinal() + 1)); - final FileWALPointer pointer = new FileWALPointer(idx, (int)ch.position(), -1); + + final FileWALPointer pointer = new FileWALPointer(idx, (int)fileIO.position(), -1); RecordV1Serializer.putPosition(buf, pointer); + buf.rewind(); - ch.write(buf, written); + + fileIO.write(buf, written); if (mode == WALMode.DEFAULT) - ch.force(false); + fileIO.force(); } - ch.close(); + fileIO.close(); } catch (IOException e) { throw new IgniteCheckedException(e); @@ -2064,13 +2081,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl try { WALRecord rec = 
head.get(); - assert rec instanceof FakeRecord : "Expected head FakeRecord, actual head " + if (envFailed == null) { + assert rec instanceof FakeRecord : "Expected head FakeRecord, actual head " + (rec != null ? rec.getClass().getSimpleName() : "null"); - assert written == lastFsyncPos || mode != WALMode.DEFAULT : + assert written == lastFsyncPos || mode != WALMode.DEFAULT : "fsync [written=" + written + ", lastFsync=" + lastFsyncPos + ']'; + } - ch = null; + fileIO = null; nextSegment.signalAll(); } @@ -2086,7 +2105,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl lock.lock(); try { - while (ch != null) + while (fileIO != null) U.await(nextSegment); } finally { @@ -2108,7 +2127,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl lock.lock(); try { - assert ch != null : "Writing to a closed segment."; + assert fileIO != null : "Writing to a closed segment."; checkEnvironment(); @@ -2151,10 +2170,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl assert size > 0 : size; try { - assert written == ch.position(); + assert written == fileIO.position(); do { - ch.write(buf); + fileIO.write(buf); } while (buf.hasRemaining()); @@ -2162,7 +2181,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl metrics.onWalBytesWritten(size); - assert written == ch.position(); + assert written == fileIO.position(); } catch (IOException e) { invalidateEnvironmentLocked(e); @@ -2215,25 +2234,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * @throws StorageException If environment is no longer valid and we missed a WAL write. 
- */ - private void checkEnvironment() throws StorageException { - if (envFailed != null) - throw new StorageException("Failed to flush WAL buffer (environment was invalidated by a " + - "previous error)", envFailed); - } - - /** * @return Safely reads current position of the file channel as String. Will return "null" if channel is null. */ private String safePosition() { - FileChannel ch = this.ch; + FileIO io = this.fileIO; - if (ch == null) + if (io == null) return "null"; try { - return String.valueOf(ch.position()); + return String.valueOf(io.position()); } catch (IOException e) { return "{Failed to read channel position: " + e.getMessage() + "}"; @@ -2320,6 +2330,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl @Nullable FileWALPointer end, PersistentStoreConfiguration psCfg, @NotNull RecordSerializer serializer, + FileIOFactory ioFactory, FileArchiver archiver, IgniteLogger log, int tlbSize @@ -2327,6 +2338,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl super(log, cctx, serializer, + ioFactory, psCfg.getWalRecordIteratorBufferSize()); this.walWorkDir = walWorkDir; this.walArchiveDir = walArchiveDir; @@ -2479,11 +2491,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl */ private void doFlush() { final FileWriteHandle hnd = currentHandle(); - try { hnd.flush(hnd.head.get()); } - catch (IgniteCheckedException e) { + catch (Exception e) { U.warn(log, "Failed to flush WAL record queue", e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java index 8ea0585..4e3998b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java @@ -24,6 +24,7 @@ import org.apache.ignite.configuration.MemoryConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.jetbrains.annotations.NotNull; /** @@ -34,15 +35,18 @@ public class IgniteWalIteratorFactory { private final IgniteLogger log; /** Page size, in standalone iterator mode this value can't be taken from memory configuration */ private final int pageSize; + /** Factory to provide I/O interfaces for read/write operations with files */ + private final FileIOFactory ioFactory; /** * Creates WAL files iterator factory * @param log Logger. 
* @param pageSize Page size, size is validated */ - public IgniteWalIteratorFactory(@NotNull final IgniteLogger log, final int pageSize) { + public IgniteWalIteratorFactory(@NotNull IgniteLogger log, @NotNull FileIOFactory ioFactory, int pageSize) { this.log = log; this.pageSize = pageSize; + this.ioFactory = ioFactory; new MemoryConfiguration().setPageSize(pageSize); // just for validate } @@ -57,7 +61,7 @@ public class IgniteWalIteratorFactory { * @throws IgniteCheckedException if failed to read folder */ public WALIterator iteratorArchiveDirectory(@NotNull final File walDirWithConsistentId) throws IgniteCheckedException { - return new StandaloneWalRecordsIterator(walDirWithConsistentId, log, prepareSharedCtx()); + return new StandaloneWalRecordsIterator(walDirWithConsistentId, log, prepareSharedCtx(), ioFactory); } /** @@ -69,7 +73,7 @@ public class IgniteWalIteratorFactory { * @throws IgniteCheckedException if failed to read files */ public WALIterator iteratorArchiveFiles(@NotNull final File ...files) throws IgniteCheckedException { - return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), false, files); + return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), ioFactory, false, files); } /** @@ -81,7 +85,7 @@ public class IgniteWalIteratorFactory { * @throws IgniteCheckedException if failed to read files */ public WALIterator iteratorWorkFiles(@NotNull final File ...files) throws IgniteCheckedException { - return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), true, files); + return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), ioFactory, true, files); } /** @@ -93,6 +97,7 @@ public class IgniteWalIteratorFactory { final StandaloneIgniteCacheDatabaseSharedManager dbMgr = new StandaloneIgniteCacheDatabaseSharedManager(); dbMgr.setPageSize(pageSize); + return new GridCacheSharedContext<>( kernalCtx, null, null, null, null, null, dbMgr, null, 
http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index df932e6..02b9352 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutorService; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.GridComponent; @@ -82,13 +83,25 @@ import org.jetbrains.annotations.Nullable; * Dummy grid kernal context */ public class StandaloneGridKernalContext implements GridKernalContext { + /** */ private IgniteLogger log; + /** */ + private IgnitePluginProcessor pluginProc; + /** * @param log Logger. 
*/ StandaloneGridKernalContext(IgniteLogger log) { this.log = log; + + try { + pluginProc = new StandaloneIgnitePluginProcessor( + this, config()); + } + catch (IgniteCheckedException e) { + throw new IllegalStateException("Must not fail on empty providers list.", e); + } } /** {@inheritDoc} */ @@ -278,7 +291,7 @@ public class StandaloneGridKernalContext implements GridKernalContext { /** {@inheritDoc} */ @Override public IgnitePluginProcessor plugins() { - return null; + return pluginProc; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgnitePluginProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgnitePluginProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgnitePluginProcessor.java new file mode 100644 index 0000000..838fc85 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgnitePluginProcessor.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.reader; + +import java.util.Collections; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; +import org.apache.ignite.plugin.PluginProvider; + +/** + * + */ +public class StandaloneIgnitePluginProcessor extends IgnitePluginProcessor { + /** + * @param ctx Kernal context. + * @param cfg Ignite configuration. + */ + public StandaloneIgnitePluginProcessor(GridKernalContext ctx, IgniteConfiguration cfg) throws IgniteCheckedException { + super(ctx, cfg, Collections.<PluginProvider>emptyList()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java index f17c112..ecad70a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java @@ -21,10 +21,7 @@ import java.io.DataInput; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.nio.channels.FileChannel; import java.util.ArrayList; import 
java.util.Arrays; import java.util.Collections; @@ -33,7 +30,10 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator; +import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander; import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; @@ -83,14 +83,17 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { * @param walFilesDir Wal files directory. Should already contain node consistent ID as subfolder * @param log Logger. * @param sharedCtx Shared context. + * @param ioFactory File I/O factory. */ StandaloneWalRecordsIterator( - @NotNull final File walFilesDir, - @NotNull final IgniteLogger log, - @NotNull final GridCacheSharedContext sharedCtx) throws IgniteCheckedException { + @NotNull File walFilesDir, + @NotNull IgniteLogger log, + @NotNull GridCacheSharedContext sharedCtx, + @NotNull FileIOFactory ioFactory) throws IgniteCheckedException { super(log, sharedCtx, new RecordV1Serializer(sharedCtx), + ioFactory, BUF_SIZE); init(walFilesDir, false, null); advance(); @@ -101,17 +104,20 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { * * @param log Logger. * @param sharedCtx Shared context. + * @param ioFactory File I/O factory. * @param workDir Work directory is scanned, false - archive * @param walFiles Wal files. 
*/ StandaloneWalRecordsIterator( - @NotNull final IgniteLogger log, - @NotNull final GridCacheSharedContext sharedCtx, - final boolean workDir, - @NotNull final File... walFiles) throws IgniteCheckedException { + @NotNull IgniteLogger log, + @NotNull GridCacheSharedContext sharedCtx, + @NotNull FileIOFactory ioFactory, + boolean workDir, + @NotNull File... walFiles) throws IgniteCheckedException { super(log, sharedCtx, new RecordV1Serializer(sharedCtx), + ioFactory, BUF_SIZE); this.workDir = workDir; init(null, workDir, walFiles); @@ -138,10 +144,12 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { } else { this.workDir = workDir; + if (workDir) walFileDescriptors = scanIndexesFromFileHeaders(walFiles); else walFileDescriptors = new ArrayList<>(Arrays.asList(FileWriteAheadLogManager.scan(walFiles))); + curWalSegmIdx = !walFileDescriptors.isEmpty() ? walFileDescriptors.get(0).getIdx() : 0; } curWalSegmIdx--; @@ -172,13 +180,10 @@ class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { FileWALPointer ptr; - try (RandomAccessFile rf = new RandomAccessFile(file, "r");) { - final FileChannel ch = rf.getChannel(); - final ByteBuffer buf = ByteBuffer.allocate(HEADER_RECORD_SIZE); - - buf.order(ByteOrder.nativeOrder()); + try (FileIO fileIO = ioFactory.create(file, "r")) { + final DataInput in = new FileInput(fileIO, + new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder())); - final DataInput in = new FileInput(ch, buf); // Header record must be agnostic to the serializer version. 
final int type = in.readUnsignedByte(); http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java index 6847482..e086258 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.persistence; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.nio.channels.FileChannel; import java.util.Collection; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; @@ -43,7 +42,7 @@ import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl; @@ -191,13 +190,13 @@ public class IgnitePdsRecoveryAfterFileCorruptionTest 
extends GridCommonAbstract FilePageStore filePageStore = (FilePageStore)store; - FileChannel ch = U.field(filePageStore, "ch"); + FileIO fileIO = U.field(filePageStore, "fileIO"); - long size = ch.size(); + long size = fileIO.size(); - ch.write(ByteBuffer.allocate((int)size - FilePageStore.HEADER_SIZE), FilePageStore.HEADER_SIZE); + fileIO.write(ByteBuffer.allocate((int)size - FilePageStore.HEADER_SIZE), FilePageStore.HEADER_SIZE); - ch.force(false); + fileIO.force(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java new file mode 100644 index 0000000..cad10ae --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db.wal; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.configuration.MemoryPolicyConfiguration; +import org.apache.ignite.configuration.PersistentStoreConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.GridKernalState; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +/** + * Starts a node whose file I/O factory ({@code FailingFileIOFactory}) throws on a write, runs transactional puts and waits for the node to reach the STOPPED state. + */ +public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest { + + /** Name of the transactional cache used by the test. */ + private static final String TEST_CACHE = "testCache"; + + /** When {@code true}, the WAL segment size is configured as 500_000 instead of 50_000. */ + private boolean flushByTimeout; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + deleteWorkFiles(); + } + + /** {@inheritDoc} */ + @Override
protected void afterTest() throws Exception { + deleteWorkFiles(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 30_000L; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration cacheCfg = new CacheConfiguration(TEST_CACHE) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + + cfg.setCacheConfiguration(cacheCfg); + + MemoryPolicyConfiguration memPlcCfg = new MemoryPolicyConfiguration() + .setName("dfltMemPlc") + .setInitialSize(2 * 1024L * 1024L * 1024L); + + MemoryConfiguration memCfg = new MemoryConfiguration() + .setMemoryPolicies(memPlcCfg) + .setDefaultMemoryPolicyName(memPlcCfg.getName()); + + cfg.setMemoryConfiguration(memCfg); + + PersistentStoreConfiguration storeCfg = new PersistentStoreConfiguration() + .setFileIOFactory(new FailingFileIOFactory()) + .setWalMode(WALMode.BACKGROUND) + // Setting WAL Segment size to high values forces flushing by timeout. + .setWalSegmentSize(flushByTimeout ? 500_000 : 50_000); + + cfg.setPersistentStoreConfiguration(storeCfg); + + return cfg; + } + + /** + * Tests flushing error recovery when the flush is triggered asynchronously by timeout. + * + * @throws Exception If failed. + */ + public void testErrorOnFlushByTimeout() throws Exception { + flushByTimeout = true; + flushingErrorTest(); + } + + /** + * Tests flushing error recovery when the flush is triggered directly by transaction commit. + * + * @throws Exception If failed. + */ + public void testErrorOnDirectFlush() throws Exception { + flushByTimeout = false; + flushingErrorTest(); + } + + /** + * @throws Exception if failed.
+ */ + private void flushingErrorTest() throws Exception { + final IgniteEx grid = startGrid(0); + grid.active(true); + + IgniteCache<Object, Object> cache = grid.cache(TEST_CACHE); + + final int iterations = 100; + + try { + for (int i = 0; i < iterations; i++) { + Transaction tx = grid.transactions().txStart( + TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED); + + cache.put(i, "testValue" + i); + + Thread.sleep(100L); + + tx.commitAsync().get(); + } + } + catch (Exception expected) { + // There can be any exception. Do nothing. + } + + // We should await successful stop of node. NOTE(review): the boolean result of waitForCondition is not checked here, so a timeout does not fail the test by itself. + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override + public boolean apply() { + return grid.context().gateway().getState() == GridKernalState.STOPPED; + } + }, getTestTimeout()); + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void deleteWorkFiles() throws IgniteCheckedException { + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); + } + + /** + * Factory of file I/O decorators; each created instance throws a test exception on its second {@code write(ByteBuffer)} call only. + */ + private static class FailingFileIOFactory implements FileIOFactory { + private static final long serialVersionUID = 0L; + + private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory(); + + @Override + public FileIO create(File file) throws IOException { + return create(file, "rw"); + } + + @Override + public FileIO create(File file, String mode) throws IOException { + FileIO delegate = delegateFactory.create(file, mode); + + return new FileIODecorator(delegate) { + int writeAttempts = 2; + + @Override + public int write(ByteBuffer sourceBuffer) throws IOException { + if (--writeAttempts == 0) + throw new RuntimeException("Test exception. 
Unable to write to file."); + + return super.write(sourceBuffer); + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/17d881ba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java index 303f14e..b93c74d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java @@ -23,7 +23,10 @@ import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO; +import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander; import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; @@ -47,9 +50,10 @@ public class IgniteDataIntegrityTests extends TestCase { randomAccessFile = new RandomAccessFile(file, "rw"); - fileInput = new FileInput(randomAccessFile.getChannel(), ByteBuffer.allocate(1024)); - - PureJavaCrc32 pureJavaCrc32 = new PureJavaCrc32(); + fileInput = new FileInput( + new RandomAccessFileIO(randomAccessFile), + new ByteBufferExpander(1024, ByteOrder.BIG_ENDIAN) + ); ByteBuffer buf = ByteBuffer.allocate(1024); 
ThreadLocalRandom curr = ThreadLocalRandom.current();