IGNITE-5323 - Moved record serializer version from file name to file header
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b47db106 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b47db106 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b47db106 Branch: refs/heads/ignite-5267 Commit: b47db106d42afb5dcbee57b793b63efa43fc4ef2 Parents: e1c328e Author: Alexey Goncharuk <[email protected]> Authored: Mon Jun 5 11:41:42 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Mon Jun 5 11:41:42 2017 +0300 ---------------------------------------------------------------------- .../database/wal/FileWriteAheadLogManager.java | 105 ++++++++++++------- .../wal/serializer/RecordV1Serializer.java | 13 ++- 2 files changed, 79 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b47db106/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java index 4b79308..8a113ba 100644 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java @@ -79,10 +79,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private static final byte[] FILL_BUF = new byte[1024 * 1024]; /** */ - private static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.v\\d+\\.wal"); + private static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal"); /** */ - private static final Pattern 
WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.v\\d+\\.wal\\.tmp"); + private static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal\\.tmp"); /** */ private static final FileFilter WAL_SEGMENT_FILE_FILTER = new FileFilter() { @@ -322,6 +322,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl currentHnd = restoreWriteHandle(filePtr); + if (currentHnd.serializer.version() != serializer.version()) { + if (log.isInfoEnabled()) + log.info("Record serializer version change detected, will start logging with a new WAL record " + + "serializer to a new WAL segment [curFile=" + currentHnd + ", newVer=" + serializer.version() + + ", oldVer=" + currentHnd.serializer.version() + ']'); + + rollOver(currentHnd); + } + if (mode == Mode.BACKGROUND) { flusher = new QueueFlusher(cctx.igniteInstanceName()); @@ -443,7 +452,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } private boolean hasIndex(long absIdx) { - String name = FileDescriptor.fileName(absIdx, serializer.version()); + String name = FileDescriptor.fileName(absIdx); boolean inArchive = new File(walArchiveDir, name).exists(); @@ -569,29 +578,35 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws IgniteCheckedException { long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index(); - archiver.currentWalIndex(absIdx); - long segNo = absIdx % dbCfg.getWalSegments(); - File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo, serializer.version())); + File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo)); int offset = lastReadPtr == null ? 0 : lastReadPtr.fileOffset(); int len = lastReadPtr == null ? 
0 : lastReadPtr.length(); - log.info("Resuming logging in WAL segment [file=" + curFile.getAbsolutePath() + - ", offset=" + offset + ']'); - try { RandomAccessFile file = new RandomAccessFile(curFile, "rw"); try { + // readSerializerVersion will change the channel position. + // This is fine because the FileWriteHandle constructor will move it + // to offset + len anyway. + int serVer = readSerializerVersion(file, curFile, absIdx); + + RecordSerializer ser = forVersion(cctx, serVer); + + if (log.isInfoEnabled()) + log.info("Resuming logging to WAL segment [file=" + curFile.getAbsolutePath() + + ", offset=" + offset + ", ver=" + serVer + ']'); + FileWriteHandle hnd = new FileWriteHandle( file, absIdx, cctx.igniteInstanceName(), offset + len, maxWalSegmentSize, - serializer); + ser); if (lastReadPtr == null) { HeaderRecord header = new HeaderRecord(serializer.version()); @@ -601,6 +616,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl hnd.addRecord(header); } + archiver.currentWalIndex(absIdx); + return hnd; } catch (IgniteCheckedException | IOException e) { @@ -681,7 +698,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl // Allocate the first segment synchronously. All other segments will be allocated by archiver in background. 
if (allFiles.length == 0) { - File first = new File(walWorkDir, FileDescriptor.fileName(0, serializer.version())); + File first = new File(walWorkDir, FileDescriptor.fileName(0)); createFile(first); } @@ -761,7 +778,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl long segmentIdx = absNextIdx % dbCfg.getWalSegments(); - return new File(walWorkDir, FileDescriptor.fileName(segmentIdx, serializer.version())); + return new File(walWorkDir, FileDescriptor.fileName(segmentIdx)); } /** @@ -1072,9 +1089,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private File archiveSegment(long absIdx) throws IgniteCheckedException { long segIdx = absIdx % dbCfg.getWalSegments(); - File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx, serializer.version())); + File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx)); - String name = FileDescriptor.fileName(absIdx, serializer.version()); + String name = FileDescriptor.fileName(absIdx); File dstTmpFile = new File(walArchiveDir, name + ".tmp"); @@ -1163,7 +1180,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl */ private void checkFiles(int startWith, boolean create, IgnitePredicate<Integer> p) throws IgniteCheckedException { for (int i = startWith; i < dbCfg.getWalSegments() && (p == null || (p != null && p.apply(i))); i++) { - File checkFile = new File(walWorkDir, FileDescriptor.fileName(i, serializer.version())); + File checkFile = new File(walWorkDir, FileDescriptor.fileName(i)); if (checkFile.exists()) { if (checkFile.isDirectory()) @@ -1179,6 +1196,35 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** + * @param rf Random access file. + * @param file File object. + * @param idx File index to read. + * @return Serializer version stored in the file. + * @throws IOException If failed to read serializer version. 
+ * @throws IgniteCheckedException If failed to read serializer version. + */ + private int readSerializerVersion(RandomAccessFile rf, File file, long idx) + throws IOException, IgniteCheckedException { + try { + ByteBuffer buf = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE); + buf.order(ByteOrder.nativeOrder()); + + FileInput in = new FileInput(rf.getChannel(), buf); + + // Header record must be agnostic to the serializer version. + WALRecord rec = serializer.readRecord(in, new FileWALPointer(idx, 0, 0)); + + if (rec.type() != WALRecord.RecordType.HEADER_RECORD) + throw new IOException("Missing file header record: " + file.getAbsoluteFile()); + + return ((HeaderRecord)rec).version(); + } + catch (SegmentEofException | EOFException ignore) { + return serializer.version(); + } + } + + /** * WAL file descriptor. */ private static class FileDescriptor implements Comparable<FileDescriptor> { @@ -1188,9 +1234,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** Absolute WAL segment file index */ protected final long idx; - /** */ - protected final int ver; - /** * @param file File. */ @@ -1209,27 +1252,19 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl assert fileName.endsWith(WAL_SEGMENT_FILE_EXT); - int v = fileName.lastIndexOf(".v"); - - assert v > 0; - - int begin = v + 2; int end = fileName.length() - WAL_SEGMENT_FILE_EXT.length(); if (idx == null) - this.idx = Long.parseLong(fileName.substring(0, v)); + this.idx = Long.parseLong(fileName.substring(0, end)); else this.idx = idx; - - ver = Integer.parseInt(fileName.substring(begin, end)); } /** * @param segment Segment index. - * @param ver Serializer version. * @return Segment file name. 
*/ - private static String fileName(long segment, int ver) { + private static String fileName(long segment) { SB b = new SB(); String segmentStr = Long.toString(segment); @@ -1237,7 +1272,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl for (int i = segmentStr.length(); i < 16; i++) b.a('0'); - b.a(segmentStr).a(".v").a(ver).a(WAL_SEGMENT_FILE_EXT); + b.a(segmentStr).a(WAL_SEGMENT_FILE_EXT); return b.toString(); } @@ -2232,13 +2267,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (readArchive) { fd = new FileDescriptor(new File(walArchiveDir, - FileDescriptor.fileName(curIdx, serializer.version()))); + FileDescriptor.fileName(curIdx))); } else { long workIdx = curIdx % dbCfg.getWalSegments(); fd = new FileDescriptor( - new File(walWorkDir, FileDescriptor.fileName(workIdx, serializer.version())), + new File(walWorkDir, FileDescriptor.fileName(workIdx)), curIdx); } @@ -2278,11 +2313,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl RandomAccessFile rf = new RandomAccessFile(desc.file, "r"); try { - RecordSerializer ser = forVersion(cctx, desc.ver); FileChannel channel = rf.getChannel(); FileInput in = new FileInput(channel, buf); - WALRecord rec = ser.readRecord(in, + // Header record must be agnostic to the serializer version. 
+ WALRecord rec = serializer.readRecord(in, new FileWALPointer(desc.idx, (int)channel.position(), 0)); if (rec == null) @@ -2293,9 +2328,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl int ver = ((HeaderRecord)rec).version(); - if (ver != ser.version()) - throw new IOException("Unexpected file format version: " + ver + ", " + - desc.file.getAbsoluteFile()); + RecordSerializer ser = forVersion(cctx, ver); if (start != null && desc.idx == start.index()) in.seek(start.fileOffset()); http://git-wip-us.apache.org/repos/asf/ignite/blob/b47db106/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java index 1c56338..f39cdfd 100644 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java @@ -97,6 +97,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC; @@ -105,6 +106,9 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC; */ public class RecordV1Serializer implements RecordSerializer { /** */ + public static final int HEADER_RECORD_SIZE = /*Type*/1 + /*Pointer */12 + 
/*Magic*/8 + /*Version*/4 + /*CRC*/4; + + /** */ private GridCacheSharedContext cctx; /** */ @@ -779,8 +783,11 @@ public class RecordV1Serializer implements RecordSerializer { break; case HEADER_RECORD: - if (in.readLong() != HeaderRecord.MAGIC) - throw new EOFException("Magic is corrupted."); + long magic = in.readLong(); + + if (magic != HeaderRecord.MAGIC) + throw new EOFException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.MAGIC) + + ", actual=" + U.hexLong(magic) + ']'); int ver = in.readInt(); @@ -1246,7 +1253,7 @@ public class RecordV1Serializer implements RecordSerializer { return commonFields + 4 + dataSize(dataRec); case HEADER_RECORD: - return commonFields + 12; + return HEADER_RECORD_SIZE; case DATA_PAGE_INSERT_RECORD: DataPageInsertRecord diRec = (DataPageInsertRecord)record;
