This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch IGNITE-17700__realtime_cdc
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/IGNITE-17700__realtime_cdc by 
this push:
     new 1c2cf8dd0a4 IGNITE-19622 Add realtime CDC buffer (#10778)
1c2cf8dd0a4 is described below

commit 1c2cf8dd0a426e88d9b70cc7d15db71f9edd694c
Author: Maksim Timonin <timoninma...@apache.org>
AuthorDate: Wed Jul 12 13:47:01 2023 +0300

    IGNITE-19622 Add realtime CDC buffer (#10778)
---
 .../org/apache/ignite/IgniteSystemProperties.java  |   8 +
 .../configuration/DataStorageConfiguration.java    |  55 +++++
 .../cache/persistence/cdc/CdcBuffer.java           | 164 +++++++++++++
 .../cache/persistence/cdc/CdcBufferConsumer.java   |  33 +++
 .../cache/persistence/cdc/CdcProcessor.java        |  81 +++++++
 .../cache/persistence/cdc/CdcWorker.java           |  93 ++++++++
 .../persistence/wal/FileWriteAheadLogManager.java  |  19 +-
 .../wal/filehandle/FileHandleManagerFactory.java   |   6 +
 .../wal/filehandle/FileHandleManagerImpl.java      |  21 +-
 .../wal/filehandle/FileWriteHandleImpl.java        |  17 +-
 .../wal/filehandle/FsyncFileHandleManagerImpl.java |  11 +-
 .../wal/filehandle/FsyncFileWriteHandle.java       |  16 +-
 .../cdc/RealtimeCdcBufferDedicatedWorkerTest.java  |  42 ++++
 .../persistence/cdc/RealtimeCdcBufferSelfTest.java | 113 +++++++++
 .../persistence/cdc/RealtimeCdcBufferTest.java     | 259 +++++++++++++++++++++
 .../cdc/RealtimeCdcBufferWalMmapDisabledTest.java  |  27 +++
 .../ignite/testsuites/IgnitePdsTestSuite2.java     |  10 +
 17 files changed, 966 insertions(+), 9 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 5fb101aebce..eaabed6607a 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -93,6 +93,7 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topolo
 import static 
org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager.DFLT_MVCC_TX_SIZE_CACHING_THRESHOLD;
 import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DFLT_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE;
 import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DFLT_PDS_WAL_REBALANCE_THRESHOLD;
+import static 
org.apache.ignite.internal.processors.cache.persistence.cdc.CdcWorker.DFLT_POLL_CDC_BUF_THROTTLING_TIMEOUT;
 import static 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory.DFLT_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE;
 import static 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMarkersStorage.DFLT_IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD;
 import static 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointWorkflow.DFLT_CHECKPOINT_PARALLEL_SORT_THRESHOLD;
@@ -1531,6 +1532,13 @@ public final class IgniteSystemProperties {
         type = Long.class, defaults = "" + DFLT_UNWIND_THROTTLING_TIMEOUT)
     public static final String IGNITE_UNWIND_THROTTLING_TIMEOUT = 
"IGNITE_UNWIND_THROTTLING_TIMEOUT";
 
+    /**
+     * Throttling timeout in millis for polling CDC buffer in realtime mode. 
Default is 100 ms.
+     */
+    @SystemProperty(value = "Throttling timeout in millis for polling CDC 
buffer in realtime mode", type = Long.class,
+        defaults = "" + DFLT_POLL_CDC_BUF_THROTTLING_TIMEOUT)
+    public static final String IGNITE_THROTTLE_POLL_CDC_BUF = 
"IGNITE_THROTTLE_POLL_CDC_BUF";
+
     /**
      * Threshold for throttling operations logging.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
index 8daa97a1c7b..86ed3c4e3ea 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.ignite.configuration;
 import java.io.Serializable;
 import java.util.zip.Deflater;
 import org.apache.ignite.IgniteSystemProperties;
+import 
org.apache.ignite.internal.processors.cache.persistence.cdc.CdcBufferConsumer;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
@@ -252,6 +253,14 @@ public class DataStorageConfiguration implements 
Serializable {
     @IgniteExperimental
     private long cdcWalDirMaxSize = DFLT_CDC_WAL_DIRECTORY_MAX_SIZE;
 
+    /** Maximum size of CDC buffer. */
+    @IgniteExperimental
+    private long maxCdcBufSize = (long)DFLT_WAL_SEGMENTS * 
DFLT_WAL_SEGMENT_SIZE;
+
+    /** CDC buffer consumer. */
+    @IgniteExperimental
+    private transient CdcBufferConsumer cdcConsumer;
+
     /**
      * Metrics enabled flag.
      * @deprecated Will be removed in upcoming releases.
@@ -1355,6 +1364,52 @@ public class DataStorageConfiguration implements 
Serializable {
         return defragmentationThreadPoolSize;
     }
 
+    /**
+     * Returns the upper bound, in bytes, on the size of the realtime CDC buffer.
+     *
+     * @return Maximum CDC buffer size in bytes; {@code 0} means that realtime CDC mode is disabled.
+     */
+    public long getMaxCdcBufferSize() {
+        return this.maxCdcBufSize;
+    }
+
+    /**
+     * Sets a max allowed size(in bytes) of CDC buffer. Set to {@code 0} to disable realtime CDC mode.
+     *
+     * @param maxCdcBufSize max size(in bytes) of CDC buffer. Must not be negative; {@code 0} is a legal value.
+     * @return {@code this} for chaining.
+     */
+    public DataStorageConfiguration setMaxCdcBufferSize(long maxCdcBufSize) {
+        A.ensure(
+            maxCdcBufSize >= 0,
+            "maxCdcBufferSize must be greater than or equal to 0. To disable realtime mode of CDC set value to 0.");
+
+        this.maxCdcBufSize = maxCdcBufSize;
+
+        return this;
+    }
+
+    /**
+     * Returns the consumer that receives raw CDC data.
+     *
+     * @return Configured CDC buffer consumer, or {@code null} if none has been set.
+     */
+    public CdcBufferConsumer getCdcConsumer() {
+        return this.cdcConsumer;
+    }
+
+    /**
+     * Configures the consumer that receives raw CDC data.
+     *
+     * @param cdcConsumer Consumer of raw CDC data.
+     * @return {@code this} for chaining.
+     */
+    public DataStorageConfiguration setCdcConsumer(CdcBufferConsumer cdcConsumer) {
+        this.cdcConsumer = cdcConsumer;
+
+        return this;
+    }
+
     /**
      * Gets a min allowed size(in bytes) of WAL archives.
      *
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java
new file mode 100644
index 00000000000..1b105bac5db
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.cdc;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Buffer that stores WAL records before {@link CdcWorker} consumes it. Buffer is a single-producer single-consumer bounded queue.
+ * <p>
+ * The queue is a linked list with a sentinel head: {@code consumerNode} points to the last consumed node and
+ * {@code producerNode} to the last produced one. After {@link #cleanIfOverflowed()} has released the buffer both
+ * references are {@code null} and {@link #poll()} always returns {@code null}.
+ * <p>
+ * TODO: Optimize the queue:
+ *       1. by space using LinkedArrayQueue. Example: http://psy-lob-saw.blogspot.com/2016/10/linked-array-queues-part-1-spsc.html.
+ *          It helps to avoid using AtomicLong for #size.
+ *       2. by performance using AtomicReference#lazySet or similar for LinkedNode#next.
+ */
+public class CdcBuffer {
+    /** Maximum size of the underlying buffer, bytes. */
+    private final long maxSize;
+
+    /** Reference to last consumed linked node, {@code null} once the buffer has been released. */
+    private LinkedNode consumerNode;
+
+    /** Reference to last produced linked node, {@code null} once the buffer has been released. */
+    private volatile LinkedNode producerNode;
+
+    /** Current size of the buffer, bytes. */
+    private final AtomicLong size = new AtomicLong();
+
+    /** If {@code true} then buffer has overflowed. */
+    private volatile boolean overflowed;
+
+    /**
+     * @param maxSize Maximum size of the buffer in bytes, must be positive.
+     */
+    public CdcBuffer(long maxSize) {
+        assert maxSize > 0 : maxSize;
+
+        this.maxSize = maxSize;
+
+        // Sentinel node without data: it plays the role of the "last consumed" node of an empty queue.
+        producerNode = consumerNode = new LinkedNode(null);
+    }
+
+    /**
+     * Offers data for the queue. Performed by the single producer thread.
+     * <p>
+     * The remaining bytes of {@code data} are copied into the queue, which moves the position of {@code data}
+     * to its limit.
+     *
+     * @param data Data to store in the buffer.
+     * @return {@code true} if the data was stored, {@code false} if storing it would exceed the maximum size.
+     *      In the latter case the buffer is marked as overflowed.
+     */
+    public boolean offer(ByteBuffer data) {
+        int bufSize = data.remaining();
+
+        if (size.addAndGet(bufSize) > maxSize) {
+            overflowed = true;
+
+            return false;
+        }
+
+        byte[] cp = new byte[bufSize];
+
+        data.get(cp, 0, bufSize);
+
+        LinkedNode newNode = new LinkedNode(ByteBuffer.wrap(cp));
+        LinkedNode oldNode = producerNode;
+
+        // The new tail is published before it is linked, the consumer spins in #poll() until the link appears.
+        producerNode = newNode;
+        oldNode.next(newNode);
+
+        return true;
+    }
+
+    /**
+     * Polls data from the queue. Performed by the single consumer thread.
+     *
+     * @return Polled data, or {@code null} if no data is available now or the buffer has been released
+     *      by {@link #cleanIfOverflowed()}.
+     */
+    public ByteBuffer poll() {
+        LinkedNode prev = consumerNode;
+
+        // The buffer has been released by #cleanIfOverflowed(), nothing can be polled anymore.
+        if (prev == null)
+            return null;
+
+        LinkedNode next = prev.next;
+
+        if (next != null)
+            return poll(prev, next);
+        else if (prev != producerNode) {
+            // Producer has already published a new tail but has not linked it yet.
+            while ((next = prev.next) == null) {
+                // No-op. New reference should appear soon.
+            }
+
+            return poll(prev, next);
+        }
+
+        return null;
+    }
+
+    /** @return Current buffer size. */
+    public long size() {
+        return size.get();
+    }
+
+    /**
+     * Cleans the buffer if overflowed. Performed by the consumer thread.
+     * <p>
+     * Drains the remaining nodes, drops the references to the linked nodes and resets the size to {@code 0}.
+     * Does nothing if the buffer has not overflowed or has already been cleaned.
+     */
+    public void cleanIfOverflowed() {
+        if (!overflowed || consumerNode == null)
+            return;
+
+        // Drain the queue so that every remaining node gets unlinked from its predecessor.
+        while (poll() != null) {
+            // No-op.
+        }
+
+        consumerNode = null;
+        producerNode = null;
+
+        size.set(0);
+    }
+
+    /**
+     * Moves the consumer to {@code next} and subtracts the size of its data from the buffer size.
+     *
+     * @param prev Previously consumed linked node.
+     * @param next Node to consume.
+     * @return Data to consume.
+     */
+    private ByteBuffer poll(LinkedNode prev, LinkedNode next) {
+        ByteBuffer data = next.data;
+
+        prev.next = null;
+        consumerNode = next;
+
+        size.addAndGet(-data.remaining());
+
+        return data;
+    }
+
+    /** Node of the linked queue. */
+    private static class LinkedNode {
+        /** Next node, {@code null} if not linked yet. */
+        private volatile @Nullable LinkedNode next;
+
+        /** Data of the node, {@code null} for the sentinel node. */
+        private final ByteBuffer data;
+
+        /**
+         * @param data Data of the node.
+         */
+        LinkedNode(ByteBuffer data) {
+            this.data = data;
+        }
+
+        /**
+         * @param next Next node.
+         */
+        void next(LinkedNode next) {
+            this.next = next;
+        }
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
new file mode 100644
index 00000000000..67cd422fb0f
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.cdc;
+
+import java.nio.ByteBuffer;
+
+/** Mock for Realtime CDC buffer consumer. */
+public interface CdcBufferConsumer {
+    /**
+     * Consumes raw WAL data. {@link CdcWorker} invokes it for every non-null buffer polled from {@link CdcBuffer}.
+     *
+     * @param data Raw data to consume.
+     */
+    public void consume(ByteBuffer data);
+
+    /** Closes the consumer. {@link CdcWorker} invokes it after its polling loop has exited. */
+    public void close();
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java
new file mode 100644
index 00000000000..92adcf2acaf
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.cdc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/** Collects data changes in realtime within Ignite node and hands them over to {@link CdcWorker} through {@link CdcBuffer}. */
+public class CdcProcessor {
+    /** Queue between the caller of {@link #collect(ByteBuffer)} and the {@link #worker}. */
+    private final CdcBuffer cdcBuf;
+
+    /** Worker that drains {@link #cdcBuf}. */
+    private final CdcWorker worker;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Cleared once {@link #cdcBuf} rejects data because of overflow; never set back to {@code true}. */
+    private boolean enabled = true;
+
+    /**
+     * @param cctx Shared cache context, the data storage configuration is taken from it.
+     * @param log Logger.
+     */
+    public CdcProcessor(GridCacheSharedContext<?, ?> cctx, IgniteLogger log) {
+        DataStorageConfiguration storageCfg = cctx.gridConfig().getDataStorageConfiguration();
+
+        this.log = log;
+        this.cdcBuf = new CdcBuffer(storageCfg.getMaxCdcBufferSize());
+        this.worker = new CdcWorker(cctx, log, cdcBuf, storageCfg.getCdcConsumer());
+    }
+
+    /**
+     * Offers the data to the CDC buffer. If the buffer rejects it, collecting is switched off and the worker
+     * is cancelled. Does nothing once collecting has been switched off.
+     *
+     * @param dataBuf Buffer that contains data to collect.
+     */
+    public void collect(ByteBuffer dataBuf) {
+        if (enabled) {
+            if (log.isDebugEnabled())
+                log.debug("Offerring a data bucket to the CDC buffer [len=" + (dataBuf.limit() - dataBuf.position()) + ']');
+
+            boolean accepted = cdcBuf.offer(dataBuf);
+
+            if (accepted)
+                return;
+
+            enabled = false;
+
+            log.warning("CDC buffer has overflowed. Stop realtime mode of CDC.");
+
+            worker.cancel();
+        }
+    }
+
+    /** Launches the CDC worker. */
+    public void start() {
+        worker.restart();
+    }
+
+    /**
+     * Cancels the CDC worker and joins it.
+     *
+     * @throws IgniteInterruptedCheckedException Propagated from joining the worker.
+     */
+    public void shutdown() throws IgniteInterruptedCheckedException {
+        worker.cancel();
+
+        U.join(worker);
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java
new file mode 100644
index 00000000000..27655413c29
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.cdc;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.LockSupport;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
+
+/** Worker that polls {@link CdcBuffer} and passes the polled data to {@link CdcBufferConsumer}. */
+public class CdcWorker extends GridWorker {
+    /** Default throttling timeout in millis for polling CDC buffer. */
+    public static final int DFLT_POLL_CDC_BUF_THROTTLING_TIMEOUT = 100;
+
+    /** Throttling timeout in millis for polling CDC buffer. */
+    private final long cdcBufPollTimeout = Long.getLong(
+        IgniteSystemProperties.IGNITE_THROTTLE_POLL_CDC_BUF, DFLT_POLL_CDC_BUF_THROTTLING_TIMEOUT);
+
+    /** CDC buffer. */
+    private final CdcBuffer cdcBuf;
+
+    /** CDC consumer. */
+    private final CdcBufferConsumer consumer;
+
+    /**
+     * @param cctx Shared cache context.
+     * @param log Logger.
+     * @param cdcBuf CDC buffer to poll.
+     * @param consumer Consumer of the polled data.
+     */
+    public CdcWorker(
+        GridCacheSharedContext<?, ?> cctx,
+        IgniteLogger log,
+        CdcBuffer cdcBuf,
+        CdcBufferConsumer consumer
+    ) {
+        super(cctx.igniteInstanceName(),
+            "cdc-worker%" + cctx.igniteInstanceName(),
+            log,
+            cctx.kernalContext().workersRegistry());
+
+        this.cdcBuf = cdcBuf;
+        this.consumer = consumer;
+    }
+
+    /**
+     * Polls the CDC buffer until the worker is cancelled and passes every polled buffer to the consumer.
+     * Parks for the throttling timeout when the buffer has no data. The consumer is closed when the loop
+     * is left, including the case when the loop is left because of an exception.
+     */
+    @Override public void body() {
+        try {
+            while (!isCancelled()) {
+                updateHeartbeat();
+
+                ByteBuffer data = cdcBuf.poll();
+
+                if (data == null) {
+                    LockSupport.parkNanos(cdcBufPollTimeout * 1_000_000);  // millis to nanos.
+
+                    continue;
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Poll a data bucket from CDC buffer [len=" + (data.limit() - data.position()) + ']');
+
+                // TODO: Consumer must not block this system thread.
+                consumer.consume(data);
+            }
+        }
+        finally {
+            // Close the consumer even if polling or consuming has failed, otherwise it is never closed.
+            consumer.close();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void cleanup() {
+        cdcBuf.cleanIfOverflowed();
+    }
+
+    /** Resets the cancellation flag and runs this worker in a new {@link IgniteThread}. */
+    public void restart() {
+        isCancelled.set(false);
+
+        new IgniteThread(this).start();
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index f68b366fbb6..5470e2379cb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -87,6 +87,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.DataRegion;
 import 
org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import 
org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
@@ -313,6 +314,9 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     private static final AtomicReferenceFieldUpdater<FileWriteAheadLogManager, 
FileWriteHandle> CURR_HND_UPD =
         AtomicReferenceFieldUpdater.newUpdater(FileWriteAheadLogManager.class, 
FileWriteHandle.class, "currHnd");
 
+    /** CDC processor, {@code null} if CDC is disabled. */
+    @Nullable private CdcProcessor cdcProc;
+
     /**
      * File archiver moves segments from work directory to archive. Locked 
segments may be kept not moved until release.
      * For mode archive and work folders set to equal value, archiver is not 
created.
@@ -521,6 +525,9 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
                         dispatcher.registerProperty(cdcDisabled);
                     });
+
+                if (dsCfg.getMaxCdcBufferSize() > 0)
+                    cdcProc = new CdcProcessor(cctx, log);
             }
 
             serializer = new 
RecordSerializerFactoryImpl(cctx).createSerializer(serializerVer);
@@ -581,7 +588,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             segmentRouter = new SegmentRouter(walWorkDir, walArchiveDir, 
segmentAware, dsCfg);
 
             fileHandleManager = fileHandleManagerFactory.build(
-                cctx, metrics, mmap, serializer, this::currentHandle
+                cctx, metrics, mmap, serializer, cdcProc, this::currentHandle
             );
 
             lockedSegmentFileInputFactory = new LockedSegmentFileInputFactory(
@@ -753,6 +760,10 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
             if (cleaner != null)
                 cleaner.shutdown();
+
+            // TODO: stop cdcProcessor after checkpoint (reason=node stop) 
finished.
+            if (cdcProc != null)
+                cdcProc.shutdown();
         }
         catch (IgniteInterruptedCheckedException e) {
             U.error(log, "Failed to gracefully shutdown WAL components, thread 
was interrupted.", e);
@@ -764,7 +775,11 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         if (log.isDebugEnabled())
             log.debug("Activated file write ahead log manager [nodeId=" + 
cctx.localNodeId() +
                 " topVer=" + cctx.discovery().topologyVersionEx() + " ]");
-        //NOOP implementation, we need to override it.
+
+        // TODO: Invoke here to avoid double initialization. CdcConsumer must 
consume data only after full start.
+        //       Will fix in https://issues.apache.org/jira/browse/IGNITE-19637
+        if (cdcProc != null)
+            cdcProc.start();
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java
index 8850c2003b6..619ff63f2d6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java
@@ -23,7 +23,9 @@ import 
org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
+import 
org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Factory of {@link FileHandleManager}.
@@ -48,6 +50,7 @@ public class FileHandleManagerFactory {
      * @param metrics Data storage metrics.
      * @param mmap Using mmap.
      * @param serializer Serializer.
+     * @param cdcProc CDC processor.
      * @param currHandleSupplier Supplier of current handle.
      * @return One of implementation of {@link FileHandleManager}.
      */
@@ -56,6 +59,7 @@ public class FileHandleManagerFactory {
         DataStorageMetricsImpl metrics,
         boolean mmap,
         RecordSerializer serializer,
+        @Nullable CdcProcessor cdcProc,
         Supplier<FileWriteHandle> currHandleSupplier
     ) {
         if (dsConf.getWalMode() == WALMode.FSYNC && 
!walFsyncWithDedicatedWorker)
@@ -63,6 +67,7 @@ public class FileHandleManagerFactory {
                 cctx,
                 metrics,
                 serializer,
+                cdcProc,
                 currHandleSupplier,
                 dsConf.getWalMode(),
                 dsConf.getWalSegmentSize(),
@@ -75,6 +80,7 @@ public class FileHandleManagerFactory {
                 metrics,
                 mmap,
                 serializer,
+                cdcProc,
                 currHandleSupplier,
                 dsConf.getWalMode(),
                 dsConf.getWalBufferSize(),
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
index 6b08531a233..e0a779fa61a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
@@ -34,6 +34,7 @@ import 
org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
 import 
org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import 
org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
@@ -41,6 +42,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.Re
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
 
 import static java.lang.Long.MAX_VALUE;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SEGMENT_SYNC_TIMEOUT;
@@ -63,6 +65,9 @@ public class FileHandleManagerImpl implements 
FileHandleManager {
     /** Wal segment sync worker. */
     private final WalSegmentSyncer walSegmentSyncWorker;
 
+    /** CDC processor, {@code null} if CDC is disabled. */
+    private final @Nullable CdcProcessor cdcProc;
+
     /** Context. */
     protected final GridCacheSharedContext cctx;
 
@@ -109,6 +114,7 @@ public class FileHandleManagerImpl implements 
FileHandleManager {
         DataStorageMetricsImpl metrics,
         boolean mmap,
         RecordSerializer serializer,
+        @Nullable CdcProcessor cdcProc,
         Supplier<FileWriteHandle> currentHandleSupplier,
         WALMode mode,
         int walBufferSize,
@@ -126,6 +132,7 @@ public class FileHandleManagerImpl implements 
FileHandleManager {
         this.maxWalSegmentSize = maxWalSegmentSize;
         this.fsyncDelay = fsyncDelay;
         walWriter = new WALWriter(log);
+        this.cdcProc = cdcProc;
 
         if (mode != WALMode.NONE && mode != WALMode.FSYNC) {
             walSegmentSyncWorker = new WalSegmentSyncer(
@@ -163,7 +170,7 @@ public class FileHandleManagerImpl implements 
FileHandleManager {
         rbuf.init(position);
 
         return new FileWriteHandleImpl(
-            cctx, fileIO, rbuf, serializer, metrics, walWriter, position,
+            cctx, fileIO, rbuf, serializer, metrics, walWriter, cdcProc, 
position,
             mode, mmap, true, fsyncDelay, maxWalSegmentSize
         );
     }
@@ -181,7 +188,7 @@ public class FileHandleManagerImpl implements 
FileHandleManager {
             rbuf = currentHandle().buf.reset();
 
         return new FileWriteHandleImpl(
-            cctx, fileIO, rbuf, serializer, metrics, walWriter, 0,
+            cctx, fileIO, rbuf, serializer, metrics, walWriter, cdcProc, 0,
             mode, mmap, false, fsyncDelay, maxWalSegmentSize
         );
     }
@@ -376,7 +383,17 @@ public class FileHandleManagerImpl implements 
FileHandleManager {
                         updateHeartbeat();
 
                         try {
+                            int bufPos = seg.buffer().position();
+
                             writeBuffer(seg.position(), seg.buffer());
+
+                            if (cdcProc != null) {
+                                ByteBuffer cdcBuf = seg.buffer().duplicate();
+                                cdcBuf.position(bufPos);
+                                cdcBuf.limit(seg.buffer().limit());
+
+                                cdcProc.collect(cdcBuf);
+                            }
                         }
                         catch (Throwable e) {
                             log.error("Exception in WAL writer thread:", e);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
index 5dc2ec155f6..a1fe746eeb6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
@@ -43,6 +43,7 @@ import 
org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
 import 
org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import 
org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
@@ -114,6 +115,9 @@ class FileWriteHandleImpl extends AbstractFileHandle 
implements FileWriteHandle
     /** Buffer. */
     protected final SegmentedRingByteBuffer buf;
 
+    /** Cdc Buffer, {@code null} if CDC is disabled. */
+    private final @Nullable CdcProcessor cdcProc;
+
     /** */
     private final WALMode mode;
 
@@ -157,8 +161,8 @@ class FileWriteHandleImpl extends AbstractFileHandle 
implements FileWriteHandle
      */
     FileWriteHandleImpl(
         GridCacheSharedContext cctx, SegmentIO fileIO, SegmentedRingByteBuffer 
rbuf, RecordSerializer serializer,
-        DataStorageMetricsImpl metrics, FileHandleManagerImpl.WALWriter 
writer, long pos, WALMode mode, boolean mmap,
-        boolean resume, long fsyncDelay, long maxWalSegmentSize) throws 
IOException {
+        DataStorageMetricsImpl metrics, FileHandleManagerImpl.WALWriter 
writer, CdcProcessor cdcProc,
+        long pos, WALMode mode, boolean mmap, boolean resume, long fsyncDelay, 
long maxWalSegmentSize) throws IOException {
         super(fileIO);
         assert serializer != null;
 
@@ -170,6 +174,7 @@ class FileWriteHandleImpl extends AbstractFileHandle 
implements FileWriteHandle
         this.log = cctx.logger(FileWriteHandleImpl.class);
         this.cctx = cctx;
         this.walWriter = writer;
+        this.cdcProc = cdcProc;
         this.serializer = serializer;
         this.written = pos;
         this.lastFsyncPos = pos;
@@ -413,6 +418,14 @@ class FileWriteHandleImpl extends AbstractFileHandle 
implements FileWriteHandle
 
                         fsync((MappedByteBuffer)buf.buf, off, len);
 
+                        if (cdcProc != null) {
+                            ByteBuffer cdcBuf = buf.buf.duplicate();
+                            cdcBuf.position(off);
+                            cdcBuf.limit(off + len);
+
+                            cdcProc.collect(cdcBuf);
+                        }
+
                         seg.release();
                     }
                 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java
index af3957c128b..8b398b3fc54 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java
@@ -26,9 +26,11 @@ import 
org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
 import 
org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import 
org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Implementation of {@link FileWriteHandle} for FSYNC mode.
@@ -49,6 +51,9 @@ public class FsyncFileHandleManagerImpl implements 
FileHandleManager {
     /** */
     protected final RecordSerializer serializer;
 
+    /** */
+    private final @Nullable CdcProcessor cdcProc;
+
     /** Current handle supplier. */
     private final Supplier<FileWriteHandle> currentHandleSupplier;
 
@@ -75,6 +80,7 @@ public class FsyncFileHandleManagerImpl implements 
FileHandleManager {
         GridCacheSharedContext cctx,
         DataStorageMetricsImpl metrics,
         RecordSerializer serializer,
+        @Nullable CdcProcessor cdcProc,
         Supplier<FileWriteHandle> handle,
         WALMode mode,
         long maxWalSegmentSize,
@@ -86,6 +92,7 @@ public class FsyncFileHandleManagerImpl implements 
FileHandleManager {
         this.mode = mode;
         this.metrics = metrics;
         this.serializer = serializer;
+        this.cdcProc = cdcProc;
         currentHandleSupplier = handle;
         this.maxWalSegmentSize = maxWalSegmentSize;
         this.fsyncDelay = fsyncDelay;
@@ -96,7 +103,7 @@ public class FsyncFileHandleManagerImpl implements 
FileHandleManager {
     @Override public FileWriteHandle initHandle(SegmentIO fileIO, long 
position,
         RecordSerializer serializer) throws IOException {
         return new FsyncFileWriteHandle(
-            cctx, fileIO, metrics, serializer, position,
+            cctx, fileIO, metrics, serializer, cdcProc, position,
             mode, maxWalSegmentSize, tlbSize, fsyncDelay
         );
     }
@@ -105,7 +112,7 @@ public class FsyncFileHandleManagerImpl implements 
FileHandleManager {
     @Override public FileWriteHandle nextHandle(SegmentIO fileIO,
         RecordSerializer serializer) throws IOException {
         return new FsyncFileWriteHandle(
-            cctx, fileIO, metrics, serializer, 0,
+            cctx, fileIO, metrics, serializer, cdcProc, 0,
             mode, maxWalSegmentSize, tlbSize, fsyncDelay
         );
     }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
index ca06af53b8b..59178e497ba 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
@@ -38,6 +38,7 @@ import 
org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
 import 
org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import 
org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
@@ -114,6 +115,9 @@ class FsyncFileWriteHandle extends AbstractFileHandle 
implements FileWriteHandle
     /** Persistence metrics tracker. */
     private final DataStorageMetricsImpl metrics;
 
+    /** */
+    private final @Nullable CdcProcessor cdcProc;
+
     /** Logger. */
     protected final IgniteLogger log;
 
@@ -152,7 +156,7 @@ class FsyncFileWriteHandle extends AbstractFileHandle 
implements FileWriteHandle
      */
     FsyncFileWriteHandle(
         GridCacheSharedContext cctx, SegmentIO fileIO,
-        DataStorageMetricsImpl metrics, RecordSerializer serializer, long pos,
+        DataStorageMetricsImpl metrics, RecordSerializer serializer, 
CdcProcessor cdcProc, long pos,
         WALMode mode, long maxSegmentSize, int size, long fsyncDelay) throws 
IOException {
         super(fileIO);
         assert serializer != null;
@@ -165,6 +169,7 @@ class FsyncFileWriteHandle extends AbstractFileHandle 
implements FileWriteHandle
         this.fsyncDelay = fsyncDelay;
         this.maxSegmentSize = maxSegmentSize;
         this.serializer = serializer;
+        this.cdcProc = cdcProc;
         this.written = pos;
         this.lastFsyncPos = pos;
 
@@ -780,6 +785,7 @@ class FsyncFileWriteHandle extends AbstractFileHandle 
implements FileWriteHandle
 
             // Do the write.
             int size = buf.remaining();
+            int bufPos = buf.position();
 
             assert size > 0 : size;
 
@@ -799,6 +805,14 @@ class FsyncFileWriteHandle extends AbstractFileHandle 
implements FileWriteHandle
 
                 throw se;
             }
+
+            if (cdcProc != null) {
+                ByteBuffer cdcBuf = buf.duplicate();
+                cdcBuf.position(bufPos);
+                cdcBuf.limit(buf.limit());
+
+                cdcProc.collect(cdcBuf);
+            }
         }
         finally {
             writeComplete.signalAll();
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferDedicatedWorkerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferDedicatedWorkerTest.java
new file mode 100644
index 00000000000..3f48c8f6145
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferDedicatedWorkerTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.cdc;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.junit.runners.Parameterized;
+
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_FSYNC_WITH_DEDICATED_WORKER;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP;
+
+/** */
+@WithSystemProperty(key = IGNITE_WAL_FSYNC_WITH_DEDICATED_WORKER, value = 
"true")
+@WithSystemProperty(key = IGNITE_WAL_MMAP, value = "false")
+public class RealtimeCdcBufferDedicatedWorkerTest extends 
RealtimeCdcBufferTest {
+    /** Override params to test only FSYNC mode. */
+    @Parameterized.Parameters(name = "walMode={0}")
+    public static List<Object[]> params() {
+        List<Object[]> param = new ArrayList<>();
+
+        param.add(new Object[] {WALMode.FSYNC});
+
+        return param;
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferSelfTest.java
new file mode 100644
index 00000000000..a16a256da4b
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferSelfTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.cdc;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.LockSupport;
+import org.junit.Test;
+
+/** Unit tests of {@link CdcBuffer} offer/poll behavior; no Ignite node is started. */
+public class RealtimeCdcBufferSelfTest {
+    /** Checks that offers succeed until the buffer is full and that the next offer is rejected. */
+    @Test
+    public void testDisableBuffer() {
+        CdcBuffer buf = new CdcBuffer(10);
+
+        // Fill the buffer.
+        for (int i = 0; i < 10; i++) {
+            boolean res = buf.offer(build());
+
+            assert res;
+            assert buf.size() == i + 1;
+        }
+
+        boolean res = buf.offer(build());
+
+        assert !res;
+    }
+
+    /** Checks that polling an empty buffer returns {@code null} and leaves the size at zero. */
+    @Test
+    public void testConsumeEmptyBuffer() {
+        CdcBuffer buf = new CdcBuffer(10);
+
+        for (int i = 0; i < 10; i++) {
+            ByteBuffer data = buf.poll();
+
+            assert data == null;
+            assert buf.size() == 0;
+        }
+    }
+
+    /** Checks that a full buffer can be drained completely and accepts new data afterwards. */
+    @Test
+    public void testConsumeFullBuffer() {
+        CdcBuffer buf = new CdcBuffer(10);
+
+        for (int i = 0; i < 10; i++) {
+            // Keep the call outside of the assert statement: it must run even with assertions disabled.
+            boolean res = buf.offer(build());
+
+            assert res : "Offer rejected while filling the buffer [i=" + i + ']';
+        }
+
+        for (int i = 0; i < 10; i++) {
+            ByteBuffer data = buf.poll();
+
+            assert build().equals(data);
+            assert buf.size() == 10 - (i + 1);
+        }
+
+        boolean res = buf.offer(build());
+
+        assert res;
+        assert buf.size() == 1;
+    }
+
+    /**
+     * Runs {@code offer} and {@code poll} concurrently from two threads and fails if either thread fails.
+     *
+     * @throws Exception If the test thread is interrupted while waiting for the workers.
+     */
+    @Test
+    public void testConcurrentFillBuffer() throws Exception {
+        int size = 1_000_000;
+
+        CdcBuffer buf = new CdcBuffer(size);
+
+        // One slot per worker thread. Slots are read only after join(), which makes the workers' writes visible.
+        Throwable[] errs = new Throwable[2];
+
+        Thread th1 = new Thread(() -> {
+            try {
+                for (int i = 0; i < size; i++) {
+                    boolean res = buf.offer(build());
+
+                    // Total offered size never exceeds the buffer capacity, so no offer may be rejected.
+                    assert res : "Offer rejected [i=" + i + ']';
+
+                    LockSupport.parkNanos(1_000);
+                }
+            }
+            catch (Throwable e) {
+                errs[0] = e;
+            }
+        });
+
+        Thread th2 = new Thread(() -> {
+            try {
+                // Goal is to invoke `poll` more frequently than `offer`, to check `poll` waits offered value.
+                for (int i = 0; i < size * 1_000; i++)
+                    buf.poll();
+            }
+            catch (Throwable e) {
+                errs[1] = e;
+            }
+        });
+
+        th1.start();
+        th2.start();
+
+        th1.join();
+        th2.join();
+
+        // Without this check a failure inside a worker thread would be lost and the test would still pass.
+        for (Throwable e : errs) {
+            if (e != null)
+                throw new AssertionError("Worker thread failed", e);
+        }
+    }
+
+    /** @return New single-byte buffer containing the value {@code 1}. */
+    private ByteBuffer build() {
+        byte[] data = new byte[1];
+        data[0] = 1;
+
+        return ByteBuffer.wrap(data);
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java
new file mode 100644
index 00000000000..9b9ca8e6fc0
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.cdc;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
+import static org.junit.Assume.assumeFalse;
+
+/** Tests the realtime CDC buffer on a started node for each WAL mode: overflow reporting and delivered content. */
+@RunWith(Parameterized.class)
+public class RealtimeCdcBufferTest extends GridCommonAbstractTest {
+    /** Consistent ID of the node; also used as the name of the node's WAL directory in the content test. */
+    private static final String CONSISTENT_ID = "ID";
+
+    /** Logger used by the node; lets tests register listeners for log messages. */
+    private static ListeningTestLogger lsnrLog;
+
+    /** Counted down when the consumer is closed; {@code null} if the test does not wait for it. */
+    private static CountDownLatch stopLatch;
+
+    /** Consumer that accumulates all data passed to it. */
+    private ByteBufferCdcConsumer consumer;
+
+    /** Whether CDC is enabled for the default data region. */
+    private boolean cdcEnabled;
+
+    /** Value passed to {@code DataStorageConfiguration#setMaxCdcBufferSize}. */
+    private int maxCdcBufSize;
+
+    /** WAL mode under test. */
+    @Parameterized.Parameter()
+    public WALMode walMode;
+
+    /** @return Test parameters: one entry per WAL mode to test. */
+    @Parameterized.Parameters(name = "walMode={0}")
+    public static List<Object[]> params() {
+        return F.asList(
+            new Object[] { WALMode.LOG_ONLY },
+            new Object[] { WALMode.BACKGROUND },
+            new Object[] { WALMode.FSYNC }
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String instanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(instanceName);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setMaxCdcBufferSize(maxCdcBufSize)
+            .setCdcConsumer(consumer)
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setCdcEnabled(cdcEnabled)
+                .setPersistenceEnabled(true)
+            )
+            .setWalMode(walMode)
+        );
+
+        cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+        cfg.setFailureHandler(new StopNodeFailureHandler());
+
+        cfg.setGridLogger(lsnrLog);
+
+        cfg.setConsistentId(CONSISTENT_ID);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        lsnrLog = new ListeningTestLogger(log);
+
+        cleanPersistenceDir();
+
+        cdcEnabled = true;
+
+        consumer = new ByteBufferCdcConsumer(10 * (int)U.MB);
+
+        stopLatch = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // A failed test may leave the node running; stop it so that it does not affect the next parameterized run.
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        super.afterTest();
+    }
+
+    /** Checks that the overflow message is logged when written data exceeds a small CDC buffer. */
+    @Test
+    public void testCdcBufferOverflow() throws Exception {
+        maxCdcBufSize = (int)U.KB;
+
+        checkCdcBufferOverflow(10 * (int)U.KB, 100, true);
+    }
+
+    /** Checks that no overflow message is logged when CDC is disabled for the data region. */
+    @Test
+    public void testCdcDisabled() throws Exception {
+        cdcEnabled = false;
+
+        checkCdcBufferOverflow(10 * (int)U.KB, 100, false);
+    }
+
+    /** Checks that the bytes received by the consumer match the content of the first WAL segment file. */
+    @Test
+    public void testCdcBufferContent() throws Exception {
+        // TODO: Looks like there is a bug in the FSYNC case: WAL misses some records.
+        assumeFalse(walMode == WALMode.FSYNC);
+
+        maxCdcBufSize = 10 * (int)U.MB;
+
+        stopLatch = new CountDownLatch(1);
+
+        checkCdcBufferOverflow((int)U.KB, 100, false);
+
+        // Wait until the consumer is closed, i.e. no more data will be appended to its buffer.
+        U.awaitQuiet(stopLatch);
+
+        File walSegments = U.resolveWorkDirectory(
+            U.defaultWorkDirectory(),
+            DFLT_STORE_DIR + "/wal/" + CONSISTENT_ID, false);
+
+        int length;
+
+        // Close the iterator as soon as the last record pointer is known.
+        try (WALIterator it = walIter(walSegments)) {
+            while (it.hasNext())
+                it.next();
+
+            WALPointer ptr = it.lastRead().orElseThrow(() -> new AssertionError("WAL contains no records"));
+
+            // End offset of the last record read, i.e. the number of meaningful bytes to compare.
+            length = ptr.fileOffset() + ptr.length();
+        }
+
+        File[] segs = walSegments.listFiles();
+
+        assertTrue("No WAL segments found in " + walSegments, segs != null && segs.length > 0);
+
+        File seg = Arrays.stream(segs).sorted().findFirst().get();
+
+        byte[] walSegData = Files.readAllBytes(seg.toPath());
+        byte[] cdcData = consumer.buf.array();
+
+        assertTrue(
+            "Not enough data to compare [length=" + length + ", wal=" + walSegData.length + ", cdc=" + cdcData.length + ']',
+            length <= walSegData.length && length <= cdcData.length);
+
+        int step = 100;
+
+        // Compare in small chunks to keep the failure message readable.
+        for (int off = 0; off < length; off += step) {
+            int to = Math.min(off + step, length);
+
+            byte[] testWalData = Arrays.copyOfRange(walSegData, off, to);
+            byte[] testCdcData = Arrays.copyOfRange(cdcData, off, to);
+
+            assertTrue(
+                "Offset " + off + "/" + length + "\n" +
+                "EXPECT " + Arrays.toString(testWalData) + "\n" +
+                "ACTUAL " + Arrays.toString(testCdcData),
+                Arrays.equals(testWalData, testCdcData));
+        }
+    }
+
+    /**
+     * Starts a node, puts entries into the default cache, forces a checkpoint, stops the node and checks
+     * how many times the overflow message was logged.
+     *
+     * @param entrySize Size of each value in bytes.
+     * @param entryCnt Number of entries to put.
+     * @param shouldOverflow Whether the overflow message is expected exactly once ({@code true}) or never.
+     * @throws Exception If failed.
+     */
+    private void checkCdcBufferOverflow(int entrySize, int entryCnt, boolean shouldOverflow) throws Exception {
+        LogListener lsnr = LogListener.matches("CDC buffer has overflowed. Stop realtime mode of CDC.")
+            .times(shouldOverflow ? 1 : 0)
+            .build();
+
+        lsnrLog.registerListener(lsnr);
+
+        IgniteEx crd = startGrid(0);
+
+        crd.cluster().state(ClusterState.ACTIVE);
+
+        IgniteCache<Integer, byte[]> cache = crd.cache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < entryCnt; i++) {
+            byte[] data = new byte[entrySize];
+
+            Arrays.fill(data, (byte)1);
+
+            cache.put(i, data);
+        }
+
+        forceCheckpoint(crd);
+
+        stopGrid(0);
+
+        assertTrue(lsnr.check());
+    }
+
+    /**
+     * @param walSegments File or directory with WAL segments.
+     * @return Iterator over WAL records; the caller is responsible for closing it.
+     * @throws Exception If failed.
+     */
+    private WALIterator walIter(File walSegments) throws Exception {
+        IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);
+
+        IgniteWalIteratorFactory.IteratorParametersBuilder params = new IgniteWalIteratorFactory.IteratorParametersBuilder()
+            .filesOrDirs(walSegments);
+
+        return factory.iterator(params);
+    }
+
+    /** Consumer that appends all received data to a single heap buffer. */
+    private static class ByteBufferCdcConsumer implements CdcBufferConsumer {
+        /** Accumulates all consumed data in the order it was received. */
+        private final ByteBuffer buf;
+
+        /** @param maxCdcBufSize Capacity of the accumulating buffer in bytes. */
+        ByteBufferCdcConsumer(int maxCdcBufSize) {
+            // A freshly allocated buffer is already zero-filled and positioned at 0.
+            buf = ByteBuffer.allocate(maxCdcBufSize);
+        }
+
+        /** Appends the remaining bytes of {@code data} to the accumulating buffer. */
+        @Override public void consume(ByteBuffer data) {
+            buf.put(data);
+        }
+
+        /** Signals the waiting test, if any, that the consumer has been closed. */
+        @Override public void close() {
+            if (stopLatch != null)
+                stopLatch.countDown();
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferWalMmapDisabledTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferWalMmapDisabledTest.java
new file mode 100644
index 00000000000..01247fe964d
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferWalMmapDisabledTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.cdc;
+
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP;
+
+/** Runs the {@link RealtimeCdcBufferTest} scenarios with the {@code IGNITE_WAL_MMAP} system property set to {@code false}. */
+@WithSystemProperty(key = IGNITE_WAL_MMAP, value = "false")
+public class RealtimeCdcBufferWalMmapDisabledTest extends 
RealtimeCdcBufferTest {
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 8b39f3e54ed..746a336da27 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -39,6 +39,10 @@ import 
org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentS
 import 
org.apache.ignite.internal.processors.cache.persistence.IgniteRebalanceScheduleResendPartitionsTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.WALPreloadingWithCompactionTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.WalPreloadingConcurrentTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.cdc.RealtimeCdcBufferDedicatedWorkerTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.cdc.RealtimeCdcBufferSelfTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.cdc.RealtimeCdcBufferTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.cdc.RealtimeCdcBufferWalMmapDisabledTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.FullHistRebalanceOnClientStopTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebalancingOnNotStableTopologyTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsTest;
@@ -159,6 +163,12 @@ public class IgnitePdsTestSuite2 {
         GridTestUtils.addTestIfNeeded(suite, 
CdcCacheConfigOnRestartTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CdcNonDefaultWorkDirTest.class, 
ignoredTests);
 
+        // Realtime CDC tests.
+        GridTestUtils.addTestIfNeeded(suite, RealtimeCdcBufferSelfTest.class, 
ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, RealtimeCdcBufferTest.class, 
ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
RealtimeCdcBufferDedicatedWorkerTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
RealtimeCdcBufferWalMmapDisabledTest.class, ignoredTests);
+
         // new style folders with generated consistent ID test
         GridTestUtils.addTestIfNeeded(suite, 
IgniteUidAsConsistentIdMigrationTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
IgniteWalSerializerVersionTest.class, ignoredTests);

Reply via email to