This is an automated email from the ASF dual-hosted git repository. timoninmaxim pushed a commit to branch IGNITE-17700__realtime_cdc in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/IGNITE-17700__realtime_cdc by this push: new 9b8cf4f1ec0 IGNITE-19675 [IEP-104] Implement WALIterator over ByteBuffer (#10798) 9b8cf4f1ec0 is described below commit 9b8cf4f1ec0fd2764a9e43a6aabe783782e54180 Author: yurinaryshkin <135707807+yurinarysh...@users.noreply.github.com> AuthorDate: Thu Jul 13 15:19:18 2023 +0300 IGNITE-19675 [IEP-104] Implement WALIterator over ByteBuffer (#10798) --- .../pagemem/wal/record/FilteredRecord.java | 5 +- ...or.java => AbstractFileWalRecordsIterator.java} | 47 +- .../wal/AbstractWalRecordsIteratorAdapter.java | 78 +++ .../persistence/wal/ByteBufferBackedDataInput.java | 10 + .../wal/ByteBufferBackedDataInputImpl.java | 14 + .../persistence/wal/ByteBufferWalIterator.java | 132 +++++ .../persistence/wal/Crc32CheckingDataInput.java | 200 +++++++ .../cache/persistence/wal/FileDescriptor.java | 2 +- .../persistence/wal/FileWriteAheadLogManager.java | 4 +- .../wal/SingleSegmentLogicalRecordsIterator.java | 2 +- .../cache/persistence/wal/io/FileInput.java | 221 +------ .../cache/persistence/wal/io/SimpleFileInput.java | 8 - .../wal/reader/StandaloneWalRecordsIterator.java | 4 +- .../wal/serializer/RecordSerializer.java | 4 +- .../wal/serializer/RecordV1Serializer.java | 11 +- .../wal/serializer/RecordV2Serializer.java | 3 +- .../main/resources/META-INF/classnames.properties | 4 +- .../db/wal/crc/IgniteDataIntegrityTests.java | 4 +- .../persistence/wal/ByteBufferWalIteratorTest.java | 639 +++++++++++++++++++++ .../ignite/testsuites/IgnitePdsTestSuite5.java | 3 + 20 files changed, 1101 insertions(+), 294 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java index e077e5accc2..915353ebf2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java @@ -16,12 +16,9 @@ */ package org.apache.ignite.internal.pagemem.wal.record; -import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator; - /** * Special type of WAL record. Shouldn't be stored in file. - * Returned by deserializer if next record is not matched by filter. Automatically handled by - * {@link AbstractWalRecordsIterator}. + * Returned by deserializer if next record is not matched by filter. */ public class FilteredRecord extends WALRecord { /** Instance. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractFileWalRecordsIterator.java similarity index 91% rename from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractFileWalRecordsIterator.java index 926bce8fe39..4dcb55ed368 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractFileWalRecordsIterator.java @@ -25,7 +25,6 @@ import java.nio.ByteOrder; import java.util.Optional; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; @@ -36,7 +35,6 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.SegmentHeader; -import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.typedef.P2; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; @@ -49,22 +47,10 @@ import static org.apache.ignite.internal.processors.cache.persistence.wal.serial * Iterator over WAL segments. This abstract class provides most functionality for reading records in log. Subclasses * are to override segment switching functionality */ -public abstract class AbstractWalRecordsIterator - extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>> implements WALIterator { +public abstract class AbstractFileWalRecordsIterator extends AbstractWalRecordsIteratorAdapter { /** */ private static final long serialVersionUID = 0L; - /** - * Current record preloaded, to be returned on next()<br> Normally this should be not null because advance() method - * should already prepare some value<br> - */ - protected IgniteBiTuple<WALPointer, WALRecord> curRec; - - /** - * The exception which can be thrown during reading next record. It holds until the next calling of next record. - */ - private IgniteCheckedException curException; - /** * Current WAL segment absolute index. <br> Determined as lowest number of file at start, is changed during advance * segment @@ -108,7 +94,7 @@ public abstract class AbstractWalRecordsIterator * @param initialReadBufferSize buffer for reading records size. * @param segmentFileInputFactory Factory to provide I/O interfaces for read primitives with files. */ - protected AbstractWalRecordsIterator( + protected AbstractFileWalRecordsIterator( @NotNull final IgniteLogger log, @NotNull final GridCacheSharedContext sharedCtx, @NotNull final RecordSerializerFactory serializerFactory, @@ -124,31 +110,6 @@ public abstract class AbstractWalRecordsIterator buf = new ByteBufferExpander(initialReadBufferSize, ByteOrder.nativeOrder()); } - /** {@inheritDoc} */ - @Override protected IgniteBiTuple<WALPointer, WALRecord> onNext() throws IgniteCheckedException { - if (curException != null) - throw curException; - - IgniteBiTuple<WALPointer, WALRecord> ret = curRec; - - try { - advance(); - } - catch (IgniteCheckedException e) { - curException = e; - } - - return ret; - } - - /** {@inheritDoc} */ - @Override protected boolean onHasNext() throws IgniteCheckedException { - if (curException != null) - throw curException; - - return curRec != null; - } - /** {@inheritDoc} */ @Override protected void onClose() throws IgniteCheckedException { try { @@ -167,7 +128,7 @@ public abstract class AbstractWalRecordsIterator * * @throws IgniteCheckedException If failed. */ - protected void advance() throws IgniteCheckedException { + @Override protected void advance() throws IgniteCheckedException { if (curRec != null) lastRead = curRec.get1(); @@ -192,8 +153,6 @@ public abstract class AbstractWalRecordsIterator } } catch (WalSegmentTailReachedException e) { - AbstractReadFileHandle currWalSegment = this.currWalSegment; - IgniteCheckedException e0 = validateTailReachedException(e, currWalSegment); if (e0 != null) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIteratorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIteratorAdapter.java new file mode 100644 index 00000000000..96b19eb2768 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIteratorAdapter.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.lang.IgniteBiTuple; + +/** + * Iterator over WAL segments. This abstract class provides most functionality for reading records. + */ +public abstract class AbstractWalRecordsIteratorAdapter + extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>> implements WALIterator { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Current record preloaded, to be returned on next()<br> Normally this should be not null because advance() method + * should already prepare some value<br> + */ + protected IgniteBiTuple<WALPointer, WALRecord> curRec; + + /** + * The exception which can be thrown during reading next record. It holds until the next calling of next record. + */ + private IgniteCheckedException curErr; + + /** {@inheritDoc} */ + @Override protected IgniteBiTuple<WALPointer, WALRecord> onNext() throws IgniteCheckedException { + if (curErr != null) + throw curErr; + + IgniteBiTuple<WALPointer, WALRecord> ret = curRec; + + try { + advance(); + } + catch (IgniteCheckedException e) { + curErr = e; + } + + return ret; + } + + /** {@inheritDoc} */ + @Override protected boolean onHasNext() throws IgniteCheckedException { + if (curErr != null) + throw curErr; + + return curRec != null; + } + + /** + * Switches records iterator to the next record. <ul> <li>{@link #curRec} will be updated.</li> </ul> + * + * {@code advance()} runs a step ahead {@link #next()} + * + * @throws IgniteCheckedException If failed. + */ + protected abstract void advance() throws IgniteCheckedException; +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferBackedDataInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferBackedDataInput.java index 6384032e69f..6efa61014bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferBackedDataInput.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferBackedDataInput.java @@ -36,4 +36,14 @@ public interface ByteBufferBackedDataInput extends DataInput { * @throws IOException If failed. */ public void ensure(int requested) throws IOException; + + /** + * @return Position in the stream. + */ + public long position(); + + /** + * Size. + */ + public long size() throws IOException; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferBackedDataInputImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferBackedDataInputImpl.java index 2351ea7f5f3..a68adfb65e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferBackedDataInputImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferBackedDataInputImpl.java @@ -170,4 +170,18 @@ public class ByteBufferBackedDataInputImpl implements ByteBufferBackedDataInput @Override public String readUTF() throws IOException { throw new UnsupportedOperationException(); } + + /** + * {@inheritDoc} + */ + @Override public long position() { + return buf.position(); + } + + /** + * {@inheritDoc} + */ + @Override public long size() throws IOException { + return buf.limit(); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIterator.java new file mode 100644 index 00000000000..2e11c868429 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIterator.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Optional; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteBiTuple; + +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.HEADER_RECORD; +import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE; + +/** Byte Buffer WAL Iterator */ +public class ByteBufferWalIterator extends AbstractWalRecordsIteratorAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final RecordSerializer serializer; + + /** */ + private final ByteBufferBackedDataInputImpl dataInput; + + /** */ + private WALPointer expWalPtr; + + /** */ + public ByteBufferWalIterator( + GridCacheSharedContext<?, ?> cctx, + ByteBuffer byteBuf, + int ver, + WALPointer walPointer + ) throws IgniteCheckedException { + this(cctx, byteBuf, ver, walPointer, null); + } + + /** */ + public ByteBufferWalIterator( + GridCacheSharedContext<?, ?> cctx, + ByteBuffer byteBuf, + int ver, + WALPointer expWalPtr, + IgniteBiPredicate<WALRecord.RecordType, WALPointer> readTypeFilter + ) throws IgniteCheckedException { + serializer = new RecordSerializerFactoryImpl(cctx, readTypeFilter).createSerializer(ver); + + dataInput = new ByteBufferBackedDataInputImpl(); + + dataInput.buffer(byteBuf); + + this.expWalPtr = expWalPtr; + + advance(); + } + + /** */ + private IgniteBiTuple<WALPointer, WALRecord> advanceRecord() throws IgniteCheckedException { + if (!dataInput.buffer().hasRemaining()) + return null; + + IgniteBiTuple<WALPointer, WALRecord> result; + + try { + if (curRec == null) + skipHeader(); + + WALRecord rec = serializer.readRecord(dataInput, expWalPtr); + + result = new IgniteBiTuple<>(rec.position(), rec); + + expWalPtr = new WALPointer(expWalPtr.index(), expWalPtr.fileOffset() + rec.size(), 0); + } + catch (SegmentEofException e) { + return null; + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + return result; + } + + /** */ + private void skipHeader() throws IOException { + int position = dataInput.buffer().position(); + + int type = dataInput.readUnsignedByte(); + + WALRecord.RecordType recType = WALRecord.RecordType.fromIndex(type - 1); + + if (recType == HEADER_RECORD) { + dataInput.buffer().position(position + HEADER_RECORD_SIZE); + + expWalPtr = new WALPointer(expWalPtr.index(), expWalPtr.fileOffset() + HEADER_RECORD_SIZE, 0); + } + else + dataInput.buffer().position(position); + } + + /** {@inheritDoc} */ + @Override protected void advance() throws IgniteCheckedException { + do + curRec = advanceRecord(); + while (curRec != null && curRec.get2().type() == null); + } + + /** {@inheritDoc} */ + @Override public Optional<WALPointer> lastRead() { + throw new UnsupportedOperationException(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/Crc32CheckingDataInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/Crc32CheckingDataInput.java new file mode 100644 index 00000000000..b55a461a2e3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/Crc32CheckingDataInput.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc; +import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; + +/** + * Checking of CRC32. + */ +public class Crc32CheckingDataInput extends ByteBufferBackedDataInputImpl implements AutoCloseable { + /** */ + private final FastCrc crc = new FastCrc(); + + /** + * Last calc position. + */ + private int lastCalcPosition; + + /** + * Skip crc check. + */ + private final boolean skipCheck; + + /** */ + private final ByteBufferBackedDataInput delegate; + + /** */ + public Crc32CheckingDataInput(ByteBufferBackedDataInput delegate, boolean skipCheck) { + this.delegate = delegate; + + lastCalcPosition = buffer().position(); + + this.skipCheck = skipCheck; + } + + /** + * {@inheritDoc} + */ + @Override public void ensure(int requested) throws IOException { + int available = buffer().remaining(); + + if (available >= requested) + return; + + updateCrc(); + + delegate.ensure(requested); + + lastCalcPosition = 0; + } + + /** + * {@inheritDoc} + */ + @Override public void close() throws Exception { + updateCrc(); + + int val = crc.getValue(); + + int writtenCrc = readInt(); + + if ((val ^ writtenCrc) != 0 && !skipCheck) { + // If it is a last message we will skip it (EOF will be thrown). + ensure(5); + + throw new IgniteDataIntegrityViolationException( + "val: " + val + " writtenCrc: " + writtenCrc + ); + } + } + + /** */ + private void updateCrc() { + if (skipCheck) + return; + + int oldPos = buffer().position(); + + buffer().position(lastCalcPosition); + + crc.update(delegate.buffer(), oldPos - lastCalcPosition); + + lastCalcPosition = oldPos; + } + + /** {@inheritDoc} */ + @Override public int skipBytes(int n) throws IOException { + ensure(n); + + int skipped = Math.min(buffer().remaining(), n); + + buffer().position(buffer().position() + skipped); + + return skipped; + } + + /** + * {@inheritDoc} + */ + @Override public void readFully(byte[] b) throws IOException { + ensure(b.length); + + buffer().get(b); + } + + /** + * {@inheritDoc} + */ + @Override public void readFully(byte[] b, int off, int len) throws IOException { + ensure(len); + + buffer().get(b, off, len); + } + + /** + * {@inheritDoc} + */ + @Override public byte readByte() throws IOException { + ensure(1); + + return buffer().get(); + } + + /** + * {@inheritDoc} + */ + @Override public short readShort() throws IOException { + ensure(2); + + return buffer().getShort(); + } + + /** + * {@inheritDoc} + */ + @Override public char readChar() throws IOException { + ensure(2); + + return buffer().getChar(); + } + + /** + * {@inheritDoc} + */ + @Override public int readInt() throws IOException { + ensure(4); + + return buffer().getInt(); + } + + /** + * {@inheritDoc} + */ + @Override public long readLong() throws IOException { + ensure(8); + + return buffer().getLong(); + } + + /** + * {@inheritDoc} + */ + @Override public float readFloat() throws IOException { + ensure(4); + + return buffer().getFloat(); + } + + /** + * {@inheritDoc} + */ + @Override public double readDouble() throws IOException { + ensure(8); + + return buffer().getDouble(); + } + + /** {@inheritDoc} */ + @Override public ByteBuffer buffer() { + return delegate.buffer(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java index 2f34b575bbf..26fb01d766c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java @@ -32,7 +32,7 @@ import static java.nio.file.StandardOpenOption.READ; /** * WAL file descriptor. */ -public class FileDescriptor implements Comparable<FileDescriptor>, AbstractWalRecordsIterator.AbstractFileDescriptor { +public class FileDescriptor implements Comparable<FileDescriptor>, AbstractFileWalRecordsIterator.AbstractFileDescriptor { /** file extension of WAL segment. */ private static final String WAL_SEGMENT_FILE_EXT = ".wal"; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 5470e2379cb..4e5a9358120 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -2857,7 +2857,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * */ - public static class ReadFileHandle extends AbstractFileHandle implements AbstractWalRecordsIterator.AbstractReadFileHandle { + public static class ReadFileHandle extends AbstractFileHandle implements AbstractFileWalRecordsIterator.AbstractReadFileHandle { /** Entry serializer. */ RecordSerializer ser; @@ -2923,7 +2923,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * Iterator over WAL-log. */ - private static class RecordsIterator extends AbstractWalRecordsIterator { + private static class RecordsIterator extends AbstractFileWalRecordsIterator { /** */ private static final long serialVersionUID = 0L; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java index fcb2d434115..6eaa8b36876 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java @@ -39,7 +39,7 @@ import org.jetbrains.annotations.Nullable; * Iterates over logical records of one WAL segment from archive. Used for WAL archive compression. * Doesn't deserialize actual record data, returns {@link MarshalledRecord} instances instead. */ -public class SingleSegmentLogicalRecordsIterator extends AbstractWalRecordsIterator { +public class SingleSegmentLogicalRecordsIterator extends AbstractFileWalRecordsIterator { /** Serial version uid. */ private static final long serialVersionUID = 0L; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/FileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/FileInput.java index 07c49176729..33b5b7d2480 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/FileInput.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/FileInput.java @@ -18,13 +18,9 @@ package org.apache.ignite.internal.processors.cache.persistence.wal.io; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput; -import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc; -import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; -import org.jetbrains.annotations.NotNull; /** * File input, backed by byte buffer file input. @@ -41,219 +37,8 @@ public interface FileInput extends ByteBufferBackedDataInput { */ void seek(long pos) throws IOException; - /** - * @return Position in the stream. - */ - long position(); - - /** - * @param skipCheck If CRC check should be skipped. - * @return autoclosable fileInput, after its closing crc32 will be calculated and compared with saved one - */ - SimpleFileInput.Crc32CheckingFileInput startRead(boolean skipCheck); - - /** - * Checking of CRC32. - */ - public class Crc32CheckingFileInput implements ByteBufferBackedDataInput, AutoCloseable { - /** */ - private final FastCrc crc = new FastCrc(); - - /** Last calc position. */ - private int lastCalcPosition; - - /** Skip crc check. */ - private boolean skipCheck; - - /** */ - private FileInput delegate; - - /** - */ - public Crc32CheckingFileInput(FileInput delegate, boolean skipCheck) { - this.delegate = delegate; - this.lastCalcPosition = delegate.buffer().position(); - this.skipCheck = skipCheck; - } - - /** {@inheritDoc} */ - @Override public void ensure(int requested) throws IOException { - int available = buffer().remaining(); - - if (available >= requested) - return; - - updateCrc(); - - delegate.ensure(requested); - - lastCalcPosition = 0; - } - - /** {@inheritDoc} */ - @Override public void close() throws Exception { - updateCrc(); - - int val = crc.getValue(); - - int writtenCrc = this.readInt(); - - if ((val ^ writtenCrc) != 0 && !skipCheck) { - // If it last message we will skip it (EOF will be thrown). - ensure(5); - - throw new IgniteDataIntegrityViolationException( - "val: " + val + " writtenCrc: " + writtenCrc - ); - } - } - - /** - * - */ - private void updateCrc() { - if (skipCheck) - return; - - int oldPos = buffer().position(); - - buffer().position(lastCalcPosition); - - crc.update(delegate.buffer(), oldPos - lastCalcPosition); - - lastCalcPosition = oldPos; - } - - /** {@inheritDoc} */ - @Override public int skipBytes(int n) throws IOException { - ensure(n); - - int skipped = Math.min(buffer().remaining(), n); - - buffer().position(buffer().position() + skipped); - - return skipped; - } - - /** - * {@inheritDoc} - */ - @Override public void readFully(@NotNull byte[] b) throws IOException { - ensure(b.length); - - buffer().get(b); - } - - /** - * {@inheritDoc} - */ - @Override public void readFully(@NotNull byte[] b, int off, int len) throws IOException { - ensure(len); - - buffer().get(b, off, len); - } - - /** - * {@inheritDoc} - */ - @Override public boolean readBoolean() throws IOException { - return readByte() == 1; - } - - /** - * {@inheritDoc} - */ - @Override public byte readByte() throws IOException { - ensure(1); - - return buffer().get(); - } - - /** - * {@inheritDoc} - */ - @Override public int readUnsignedByte() throws IOException { - return readByte() & 0xFF; - } - - /** - * {@inheritDoc} - */ - @Override public short readShort() throws IOException { - ensure(2); - - return buffer().getShort(); - } - - /** - * {@inheritDoc} - */ - @Override public int readUnsignedShort() throws IOException { - return readShort() & 0xFFFF; - } - - /** - * {@inheritDoc} - */ - @Override public char readChar() throws IOException { - ensure(2); - - return buffer().getChar(); - } - - /** - * {@inheritDoc} - */ - @Override public int readInt() throws IOException { - ensure(4); - - return buffer().getInt(); - } - - /** - * {@inheritDoc} - */ - @Override public long readLong() throws IOException { - ensure(8); - - return buffer().getLong(); - } - - /** - * {@inheritDoc} - */ - @Override public float readFloat() throws IOException { - ensure(4); - - return buffer().getFloat(); - } - - /** - * {@inheritDoc} - */ - @Override public double readDouble() throws IOException { - ensure(8); - - return buffer().getDouble(); - } - - /** - * {@inheritDoc} - */ - @Override public String readLine() throws IOException { - throw new UnsupportedOperationException(); - } - - /** - * {@inheritDoc} - */ - @Override public String readUTF() throws IOException { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public ByteBuffer buffer() { - return delegate.buffer(); - } + /** {@inheritDoc} */ + @Override public default long size() throws IOException { + return io().size(); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleFileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleFileInput.java index 57b858970d0..9c847aecb3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleFileInput.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SimpleFileInput.java @@ -262,12 +262,4 @@ public class SimpleFileInput implements FileInput { @Override public String readUTF() throws IOException { throw new UnsupportedOperationException(); } - - /** - * @param skipCheck If CRC check should be skipped. - * @return autoclosable fileInput, after its closing crc will be calculated and compared with saved one - */ - @Override public Crc32CheckingFileInput startRead(boolean skipCheck) { - return new Crc32CheckingFileInput(this, skipCheck); - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java index df6ca772291..ea49988285e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java @@ -40,7 +40,7 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; -import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator; +import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractFileWalRecordsIterator; import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.ReadFileHandle; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; @@ -70,7 +70,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.wal.serial * WAL reader iterator, for creation in standalone WAL reader tool Operates over one directory, does not provide start * and end boundaries */ -class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator { +class StandaloneWalRecordsIterator extends AbstractFileWalRecordsIterator { /** Record buffer size */ public static final int DFLT_BUF_SIZE = 2 * 1024 * 1024; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java index 9227df1d495..d6396fe7d22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordSerializer.java @@ -21,8 +21,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; -import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput; /** * Record serializer. @@ -54,7 +54,7 @@ public interface RecordSerializer { * @param expPtr expected WAL pointer for record. Used to validate actual position against expected from the file * @return Read entry. */ - public WALRecord readRecord(FileInput in, WALPointer expPtr) throws IOException, IgniteCheckedException; + public WALRecord readRecord(ByteBufferBackedDataInput in, WALPointer expPtr) throws IOException, IgniteCheckedException; /** * Flag to write (or not) wal pointer to record diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java index 295e174abf8..39f86e9b4fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java @@ -31,14 +31,13 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType; import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO; import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput; import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander; +import org.apache.ignite.internal.processors.cache.persistence.wal.Crc32CheckingDataInput; import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc; -import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO; -import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleFileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.io.RecordIO; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -226,7 +225,7 @@ public class RecordV1Serializer implements RecordSerializer { } /** {@inheritDoc} */ - @Override public WALRecord readRecord(FileInput in0, WALPointer expPtr) throws IOException, IgniteCheckedException { + @Override public WALRecord readRecord(ByteBufferBackedDataInput in0, WALPointer expPtr) throws IOException, IgniteCheckedException { return readWithCrc(in0, expPtr, recordIO); } @@ -362,13 +361,13 @@ public class RecordV1Serializer implements RecordSerializer { * @throws IgniteCheckedException If it's unable to read record. */ static WALRecord readWithCrc( - FileInput in0, + ByteBufferBackedDataInput in0, WALPointer expPtr, RecordIO reader ) throws EOFException, IgniteCheckedException { long startPos = -1; - try (SimpleFileInput.Crc32CheckingFileInput in = in0.startRead(skipCrc)) { + try (Crc32CheckingDataInput in = new Crc32CheckingDataInput(in0, skipCrc)) { startPos = in0.position(); WALRecord res = reader.readWithHeaders(in, expPtr); @@ -386,7 +385,7 @@ public class RecordV1Serializer implements RecordSerializer { long size = -1; try { - size = in0.io().size(); + size = in0.size(); } catch (IOException ignore) { // It just for information. Fail calculate file size. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java index 60c68c13f7c..e0c8ff6c81d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java @@ -30,7 +30,6 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBac import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException; -import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.io.RecordIO; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.F; @@ -245,7 +244,7 @@ public class RecordV2Serializer implements RecordSerializer { } /** {@inheritDoc} */ - @Override public WALRecord readRecord(FileInput in, WALPointer expPtr) throws IOException, IgniteCheckedException { + @Override public WALRecord readRecord(ByteBufferBackedDataInput in, WALPointer expPtr) throws IOException, IgniteCheckedException { return RecordV1Serializer.readWithCrc(in, expPtr, recordIO); } diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 9909ea7f194..c360034206c 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -1233,8 +1233,8 @@ org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTreeRuntimeExc org.apache.ignite.internal.processors.cache.persistence.tree.CorruptedTreeException org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO$EntryPart org.apache.ignite.internal.processors.cache.persistence.tree.reuse.LongListReuseBag -org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator -org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator$StartSeekingFilter +org.apache.ignite.internal.processors.cache.persistence.wal.AbstractFileWalRecordsIterator +org.apache.ignite.internal.processors.cache.persistence.wal.AbstractFileWalRecordsIterator$StartSeekingFilter org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager$1 org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager$FileCompressorWorker$1 org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager$RecordsIterator diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java index 5326f17093f..7e0666bf2d3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteDataIntegrityTests.java @@ -26,9 +26,9 @@ import java.util.concurrent.ThreadLocalRandom; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander; +import org.apache.ignite.internal.processors.cache.persistence.wal.Crc32CheckingDataInput; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; -import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput; import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleFileInput; import org.junit.After; import org.junit.Before; @@ -209,7 +209,7 @@ public class IgniteDataIntegrityTests { fileInput.io().position(0); for (int i = 0; i < 1024 / 16; i++) { - try (FileInput.Crc32CheckingFileInput in = fileInput.startRead(false)) { + try (Crc32CheckingDataInput in = new Crc32CheckingDataInput(fileInput, false)) { in.readInt(); in.readInt(); in.readInt(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIteratorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIteratorTest.java new file mode 100644 index 00000000000..cd6fd9f477d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/ByteBufferWalIteratorTest.java @@ -0,0 +1,639 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Random; +import java.util.concurrent.locks.LockSupport; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.record.DataEntry; +import org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.pagemem.wal.record.RolloverType; +import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord; +import org.apache.ignite.internal.pagemem.wal.record.TimeStampRecord; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.testframework.wal.record.RecordUtils; +import org.apache.ignite.testframework.wal.record.UnsupportedWalRecord; +import org.jetbrains.annotations.Nullable; +import org.junit.Test; + +/** + * + */ +public class ByteBufferWalIteratorTest extends GridCommonAbstractTest { + /** Cache name. */ + private static final String CACHE_NAME = GridCommonAbstractTest.DEFAULT_CACHE_NAME; + + /** */ + private IgniteEx ig; + + /** */ + private GridCacheSharedContext<Object, Object> sharedCtx; + + /** */ + private GridCacheContext<Object, Object> cctx; + + /** */ + private RecordSerializer serializer; + + /** */ + private int idx; + + /** */ + private @Nullable IgniteInternalCache<Object, Object> cache; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + cleanPersistenceDir(); + + idx = new Random().nextInt(); + + ig = startGrid(0); + + ig.cluster().state(ClusterState.ACTIVE); + + sharedCtx = ig.context().cache().context(); + + cache = sharedCtx.cache().cache(CACHE_NAME); + + cctx = cache.context(); + + RecordSerializerFactory serializerFactory = new RecordSerializerFactoryImpl(sharedCtx); + + serializer = serializerFactory.createSerializer(RecordSerializerFactory.LATEST_SERIALIZER_VERSION); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + + cleanPersistenceDir(); + } + + /** */ + private void writeRecord(ByteBuffer byteBuf, + WALRecord walRecord) throws IgniteCheckedException { + log.info("Writing " + walRecord.type()); + + int segment = idx; + + int fileOff = byteBuf.position(); + + int size = serializer.size(walRecord); + + walRecord.size(size); + + WALPointer walPointer = new WALPointer(segment, fileOff, size); + + walRecord.position(walPointer); + + serializer.writeRecord(walRecord, byteBuf); + } + + /** */ + private static boolean dataEntriesEqual(DataEntry x, DataEntry y) { + if (x == y) + return true; + + if (x == null || y == null) + return false; + + return x.cacheId() == y.cacheId() + && x.op() == y.op() + && Objects.equals(x.key(), y.key()); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCacheConfiguration(new CacheConfiguration<>(CACHE_NAME)); + + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + ) + ); + + return cfg; + } + + /** */ + @Test + public void testDataRecordsRead() throws Exception { + ByteBuffer byteBuf = ByteBuffer.allocate(1024 * 1024).order(ByteOrder.nativeOrder()); + + final int cnt = 10; + + List<DataEntry> entries = generateEntries(cctx, cnt); + + for (int i = 0; i < entries.size(); i++) + writeRecord(byteBuf, new DataRecord(entries.get(i))); + + byteBuf.flip(); + + WALIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf, + RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer(idx, 0, 0)); + + Iterator<DataEntry> dataEntriesIter = entries.iterator(); + + while (walIter.hasNext()) { + assertTrue(dataEntriesIter.hasNext()); + + WALRecord record = walIter.next().get2(); + + assertTrue(record instanceof DataRecord); + + DataEntry dataEntry = dataEntriesIter.next(); + + assertTrue(dataEntriesEqual( + ((DataRecord)record).get(0), + dataEntry)); + } + + assertFalse(dataEntriesIter.hasNext()); + } + + /** */ + @Test + public void testWalRecordsRead() throws Exception { + ByteBuffer byteBuf = ByteBuffer.allocate(1024 * 1024).order(ByteOrder.nativeOrder()); + + List<WALRecord> records = Arrays.stream(WALRecord.RecordType.values()) + .filter(t -> t != WALRecord.RecordType.SWITCH_SEGMENT_RECORD) + .map(RecordUtils::buildWalRecord) + .filter(Objects::nonNull) + .filter(r -> !(r instanceof UnsupportedWalRecord)) + .collect(Collectors.toList()); + + for (WALRecord record : records) + writeRecord(byteBuf, record); + + byteBuf.flip(); + + WALIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf, + RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer(idx, 0, 0)); + + Iterator<WALRecord> recordsIter = records.iterator(); + + while (walIter.hasNext()) { + assertTrue(recordsIter.hasNext()); + + WALRecord actualRec = walIter.next().get2(); + + WALRecord expectedRec = recordsIter.next(); + + assertTrue("Records of type " + expectedRec.type() + " are different:\n" + + "\tExpected:\t" + expectedRec + "\n" + + "\tActual :\t" + actualRec, + recordsEqual( + expectedRec, + actualRec)); + } + + assertFalse(recordsIter.hasNext()); + } + + /** */ + private boolean recordsEqual(WALRecord x, WALRecord y) { + if (x == y) + return true; + + if (x == null || y == null) + return false; + + log.info("Comparing " + x.type() + " and " + y.type()); + + return x.type() == y.type() + && Objects.equals(x.position(), y.position()) + && x.size() == y.size() + && (!(x instanceof TimeStampRecord) || ((TimeStampRecord)x).timestamp() == ((TimeStampRecord)y).timestamp()); + } + + /** */ + @Test + public void testReadFiltered() throws Exception { + ByteBuffer byteBuf = ByteBuffer.allocate(1024 * 1024).order(ByteOrder.nativeOrder()); + + List<WALRecord> physicalRecords = Arrays.stream(WALRecord.RecordType.values()) + .filter(t -> t.purpose() == WALRecord.RecordPurpose.PHYSICAL) + .map(RecordUtils::buildWalRecord) + .filter(Objects::nonNull) + .filter(r -> !(r instanceof UnsupportedWalRecord)) + .collect(Collectors.toList()); + + final int cnt = physicalRecords.size(); + + List<DataEntry> entries = generateEntries(cctx, cnt); + + for (int i = 0; i < entries.size(); i++) { + writeRecord(byteBuf, new DataRecord(entries.get(i))); + + writeRecord(byteBuf, physicalRecords.get(i)); + } + + byteBuf.flip(); + + WALIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf, + RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer(idx, 0, 0), + (t, p) -> t.purpose() == WALRecord.RecordPurpose.LOGICAL); + + Iterator<DataEntry> dataEntriesIter = entries.iterator(); + + while (walIter.hasNext()) { + assertTrue(dataEntriesIter.hasNext()); + + WALRecord record = walIter.next().get2(); + + assertTrue(record instanceof DataRecord); + + DataEntry dataEntry = dataEntriesIter.next(); + + assertTrue(dataEntriesEqual( + ((DataRecord)record).get(0), + dataEntry)); + } + + assertFalse(dataEntriesIter.hasNext()); + } + + /** */ + private List<DataEntry> generateEntries(GridCacheContext<Object, Object> cctx, int cnt) { + List<DataEntry> entries = new ArrayList<>(cnt); + + for (int i = 0; i < cnt; i++) { + GridCacheOperation op = i % 2 == 0 ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE; + + KeyCacheObject key = cctx.toCacheKeyObject(i); + + CacheObject val = null; + + if (op != GridCacheOperation.DELETE) + val = cctx.toCacheObject("value-" + i); + + entries.add( + new DataEntry(cctx.cacheId(), key, val, op, null, cctx.cache().nextVersion(), + 0L, + cctx.affinity().partition(i), i, DataEntry.EMPTY_FLAGS)); + } + return entries; + } + + /** */ + @Test + public void testBrokenTail() throws Exception { + ByteBuffer byteBuf = ByteBuffer.allocate(1024 * 1024).order(ByteOrder.nativeOrder()); + + List<DataEntry> entries = generateEntries(cctx, 3); + + for (int i = 0; i < 2; i++) + writeRecord(byteBuf, new DataRecord(entries.get(i))); + + int position1 = byteBuf.position(); + + writeRecord(byteBuf, new DataRecord(entries.get(2))); + + int position2 = byteBuf.position(); + + byteBuf.flip(); + + byteBuf.limit((position1 + position2) >> 1); + + WALIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf, + RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer(idx, 0, 0)); + + assertTrue(walIter.hasNext()); + + walIter.next(); + + assertTrue(walIter.hasNext()); + + walIter.next(); + + try { + walIter.hasNext(); + + fail("hasNext() expected to fail"); + } + catch (IgniteException e) { + assertTrue(X.hasCause(e, IOException.class)); + } + } + + /** */ + @Test + public void testEmptyBuffer() throws Exception { + ByteBuffer byteBuf = ByteBuffer.allocate(1024 * 1024).order(ByteOrder.nativeOrder()); + + byteBuf.flip(); + + WALIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf, + RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer(idx, 0, 0)); + + assertFalse(walIter.hasNext()); + + try { + walIter.next(); + + fail("next() expected to fail"); + } + catch (NoSuchElementException ignored) { + // This is expected. + } + } + + /** */ + @Test + public void testWalSegmentReadFromDisk() throws Exception { + FileDescriptor[] archiveFiles = generateWalFiles(20, 10_000); + + for (int i = 0; i < archiveFiles.length; i++) + checkIteratorFromDisk(archiveFiles[i]); + } + + /** */ + private void checkIteratorFromDisk(FileDescriptor fd) throws IOException, IgniteCheckedException { + log.info("Checking " + fd.file()); + + ByteBuffer byteBuf = loadFile(fd); + + checkByteBuffer(byteBuf, false, true, (int)fd.idx(), 0); + } + + /** */ + private void checkByteBuffer(ByteBuffer byteBuf, boolean adaptTest, boolean hasHdr, int idx, int pos) throws IgniteCheckedException { + log.info("Bytes count " + byteBuf.limit()); + + int p0 = hasHdr ? 29 : 0; + + int shift = adaptTest ? -1 : 0; + + ByteBufferWalIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf, + RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer(idx, pos, 0)); + + Map<WALRecord.RecordType, Integer> counts = new EnumMap<>(WALRecord.RecordType.class); + + while (walIter.hasNext()) { + int p1 = byteBuf.position(); + + IgniteBiTuple<WALPointer, WALRecord> next = walIter.next(); + + if (log.isDebugEnabled()) + log.debug("Got " + next.get2().type() + " at " + next.get1()); + + if (shift >= 0) + assertEquals("WalPointer offset check failed", p0 + shift, next.get1().fileOffset()); + else + shift = next.get1().fileOffset() - p0; + + assertEquals("WalPointer length check failed", p1 - p0, next.get1().length()); + + assertEquals("WalPointers comparison failed", next.get1(), next.get2().position()); + + assertEquals("WalPointers length comparison failed", next.get1().length(), next.get2().position().length()); + + p0 = p1; + + counts.merge(next.get2().type(), 1, Integer::sum); + + assertTrue(next != null); + } + + assertFalse("ByteBuffer has some unprocessed bytes", byteBuf.hasRemaining()); + + printStats(counts); + } + + /** */ + private void printStats(Map<WALRecord.RecordType, Integer> counts) { + if (counts.isEmpty()) { + log.info("No record"); + return; + } + + ArrayList<WALRecord.RecordType> types = new ArrayList<>(counts.keySet()); + + types.sort((x, y) -> -counts.get(x).compareTo(counts.get(y))); + + int len = types.stream().map(x -> x.toString().length()).max(Integer::compare).orElse(0); + + char[] spaces = new char[len]; + + Arrays.fill(spaces, ' '); + + StringBuilder sb = new StringBuilder("Statistics:"); + + types.forEach(x -> sb.append("\n\t") + .append(x) + .append(spaces, 0, len - x.toString().length()) + .append("\t") + .append(counts.get(x))); + + log.info(sb.toString()); + } + + /** */ + private ByteBuffer loadFile(FileDescriptor fd) throws IOException { + File file = fd.file(); + + int size = (int)file.length(); + + FileInputStream fileInputStream = new FileInputStream(file); + + final byte[] bytes = new byte[size]; + + int length = fileInputStream.read(bytes); + + assertTrue(length == size); + + ByteBuffer byteBuf = ByteBuffer.wrap(bytes); + + byteBuf.order(ByteOrder.nativeOrder()); + + return byteBuf; + } + + /** */ + private FileDescriptor[] generateWalFiles(int files, int size) throws IgniteCheckedException { + Random random = new Random(); + + IgniteCacheDatabaseSharedManager sharedMgr = ig.context().cache().context().database(); + + IgniteWriteAheadLogManager walMgr = ig.context().cache().context().wal(); + + for (int fileNo = 0; fileNo < files; fileNo++) { + for (int i = 0; i < size; i++) { + switch (random.nextInt(2)) { + case 0: + cache.put(random.nextInt(100), "Cache value " + random.nextInt()); + break; + case 1: + cache.remove(random.nextInt(100)); + break; + } + } + + sharedMgr.checkpointReadLock(); + + try { + walMgr.log(new SnapshotRecord(fileNo, false), RolloverType.NEXT_SEGMENT); + } + finally { + sharedMgr.checkpointReadUnlock(); + } + } + + while (true) { + FileDescriptor[] archiveFiles = ((FileWriteAheadLogManager)walMgr).walArchiveFiles(); + + if (archiveFiles.length >= files) + return archiveFiles; + + LockSupport.parkNanos(10_000_000); + } + } + + /** */ + @Test + public void testPartialWalSegmentReadFromDisk() throws Exception { + FileDescriptor[] archiveFiles = generateWalFiles(1, 100); + + for (int i = 0; i < archiveFiles.length; i++) + checkPartialIteratorFromDisk(archiveFiles[i]); + } + + /** */ + private void checkPartialIteratorFromDisk(FileDescriptor fd) throws IOException, IgniteCheckedException { + log.info("Checking " + fd.file()); + + ByteBuffer byteBuf = loadFile(fd); + + log.info("Bytes count " + byteBuf.limit()); + + List<Integer> positions = new ArrayList<>(); + + positions.add(byteBuf.position()); + + ByteBufferWalIterator walIter = new ByteBufferWalIterator(sharedCtx, byteBuf, + RecordSerializerFactory.LATEST_SERIALIZER_VERSION, new WALPointer((int)fd.idx(), 0, 0)); + + positions.add(byteBuf.position()); + + positions.addAll( + StreamSupport.stream(walIter.spliterator(), false) + .map(x -> byteBuf.position()) + .collect(Collectors.toList())); + + Random random = new Random(); + + int size = positions.size(); + + assertTrue("Size shouild be at least 10 for this test", size >= 10); + + int n1 = (int)((0.1 + 0.4 * random.nextDouble()) * size); + + int n2 = (int)((0.5 + 0.4 * random.nextDouble()) * size); + + // With header. + checkByteBufferPart(byteBuf, positions, 0, n1, true, (int)fd.idx()); + + // Middle part. + checkByteBufferPart(byteBuf, positions, n1, n2, false, (int)fd.idx()); + + // Empty buffer. + checkByteBufferPart(byteBuf, positions, n2, n2, false, (int)fd.idx()); + + // With tail. + checkByteBufferPart(byteBuf, positions, n2, size - 1, false, (int)fd.idx()); + } + + /** */ + private void checkByteBufferPart(ByteBuffer byteBuf, List<Integer> positions, int fromRec, int toRec, + boolean hasHdr, int idx) + throws IgniteCheckedException { + int fromPos = positions.get(fromRec); + + int toPos = positions.get(toRec); + + log.info(("Checking ByteBuffer from " + fromRec + "(" + fromPos + ") to " + toRec + "(" + toPos + ")")); + + int len = toPos - fromPos; + + byteBuf.position(fromPos).limit(toPos); + + byte[] arr = byteBuf.array(); + + byteBuf = ByteBuffer.allocate(len).order(ByteOrder.nativeOrder()); + + System.arraycopy(arr, fromPos, byteBuf.array(), 0, len); + + int pos = 0; + + if (byteBuf.limit() > 12) { + byteBuf.position(9); + + pos = byteBuf.getInt(); + + byteBuf.position(0); + } + + checkByteBuffer(byteBuf, true, hasHdr, idx, pos); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java index c79381ad237..2ba0c57ebb3 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.persistence.pagemem.UsedPages import org.apache.ignite.internal.processors.cache.persistence.pagemem.UsedPagesMetricTestPersistence; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIOFreeSizeTest; import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIOTest; +import org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferWalIteratorTest; import org.apache.ignite.internal.processors.cache.persistence.wal.CpTriggeredWalDeltaConsistencyTest; import org.apache.ignite.internal.processors.cache.persistence.wal.ExplicitWalDeltaConsistencyTest; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManagerSelfTest; @@ -124,6 +125,8 @@ public class IgnitePdsTestSuite5 { GridTestUtils.addTestIfNeeded(suite, WalCompactionNotificationsTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, ByteBufferWalIteratorTest.class, ignoredTests); + return suite; } }