This is an automated email from the ASF dual-hosted git repository. timoninmaxim pushed a commit to branch IGNITE-17700__realtime_cdc in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/IGNITE-17700__realtime_cdc by this push: new 1c2cf8dd0a4 IGNITE-19622 Add realtime CDC buffer (#10778) 1c2cf8dd0a4 is described below commit 1c2cf8dd0a426e88d9b70cc7d15db71f9edd694c Author: Maksim Timonin <timoninma...@apache.org> AuthorDate: Wed Jul 12 13:47:01 2023 +0300 IGNITE-19622 Add realtime CDC buffer (#10778) --- .../org/apache/ignite/IgniteSystemProperties.java | 8 + .../configuration/DataStorageConfiguration.java | 55 +++++ .../cache/persistence/cdc/CdcBuffer.java | 164 +++++++++++++ .../cache/persistence/cdc/CdcBufferConsumer.java | 33 +++ .../cache/persistence/cdc/CdcProcessor.java | 81 +++++++ .../cache/persistence/cdc/CdcWorker.java | 93 ++++++++ .../persistence/wal/FileWriteAheadLogManager.java | 19 +- .../wal/filehandle/FileHandleManagerFactory.java | 6 + .../wal/filehandle/FileHandleManagerImpl.java | 21 +- .../wal/filehandle/FileWriteHandleImpl.java | 17 +- .../wal/filehandle/FsyncFileHandleManagerImpl.java | 11 +- .../wal/filehandle/FsyncFileWriteHandle.java | 16 +- .../cdc/RealtimeCdcBufferDedicatedWorkerTest.java | 42 ++++ .../persistence/cdc/RealtimeCdcBufferSelfTest.java | 113 +++++++++ .../persistence/cdc/RealtimeCdcBufferTest.java | 259 +++++++++++++++++++++ .../cdc/RealtimeCdcBufferWalMmapDisabledTest.java | 27 +++ .../ignite/testsuites/IgnitePdsTestSuite2.java | 10 + 17 files changed, 966 insertions(+), 9 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 5fb101aebce..eaabed6607a 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -93,6 +93,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.topolo import static org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager.DFLT_MVCC_TX_SIZE_CACHING_THRESHOLD; import 
static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DFLT_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE; import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DFLT_PDS_WAL_REBALANCE_THRESHOLD; +import static org.apache.ignite.internal.processors.cache.persistence.cdc.CdcWorker.DFLT_POLL_CDC_BUF_THROTTLING_TIMEOUT; import static org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory.DFLT_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE; import static org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMarkersStorage.DFLT_IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD; import static org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointWorkflow.DFLT_CHECKPOINT_PARALLEL_SORT_THRESHOLD; @@ -1531,6 +1532,13 @@ public final class IgniteSystemProperties { type = Long.class, defaults = "" + DFLT_UNWIND_THROTTLING_TIMEOUT) public static final String IGNITE_UNWIND_THROTTLING_TIMEOUT = "IGNITE_UNWIND_THROTTLING_TIMEOUT"; + /** + * Throttling timeout in millis for polling CDC buffer in realtime mode. Default is 100 ms. + */ + @SystemProperty(value = "Throttling timeout in millis for polling CDC buffer in realtime mode", type = Long.class, + defaults = "" + DFLT_POLL_CDC_BUF_THROTTLING_TIMEOUT) + public static final String IGNITE_THROTTLE_POLL_CDC_BUF = "IGNITE_THROTTLE_POLL_CDC_BUF"; + /** * Threshold for throttling operations logging. 
*/ diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java index 8daa97a1c7b..86ed3c4e3ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java @@ -20,6 +20,7 @@ package org.apache.ignite.configuration; import java.io.Serializable; import java.util.zip.Deflater; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.internal.processors.cache.persistence.cdc.CdcBufferConsumer; import org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; @@ -252,6 +253,14 @@ public class DataStorageConfiguration implements Serializable { @IgniteExperimental private long cdcWalDirMaxSize = DFLT_CDC_WAL_DIRECTORY_MAX_SIZE; + /** Maximum size of CDC buffer. */ + @IgniteExperimental + private long maxCdcBufSize = (long)DFLT_WAL_SEGMENTS * DFLT_WAL_SEGMENT_SIZE; + + /** CDC buffer consumer. */ + @IgniteExperimental + private transient CdcBufferConsumer cdcConsumer; + /** * Metrics enabled flag. * @deprecated Will be removed in upcoming releases. @@ -1355,6 +1364,52 @@ public class DataStorageConfiguration implements Serializable { return defragmentationThreadPoolSize; } + /** + * Gets a max size(in bytes) of CDC buffer. + * + * @return max size(in bytes) of CDC buffer. + */ + public long getMaxCdcBufferSize() { + return maxCdcBufSize; + } + + /** + * Sets a max allowed size(in bytes) of CDC buffer. Set to {@code 0} to disable realtime CDC mode. + * + * @param maxCdcBufSize max size(in bytes) of CDC buffer. + * @return {@code this} for chaining. 
+ */ + public DataStorageConfiguration setMaxCdcBufferSize(long maxCdcBufSize) { + A.ensure( + maxCdcBufSize >= 0, + "maxCdcBufferSize must be greater than 0. To disable realtime mode of CDC set value to 0."); + + this.maxCdcBufSize = maxCdcBufSize; + + return this; + } + + /** + * Gets CDC raw data consumer. + * + * @return CDC raw data consumer. + */ + public CdcBufferConsumer getCdcConsumer() { + return cdcConsumer; + } + + /** + * Sets CDC raw data consumer. + * + * @param cdcConsumer CDC raw data consumer. + * @return {@code this} for chaining. + */ + public DataStorageConfiguration setCdcConsumer(CdcBufferConsumer cdcConsumer) { + this.cdcConsumer = cdcConsumer; + + return this; + } + /** * Gets a min allowed size(in bytes) of WAL archives. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java new file mode 100644 index 00000000000..1b105bac5db --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.cdc; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicLong; +import org.jetbrains.annotations.Nullable; + +/** + * Buffer that stores WAL records before {@link CdcWorker} consumes it. Buffer is a single-producer single-consumer bounded queue. + * <p> + * TODO: Optimize the queue: + * 1. by space using LinkedArrayQueue. Example: http://psy-lob-saw.blogspot.com/2016/10/linked-array-queues-part-1-spsc.html. + * It helps to avoid using AtomicLong for #size. + * 2. by performance using AtomicReference#lazySet or similar for LinkedNode#next. + */ +public class CdcBuffer { + /** Maximum size of the underlying buffer, bytes. */ + private final long maxSize; + + /** Reference to last consumed linked node. */ + private LinkedNode consumerNode; + + /** Reference to last produced linked node. */ + private volatile LinkedNode producerNode; + + /** Current size of the buffer, bytes. */ + private final AtomicLong size = new AtomicLong(); + + /** If {@code true} then buffer has overflowed. */ + private volatile boolean overflowed; + + /** */ + public CdcBuffer(long maxSize) { + assert maxSize > 0 : maxSize; + + this.maxSize = maxSize; + + producerNode = consumerNode = new LinkedNode(null); + } + + /** + * Offers data for the queue. Performs by the single producer thread. + * + * @param data Data to store in the buffer. + */ + public boolean offer(ByteBuffer data) { + int bufSize = data.limit() - data.position(); + + if (size.addAndGet(bufSize) > maxSize) { + overflowed = true; + + return false; + } + + byte[] cp = new byte[bufSize]; + + data.get(cp, 0, bufSize); + + LinkedNode newNode = new LinkedNode(ByteBuffer.wrap(cp)); + LinkedNode oldNode = producerNode; + + producerNode = newNode; + oldNode.next(newNode); + + return true; + } + + /** + * Polls data from the queue. Performs by single consumer thread. + * + * @return Polled data, or {@code null} if no data is available now. 
+ */ + public ByteBuffer poll() { + LinkedNode prev = consumerNode; + + LinkedNode next = prev.next; + + if (next != null) + return poll(prev, next); + else if (prev != producerNode) { + while ((next = prev.next) == null) { + // No-op. New reference should appear soon. + } + + return poll(prev, next); + } + + return null; + } + + /** @return Current buffer size. */ + public long size() { + return size.get(); + } + + /** Cleans the buffer if overflowed. Performs by the consumer thread. */ + public void cleanIfOverflowed() { + if (!overflowed || consumerNode == null) + return; + + ByteBuffer data; + + do { + data = poll(); + } + while (data != null); + + consumerNode = null; + producerNode = null; + + size.set(0); + } + + /** + * @param prev Previously consumed linked node. + * @param next Node to consume. + * @return Data to consume. + */ + private ByteBuffer poll(LinkedNode prev, LinkedNode next) { + ByteBuffer data = next.data; + + prev.next = null; + consumerNode = next; + + size.addAndGet(-(data.limit() - data.position())); + + return data; + } + + /** */ + private static class LinkedNode { + /** */ + private volatile @Nullable LinkedNode next; + + /** */ + private final ByteBuffer data; + + /** */ + LinkedNode(ByteBuffer data) { + this.data = data; + } + + /** */ + void next(LinkedNode next) { + this.next = next; + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java new file mode 100644 index 00000000000..67cd422fb0f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.cdc; + +import java.nio.ByteBuffer; + +/** Mock for Realtime CDC buffer consumer. */ +public interface CdcBufferConsumer { + /** + * Consumes raw WAL data. + * + * @param data Raw data to consume. + */ + public void consume(ByteBuffer data); + + /** */ + public void close(); +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java new file mode 100644 index 00000000000..92adcf2acaf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.cdc; + +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** CDC processor responsible for collecting data changes in realtime within Ignite node. */ +public class CdcProcessor { + /** Buffer to store collected data. */ + private final CdcBuffer cdcBuf; + + /** CDC worker. */ + private final CdcWorker worker; + + /** Ignite log. */ + private final IgniteLogger log; + + /** Whether CDC is enabled. Disables after {@link #cdcBuf} overflows. */ + private boolean enabled = true; + + /** */ + public CdcProcessor(GridCacheSharedContext<?, ?> cctx, IgniteLogger log) { + this.log = log; + + DataStorageConfiguration dsCfg = cctx.gridConfig().getDataStorageConfiguration(); + + cdcBuf = new CdcBuffer(dsCfg.getMaxCdcBufferSize()); + worker = new CdcWorker(cctx, log, cdcBuf, dsCfg.getCdcConsumer()); + } + + /** + * @param dataBuf Buffer that contains data to collect. + */ + public void collect(ByteBuffer dataBuf) { + if (!enabled) + return; + + if (log.isDebugEnabled()) + log.debug("Offerring a data bucket to the CDC buffer [len=" + (dataBuf.limit() - dataBuf.position()) + ']'); + + if (!cdcBuf.offer(dataBuf)) { + enabled = false; + + log.warning("CDC buffer has overflowed. 
Stop realtime mode of CDC."); + + worker.cancel(); + } + } + + /** Start CDC worker. */ + public void start() { + worker.restart(); + } + + /** Shutdown CDC worker. */ + public void shutdown() throws IgniteInterruptedCheckedException { + worker.cancel(); + + U.join(worker); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java new file mode 100644 index 00000000000..27655413c29 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.cdc; + +import java.nio.ByteBuffer; +import java.util.concurrent.locks.LockSupport; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.thread.IgniteThread; + +/** */ +public class CdcWorker extends GridWorker { + /** Default throttling timeout in millis for polling CDC buffer. */ + public static final int DFLT_POLL_CDC_BUF_THROTTLING_TIMEOUT = 100; + + /** Throttling timeout in millis for polling CDC buffer. */ + private final long cdcBufPollTimeout = Long.getLong( + IgniteSystemProperties.IGNITE_THROTTLE_POLL_CDC_BUF, DFLT_POLL_CDC_BUF_THROTTLING_TIMEOUT); + + /** CDC buffer. */ + private final CdcBuffer cdcBuf; + + /** CDC consumer. */ + private final CdcBufferConsumer consumer; + + /** */ + public CdcWorker( + GridCacheSharedContext<?, ?> cctx, + IgniteLogger log, + CdcBuffer cdcBuf, + CdcBufferConsumer consumer + ) { + super(cctx.igniteInstanceName(), + "cdc-worker%" + cctx.igniteInstanceName(), + log, + cctx.kernalContext().workersRegistry()); + + this.cdcBuf = cdcBuf; + this.consumer = consumer; + } + + /** */ + @Override public void body() { + while (!isCancelled()) { + updateHeartbeat(); + + ByteBuffer data = cdcBuf.poll(); + + if (data == null) { + LockSupport.parkNanos(cdcBufPollTimeout * 1_000_000); // millis to nanos. + + continue; + } + + if (log.isDebugEnabled()) + log.debug("Poll a data bucket from CDC buffer [len=" + (data.limit() - data.position()) + ']'); + + // TODO: Consumer must not block this system thread. 
+ consumer.consume(data); + } + + consumer.close(); + } + + /** {@inheritDoc} */ + @Override protected void cleanup() { + cdcBuf.cleanIfOverflowed(); + } + + /** */ + public void restart() { + isCancelled.set(false); + + new IgniteThread(this).start(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index f68b366fbb6..5470e2379cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -87,6 +87,7 @@ import org.apache.ignite.internal.processors.cache.persistence.DataRegion; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.StorageException; +import org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; @@ -313,6 +314,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private static final AtomicReferenceFieldUpdater<FileWriteAheadLogManager, FileWriteHandle> CURR_HND_UPD = AtomicReferenceFieldUpdater.newUpdater(FileWriteAheadLogManager.class, FileWriteHandle.class, "currHnd"); + /** CDC processor, {@code null} if CDC is disabled. */ + @Nullable private CdcProcessor cdcProc; + /** * File archiver moves segments from work directory to archive. 
Locked segments may be kept not moved until release. * For mode archive and work folders set to equal value, archiver is not created. @@ -521,6 +525,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl dispatcher.registerProperty(cdcDisabled); }); + + if (dsCfg.getMaxCdcBufferSize() > 0) + cdcProc = new CdcProcessor(cctx, log); } serializer = new RecordSerializerFactoryImpl(cctx).createSerializer(serializerVer); @@ -581,7 +588,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl segmentRouter = new SegmentRouter(walWorkDir, walArchiveDir, segmentAware, dsCfg); fileHandleManager = fileHandleManagerFactory.build( - cctx, metrics, mmap, serializer, this::currentHandle + cctx, metrics, mmap, serializer, cdcProc, this::currentHandle ); lockedSegmentFileInputFactory = new LockedSegmentFileInputFactory( @@ -753,6 +760,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (cleaner != null) cleaner.shutdown(); + + // TODO: stop cdcProcessor after checkpoint (reason=node stop) finished. + if (cdcProc != null) + cdcProc.shutdown(); } catch (IgniteInterruptedCheckedException e) { U.error(log, "Failed to gracefully shutdown WAL components, thread was interrupted.", e); @@ -764,7 +775,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (log.isDebugEnabled()) log.debug("Activated file write ahead log manager [nodeId=" + cctx.localNodeId() + " topVer=" + cctx.discovery().topologyVersionEx() + " ]"); - //NOOP implementation, we need to override it. + + // TODO: Invoke here to avoid double initialization. CdcConsumer must consume data only after full start. 
+ // Will fix in https://issues.apache.org/jira/browse/IGNITE-19637 + if (cdcProc != null) + cdcProc.start(); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java index 8850c2003b6..619ff63f2d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java @@ -23,7 +23,9 @@ import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; +import org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; +import org.jetbrains.annotations.Nullable; /** * Factory of {@link FileHandleManager}. @@ -48,6 +50,7 @@ public class FileHandleManagerFactory { * @param metrics Data storage metrics. * @param mmap Using mmap. * @param serializer Serializer. + * @param cdcProc CDC processor. * @param currHandleSupplier Supplier of current handle. * @return One of implementation of {@link FileHandleManager}. 
*/ @@ -56,6 +59,7 @@ public class FileHandleManagerFactory { DataStorageMetricsImpl metrics, boolean mmap, RecordSerializer serializer, + @Nullable CdcProcessor cdcProc, Supplier<FileWriteHandle> currHandleSupplier ) { if (dsConf.getWalMode() == WALMode.FSYNC && !walFsyncWithDedicatedWorker) @@ -63,6 +67,7 @@ public class FileHandleManagerFactory { cctx, metrics, serializer, + cdcProc, currHandleSupplier, dsConf.getWalMode(), dsConf.getWalSegmentSize(), @@ -75,6 +80,7 @@ public class FileHandleManagerFactory { metrics, mmap, serializer, + cdcProc, currHandleSupplier, dsConf.getWalMode(), dsConf.getWalBufferSize(), diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java index 6b08531a233..e0a779fa61a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.StorageException; +import org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor; import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO; @@ -41,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.Re import 
org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.thread.IgniteThread; +import org.jetbrains.annotations.Nullable; import static java.lang.Long.MAX_VALUE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SEGMENT_SYNC_TIMEOUT; @@ -63,6 +65,9 @@ public class FileHandleManagerImpl implements FileHandleManager { /** Wal segment sync worker. */ private final WalSegmentSyncer walSegmentSyncWorker; + /** CDC processor, {@code null} if CDC is disabled. */ + private final @Nullable CdcProcessor cdcProc; + /** Context. */ protected final GridCacheSharedContext cctx; @@ -109,6 +114,7 @@ public class FileHandleManagerImpl implements FileHandleManager { DataStorageMetricsImpl metrics, boolean mmap, RecordSerializer serializer, + @Nullable CdcProcessor cdcProc, Supplier<FileWriteHandle> currentHandleSupplier, WALMode mode, int walBufferSize, @@ -126,6 +132,7 @@ public class FileHandleManagerImpl implements FileHandleManager { this.maxWalSegmentSize = maxWalSegmentSize; this.fsyncDelay = fsyncDelay; walWriter = new WALWriter(log); + this.cdcProc = cdcProc; if (mode != WALMode.NONE && mode != WALMode.FSYNC) { walSegmentSyncWorker = new WalSegmentSyncer( @@ -163,7 +170,7 @@ public class FileHandleManagerImpl implements FileHandleManager { rbuf.init(position); return new FileWriteHandleImpl( - cctx, fileIO, rbuf, serializer, metrics, walWriter, position, + cctx, fileIO, rbuf, serializer, metrics, walWriter, cdcProc, position, mode, mmap, true, fsyncDelay, maxWalSegmentSize ); } @@ -181,7 +188,7 @@ public class FileHandleManagerImpl implements FileHandleManager { rbuf = currentHandle().buf.reset(); return new FileWriteHandleImpl( - cctx, fileIO, rbuf, serializer, metrics, walWriter, 0, + cctx, fileIO, rbuf, serializer, metrics, walWriter, cdcProc, 0, mode, mmap, false, fsyncDelay, maxWalSegmentSize ); } @@ -376,7 +383,17 @@ public class FileHandleManagerImpl implements 
FileHandleManager { updateHeartbeat(); try { + int bufPos = seg.buffer().position(); + writeBuffer(seg.position(), seg.buffer()); + + if (cdcProc != null) { + ByteBuffer cdcBuf = seg.buffer().duplicate(); + cdcBuf.position(bufPos); + cdcBuf.limit(seg.buffer().limit()); + + cdcProc.collect(cdcBuf); + } } catch (Throwable e) { log.error("Exception in WAL writer thread:", e); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java index 5dc2ec155f6..a1fe746eeb6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.StorageException; +import org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; @@ -114,6 +115,9 @@ class FileWriteHandleImpl extends AbstractFileHandle implements FileWriteHandle /** Buffer. */ protected final SegmentedRingByteBuffer buf; + /** Cdc Buffer, {@code null} if CDC is disabled. 
*/ + private final @Nullable CdcProcessor cdcProc; + /** */ private final WALMode mode; @@ -157,8 +161,8 @@ class FileWriteHandleImpl extends AbstractFileHandle implements FileWriteHandle */ FileWriteHandleImpl( GridCacheSharedContext cctx, SegmentIO fileIO, SegmentedRingByteBuffer rbuf, RecordSerializer serializer, - DataStorageMetricsImpl metrics, FileHandleManagerImpl.WALWriter writer, long pos, WALMode mode, boolean mmap, - boolean resume, long fsyncDelay, long maxWalSegmentSize) throws IOException { + DataStorageMetricsImpl metrics, FileHandleManagerImpl.WALWriter writer, CdcProcessor cdcProc, + long pos, WALMode mode, boolean mmap, boolean resume, long fsyncDelay, long maxWalSegmentSize) throws IOException { super(fileIO); assert serializer != null; @@ -170,6 +174,7 @@ class FileWriteHandleImpl extends AbstractFileHandle implements FileWriteHandle this.log = cctx.logger(FileWriteHandleImpl.class); this.cctx = cctx; this.walWriter = writer; + this.cdcProc = cdcProc; this.serializer = serializer; this.written = pos; this.lastFsyncPos = pos; @@ -413,6 +418,14 @@ class FileWriteHandleImpl extends AbstractFileHandle implements FileWriteHandle fsync((MappedByteBuffer)buf.buf, off, len); + if (cdcProc != null) { + ByteBuffer cdcBuf = buf.buf.duplicate(); + cdcBuf.position(off); + cdcBuf.limit(off + len); + + cdcProc.collect(cdcBuf); + } + seg.release(); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java index af3957c128b..8b398b3fc54 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java @@ -26,9 +26,11 @@ import 
org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.StorageException; +import org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; +import org.jetbrains.annotations.Nullable; /** * Implementation of {@link FileWriteHandle} for FSYNC mode. @@ -49,6 +51,9 @@ public class FsyncFileHandleManagerImpl implements FileHandleManager { /** */ protected final RecordSerializer serializer; + /** CDC processor handed to every write handle this manager creates, or {@code null} if none was supplied. */ + private final @Nullable CdcProcessor cdcProc; + /** Current handle supplier. */ private final Supplier<FileWriteHandle> currentHandleSupplier; @@ -75,6 +80,7 @@ public class FsyncFileHandleManagerImpl implements FileHandleManager { GridCacheSharedContext cctx, DataStorageMetricsImpl metrics, RecordSerializer serializer, + @Nullable CdcProcessor cdcProc, Supplier<FileWriteHandle> handle, WALMode mode, long maxWalSegmentSize, @@ -86,6 +92,7 @@ public class FsyncFileHandleManagerImpl implements FileHandleManager { this.mode = mode; this.metrics = metrics; this.serializer = serializer; + this.cdcProc = cdcProc; currentHandleSupplier = handle; this.maxWalSegmentSize = maxWalSegmentSize; this.fsyncDelay = fsyncDelay; @@ -96,7 +103,7 @@ public class FsyncFileHandleManagerImpl implements FileHandleManager { @Override public FileWriteHandle initHandle(SegmentIO fileIO, long position, RecordSerializer serializer) throws IOException { return new FsyncFileWriteHandle( - cctx, fileIO, metrics, serializer, position, + cctx, fileIO, metrics, serializer, cdcProc, position, mode, maxWalSegmentSize, 
tlbSize, fsyncDelay ); } @@ -105,7 +112,7 @@ public class FsyncFileHandleManagerImpl implements FileHandleManager { @Override public FileWriteHandle nextHandle(SegmentIO fileIO, RecordSerializer serializer) throws IOException { return new FsyncFileWriteHandle( - cctx, fileIO, metrics, serializer, 0, + cctx, fileIO, metrics, serializer, cdcProc, 0, mode, maxWalSegmentSize, tlbSize, fsyncDelay ); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java index ca06af53b8b..59178e497ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; import org.apache.ignite.internal.processors.cache.persistence.StorageException; +import org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO; @@ -114,6 +115,9 @@ class FsyncFileWriteHandle extends AbstractFileHandle implements FileWriteHandle /** Persistence metrics tracker. */ private final DataStorageMetricsImpl metrics; + /** CDC processor that is handed a duplicate view of each written buffer region; collection is skipped when it is {@code null}. */ + private final @Nullable CdcProcessor cdcProc; + /** Logger. 
*/ protected final IgniteLogger log; @@ -152,7 +156,7 @@ class FsyncFileWriteHandle extends AbstractFileHandle implements FileWriteHandle */ FsyncFileWriteHandle( GridCacheSharedContext cctx, SegmentIO fileIO, - DataStorageMetricsImpl metrics, RecordSerializer serializer, long pos, + DataStorageMetricsImpl metrics, RecordSerializer serializer, CdcProcessor cdcProc, long pos, WALMode mode, long maxSegmentSize, int size, long fsyncDelay) throws IOException { super(fileIO); assert serializer != null; @@ -165,6 +169,7 @@ class FsyncFileWriteHandle extends AbstractFileHandle implements FileWriteHandle this.fsyncDelay = fsyncDelay; this.maxSegmentSize = maxSegmentSize; this.serializer = serializer; + this.cdcProc = cdcProc; this.written = pos; this.lastFsyncPos = pos; @@ -780,6 +785,7 @@ class FsyncFileWriteHandle extends AbstractFileHandle implements FileWriteHandle // Do the write. int size = buf.remaining(); + int bufPos = buf.position(); assert size > 0 : size; @@ -799,6 +805,14 @@ class FsyncFileWriteHandle extends AbstractFileHandle implements FileWriteHandle throw se; } + + if (cdcProc != null) { + ByteBuffer cdcBuf = buf.duplicate(); + cdcBuf.position(bufPos); + cdcBuf.limit(buf.limit()); + + cdcProc.collect(cdcBuf); + } } finally { writeComplete.signalAll(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferDedicatedWorkerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferDedicatedWorkerTest.java new file mode 100644 index 00000000000..3f48c8f6145 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferDedicatedWorkerTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.cdc; + +import java.util.ArrayList; +import java.util.List; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.testframework.junits.WithSystemProperty; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_FSYNC_WITH_DEDICATED_WORKER; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP; + +/** Runs {@link RealtimeCdcBufferTest} in FSYNC mode only, with the dedicated WAL fsync worker property set to "true" and the WAL mmap property set to "false". */ +@WithSystemProperty(key = IGNITE_WAL_FSYNC_WITH_DEDICATED_WORKER, value = "true") +@WithSystemProperty(key = IGNITE_WAL_MMAP, value = "false") +public class RealtimeCdcBufferDedicatedWorkerTest extends RealtimeCdcBufferTest { + /** Override params to test only FSYNC mode. 
*/ + @Parameterized.Parameters(name = "walMode={0}") + public static List<Object[]> params() { + List<Object[]> param = new ArrayList<>(); + + param.add(new Object[] {WALMode.FSYNC}); + + return param; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferSelfTest.java new file mode 100644 index 00000000000..a16a256da4b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferSelfTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.cdc; + +import java.nio.ByteBuffer; +import java.util.concurrent.locks.LockSupport; +import org.junit.Test; + +/** Tests of {@link CdcBuffer} {@code offer}, {@code poll} and {@code size} that do not start a node. */ +public class RealtimeCdcBufferSelfTest { + /** Checks that ten one-byte offers to a {@code CdcBuffer(10)} succeed, growing its size by one each time, and that the eleventh offer is rejected. */ + @Test + public void testDisableBuffer() { + CdcBuffer buf = new CdcBuffer(10); + + // Fill the buffer. 
+ for (int i = 0; i < 10; i++) { + boolean res = buf.offer(build()); + + assert res; + assert buf.size() == i + 1; + } + + boolean res = buf.offer(build()); + + assert !res; + } + + /** Checks that polling a buffer nothing was offered to returns {@code null} and leaves its size at zero. */ + @Test + public void testConsumeEmptyBuffer() { + CdcBuffer buf = new CdcBuffer(10); + + for (int i = 0; i < 10; i++) { + ByteBuffer data = buf.poll(); + + assert data == null; + assert buf.size() == 0; + } + } + + /** Checks that ten offered values are polled back equal to what was offered, with the size shrinking by one per poll, and that the drained buffer accepts an offer again. */ + @Test + public void testConsumeFullBuffer() { + CdcBuffer buf = new CdcBuffer(10); + + for (int i = 0; i < 10; i++) + buf.offer(build()); + + for (int i = 0; i < 10; i++) { + ByteBuffer data = buf.poll(); + + assert build().equals(data); + assert buf.size() == 10 - (i + 1); + } + + boolean res = buf.offer(build()); + + assert res; + assert buf.size() == 1; + } + + /** Runs one thread that offers 1,000,000 values and one thread that polls concurrently on the same buffer, then waits for both threads to finish. */ + @Test + public void testConcurrentFillBuffer() throws Exception { + int size = 1_000_000; + + CdcBuffer buf = new CdcBuffer(size); + + Thread th1 = new Thread(() -> { + for (int i = 0; i < size; i++) { + buf.offer(build()); + + LockSupport.parkNanos(1_000); + } + }); + + Thread th2 = new Thread(() -> { + // Goal is to invoke `poll` more frequently than `offer`, to check how `poll` behaves while it waits for an offered value. 
+ for (int i = 0; i < size * 1_000; i++) + buf.poll(); + }); + + th1.start(); + th2.start(); + + th1.join(); + th2.join(); + } + + /** @return One-byte buffer wrapping the value {@code 1}. */ + private ByteBuffer build() { + byte[] data = new byte[1]; + data[0] = 1; + + return ByteBuffer.wrap(data); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java new file mode 100644 index 00000000000..9b9ca8e6fc0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.cdc; + +import java.io.File; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.failure.StopNodeFailureHandler; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.LogListener; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR; +import static org.junit.Assume.assumeFalse; + +/** Checks the realtime CDC buffer on a single persistent node, parameterized by WAL mode. */ +@RunWith(Parameterized.class) +public class RealtimeCdcBufferTest extends GridCommonAbstractTest { + /** Consistent ID of the test node; also the last component of the WAL directory path read by the content test. */ + private static final String CONSISTENT_ID = "ID"; + + /** Logger on which the overflow message listener is registered. */ + private static ListeningTestLogger lsnrLog; + + /** Latch counted down when the CDC consumer is closed; {@code null} if the test does not wait for it. */ + private static CountDownLatch stopLatch; + + /** CDC consumer set in the data storage configuration. */ + private ByteBufferCdcConsumer consumer; + + /** Whether CDC is enabled for the default data region. */ + private boolean cdcEnabled; + + /** Value passed to {@code setMaxCdcBufferSize} of the data storage configuration. */ + private int maxCdcBufSize; + + /** WAL mode the test runs with. */ + 
@Parameterized.Parameter() + public WALMode walMode; + + /** @return WAL modes to run the tests with: LOG_ONLY, BACKGROUND and FSYNC. */ + @Parameterized.Parameters(name = "walMode={0}") + public static List<Object[]> params() { + return F.asList( + new Object[] { WALMode.LOG_ONLY }, + new Object[] { WALMode.BACKGROUND }, + new Object[] { WALMode.FSYNC } + ); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String instanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(instanceName); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setMaxCdcBufferSize(maxCdcBufSize) + .setCdcConsumer(consumer) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setCdcEnabled(cdcEnabled) + .setPersistenceEnabled(true) + ) + .setWalMode(walMode) + ); + + cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + cfg.setFailureHandler(new StopNodeFailureHandler()); + + cfg.setGridLogger(lsnrLog); + + cfg.setConsistentId(CONSISTENT_ID); + + return cfg; + } + + /** Recreates the listening logger, cleans the persistence directory, enables CDC, creates a fresh consumer and clears the stop latch. */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + lsnrLog = new ListeningTestLogger(log); + + cleanPersistenceDir(); + + cdcEnabled = true; + + consumer = new ByteBufferCdcConsumer(10 * (int)U.MB); + + stopLatch = null; + } + + /** Checks that the overflow message is logged once when 100 entries of {@code 10 * U.KB} bytes are put while {@code maxCdcBufSize} is {@code U.KB}. */ + @Test + public void testCdcBufferOverflow() throws Exception { + maxCdcBufSize = (int)U.KB; + + checkCdcBufferOverflow(10 * (int)U.KB, 100, true); + } + + /** Checks that the overflow message is not logged when CDC is disabled for the default data region. */ + @Test + public void testCdcDisabled() throws Exception { + cdcEnabled = false; + + checkCdcBufferOverflow(10 * (int)U.KB, 100, false); + } + + /** Compares, in 100-byte steps, the bytes accumulated by the consumer with the bytes of the first WAL segment file, up to the end of the last WAL record read. Skipped in FSYNC mode. */ + @Test + public void testCdcBufferContent() throws Exception { + // TODO: Looks like there is a bug in the FSYNC case: WAL misses some records. 
+ assumeFalse(walMode == WALMode.FSYNC); + + maxCdcBufSize = 10 * (int)U.MB; + + stopLatch = new CountDownLatch(1); + + checkCdcBufferOverflow((int)U.KB, 100, false); + + U.awaitQuiet(stopLatch); + + File walSegments = U.resolveWorkDirectory( + U.defaultWorkDirectory(), + DFLT_STORE_DIR + "/wal/" + CONSISTENT_ID, false); + + WALIterator it = walIter(walSegments); + + while (it.hasNext()) + it.next(); + + WALPointer ptr = it.lastRead().get(); + int length = ptr.fileOffset() + ptr.length(); + + File seg = Arrays.stream(walSegments.listFiles()).sorted().findFirst().get(); + + byte[] walSegData = Files.readAllBytes(seg.toPath()); + + int step = 100; + + for (int off = 0; off < length; off += step) { + int l = off + step < length ? step : length - off; + + byte[] testWalData = new byte[l]; + byte[] testCdcData = new byte[l]; + + ByteBuffer buf = ByteBuffer.wrap(walSegData); + buf.position(off); + buf.get(testWalData, 0, l); + + buf = ByteBuffer.wrap(consumer.buf.array()); + buf.position(off); + buf.get(testCdcData, 0, l); + + assertTrue( + "Offset " + off + "/" + length + "\n" + + "EXPECT " + Arrays.toString(testWalData) + "\n" + + "ACTUAL " + Arrays.toString(testCdcData), + Arrays.equals(testWalData, testCdcData)); + } + } + + /** Starts a node, puts {@code entryCnt} byte arrays of {@code entrySize} bytes into the default cache, forces a checkpoint, stops the node and checks the overflow message listener, which expects one match if {@code shouldOverflow} and none otherwise. */ + private void checkCdcBufferOverflow(int entrySize, int entryCnt, boolean shouldOverflow) throws Exception { + LogListener lsnr = LogListener.matches("CDC buffer has overflowed. Stop realtime mode of CDC.") + .times(shouldOverflow ? 1 : 0) + .build(); + + lsnrLog.registerListener(lsnr); + + IgniteEx crd = startGrid(0); + + crd.cluster().state(ClusterState.ACTIVE); + + IgniteCache<Integer, byte[]> cache = crd.cache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < entryCnt; i++) { + byte[] data = new byte[entrySize]; + + Arrays.fill(data, (byte)1); + + cache.put(i, data); + } + + forceCheckpoint(crd); + + stopGrid(0); + + assertTrue(lsnr.check()); + } + + /** Get iterator over WAL. 
*/ + private WALIterator walIter(File walSegments) throws Exception { + IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log); + + IgniteWalIteratorFactory.IteratorParametersBuilder params = new IgniteWalIteratorFactory.IteratorParametersBuilder() + .filesOrDirs(walSegments); + + return factory.iterator(params); + } + + /** Test consumer that appends all consumed data to a heap {@link ByteBuffer}. */ + private static class ByteBufferCdcConsumer implements CdcBufferConsumer { + /** Buffer accumulating everything passed to {@code consume}. */ + private final ByteBuffer buf; + + /** @param maxCdcBufSize Capacity of the accumulating buffer. */ + ByteBufferCdcConsumer(int maxCdcBufSize) { + buf = ByteBuffer.allocate(maxCdcBufSize); + + Arrays.fill(buf.array(), (byte)0); + + buf.position(0); + } + + /** Appends the remaining bytes of {@code data} to the accumulating buffer. */ + @Override public void consume(ByteBuffer data) { + buf.put(data); + } + + /** Counts down the stop latch if the test has set one. */ + @Override public void close() { + if (stopLatch != null) + stopLatch.countDown(); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferWalMmapDisabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferWalMmapDisabledTest.java new file mode 100644 index 00000000000..01247fe964d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferWalMmapDisabledTest.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.cdc; + +import org.apache.ignite.testframework.junits.WithSystemProperty; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP; + +/** Runs every {@link RealtimeCdcBufferTest} case with the WAL mmap system property set to "false". */ +@WithSystemProperty(key = IGNITE_WAL_MMAP, value = "false") +public class RealtimeCdcBufferWalMmapDisabledTest extends RealtimeCdcBufferTest { +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index 8b39f3e54ed..746a336da27 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -39,6 +39,10 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentS import org.apache.ignite.internal.processors.cache.persistence.IgniteRebalanceScheduleResendPartitionsTest; import org.apache.ignite.internal.processors.cache.persistence.WALPreloadingWithCompactionTest; import org.apache.ignite.internal.processors.cache.persistence.WalPreloadingConcurrentTest; +import org.apache.ignite.internal.processors.cache.persistence.cdc.RealtimeCdcBufferDedicatedWorkerTest; +import org.apache.ignite.internal.processors.cache.persistence.cdc.RealtimeCdcBufferSelfTest; +import org.apache.ignite.internal.processors.cache.persistence.cdc.RealtimeCdcBufferTest; +import org.apache.ignite.internal.processors.cache.persistence.cdc.RealtimeCdcBufferWalMmapDisabledTest; import 
org.apache.ignite.internal.processors.cache.persistence.db.FullHistRebalanceOnClientStopTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebalancingOnNotStableTopologyTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsTest; @@ -159,6 +163,12 @@ public class IgnitePdsTestSuite2 { GridTestUtils.addTestIfNeeded(suite, CdcCacheConfigOnRestartTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CdcNonDefaultWorkDirTest.class, ignoredTests); + // Realtime CDC tests. + GridTestUtils.addTestIfNeeded(suite, RealtimeCdcBufferSelfTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, RealtimeCdcBufferTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, RealtimeCdcBufferDedicatedWorkerTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, RealtimeCdcBufferWalMmapDisabledTest.class, ignoredTests); + // new style folders with generated consistent ID test GridTestUtils.addTestIfNeeded(suite, IgniteUidAsConsistentIdMigrationTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgniteWalSerializerVersionTest.class, ignoredTests);