This is an automated email from the ASF dual-hosted git repository. mpetrov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 2342b19e9a4 IGNITE-20327 Fixed doubled serialization of CQ event oldValue field for REMOVE/EXPIRY events. (#10929) 2342b19e9a4 is described below commit 2342b19e9a4343789b6cd8431d8e97db3aaba733 Author: Mikhail Petrov <32207922+petrov...@users.noreply.github.com> AuthorDate: Fri Sep 15 10:53:57 2023 +0300 IGNITE-20327 Fixed doubled serialization of CQ event oldValue field for REMOVE/EXPIRY events. (#10929) --- ...EntryEvent.java => CacheEntryEventAdapter.java} | 45 +++++++++++---------- .../ignite/cache/query/CacheQueryEntryEvent.java | 3 +- .../thin/ClientCacheEntryListenerHandler.java | 21 ++++------ .../continuous/CacheContinuousQueryEntry.java | 2 +- .../continuous/CacheContinuousQueryEvent.java | 12 +----- .../CacheContinuousQueryEventBuffer.java | 5 ++- .../continuous/CacheContinuousQueryHandler.java | 25 ++++++++++-- .../continuous/CacheContinuousQueryManager.java | 14 ++----- .../cache/ClientCacheEntryEventNotification.java | 5 ++- .../client/thin/CacheEntryListenersTest.java | 47 ++++++++++++++++++++-- ...niteCacheContinuousQueryImmutableEntryTest.java | 4 +- .../ignite/thin/cache/event/cache_entry_event.h | 5 +++ 12 files changed, 117 insertions(+), 71 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheEntryEventAdapter.java similarity index 55% copy from modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java copy to modules/core/src/main/java/org/apache/ignite/cache/query/CacheEntryEventAdapter.java index 3a7994f9b7f..4f9fb4a014a 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheEntryEventAdapter.java @@ -21,28 +21,31 @@ import javax.cache.Cache; import javax.cache.event.CacheEntryEvent; import javax.cache.event.EventType; -/** - * A Cache continuous query entry event. - * - * @param <K> The type of key. - * @param <V> The type of value. - */ -public abstract class CacheQueryEntryEvent<K, V> extends CacheEntryEvent<K, V> { - /** - * Constructs a cache entry event from a given cache as source. - * - * @param src The cache that originated the event. - * @param evtType Event type. - */ - public CacheQueryEntryEvent(Cache src, EventType evtType) { +import static javax.cache.event.EventType.EXPIRED; +import static javax.cache.event.EventType.REMOVED; + +/** */ +public abstract class CacheEntryEventAdapter<K, V> extends CacheEntryEvent<K, V> { + /** */ + protected CacheEntryEventAdapter(Cache src, EventType evtType) { super(src, evtType); } - /** - * Each cache update increases partition counter. The same cache updates have on the same value of counter - * on primary and backup nodes. This value can be useful to communicate with external applications. - * - * @return Value of counter for this event. - */ - public abstract long getPartitionUpdateCounter(); + /** {@inheritDoc} */ + @Override public V getValue() { + EventType evtType = getEventType(); + + return (evtType == EXPIRED || evtType == REMOVED) ? getOldValue() : getNewValue(); + } + + /** {@inheritDoc} */ + @Override public <T> T unwrap(Class<T> cls) { + if (cls.isAssignableFrom(getClass())) + return cls.cast(this); + + throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls); + } + + /** */ + protected abstract V getNewValue(); } diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java index 3a7994f9b7f..ccc63c8433a 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java @@ -18,7 +18,6 @@ package org.apache.ignite.cache.query; import javax.cache.Cache; -import javax.cache.event.CacheEntryEvent; import javax.cache.event.EventType; /** @@ -27,7 +26,7 @@ import javax.cache.event.EventType; * @param <K> The type of key. * @param <V> The type of value. */ -public abstract class CacheQueryEntryEvent<K, V> extends CacheEntryEvent<K, V> { +public abstract class CacheQueryEntryEvent<K, V> extends CacheEntryEventAdapter<K, V> { /** * Constructs a cache entry event from a given cache as source. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java index 3f278814bf5..2daba9e2da1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheEntryListenerHandler.java @@ -28,6 +28,7 @@ import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.event.EventType; +import org.apache.ignite.cache.query.CacheEntryEventAdapter; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.client.ClientDisconnectListener; import org.apache.ignite.client.ClientException; @@ -210,7 +211,7 @@ public class ClientCacheEntryListenerHandler<K, V> implements NotificationListen /** * */ - private static class CacheEntryEventImpl<K, V> extends CacheEntryEvent<K, V> { + private static class CacheEntryEventImpl<K, V> extends CacheEntryEventAdapter<K, V> { /** */ private static final long serialVersionUID = 0L; @@ -221,17 +222,17 @@ public class ClientCacheEntryListenerHandler<K, V> implements NotificationListen private final V oldVal; /** Value. */ - private final V val; + private final V newVal; /** * */ - private CacheEntryEventImpl(Cache<K, V> src, EventType evtType, K key, V oldVal, V val) { + private CacheEntryEventImpl(Cache<K, V> src, EventType evtType, K key, V oldVal, V newVal) { super(src, evtType); this.key = key; this.oldVal = oldVal; - this.val = val; + this.newVal = newVal; } /** {@inheritDoc} */ @@ -240,8 +241,8 @@ public class ClientCacheEntryListenerHandler<K, V> implements NotificationListen } /** {@inheritDoc} */ - @Override public V getValue() { - return val; + @Override protected V getNewValue() { + return newVal; } /** {@inheritDoc} */ @@ -253,13 +254,5 @@ public class ClientCacheEntryListenerHandler<K, V> implements NotificationListen @Override public boolean isOldValueAvailable() { return oldVal != null; } - - /** {@inheritDoc} */ - @Override public <T> T unwrap(Class<T> clazz) { - if (clazz.isAssignableFrom(getClass())) - return clazz.cast(this); - - throw new IllegalArgumentException("Unwrapping to class is not supported: " + clazz); - } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index 15615a2608d..3eb961c6d7e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -352,7 +352,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { /** * @return New value. */ - CacheObject value() { + CacheObject newValue() { return newVal; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java index 58aadb13c28..0632d9cfec4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java @@ -69,8 +69,8 @@ class CacheContinuousQueryEvent<K, V> extends CacheQueryEntryEvent<K, V> { } /** {@inheritDoc} */ - @Override public V getValue() { - return (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(e.value(), e.isKeepBinary(), false, null); + @Override protected V getNewValue() { + return (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(e.newValue(), e.isKeepBinary(), false, null); } /** {@inheritDoc} */ @@ -88,14 +88,6 @@ class CacheContinuousQueryEvent<K, V> extends CacheQueryEntryEvent<K, V> { return e.updateCounter(); } - /** {@inheritDoc} */ - @Override public <T> T unwrap(Class<T> cls) { - if (cls.isAssignableFrom(getClass())) - return cls.cast(this); - - throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls); - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheContinuousQueryEvent.class, this, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java index 9a5c1a9841d..783c0b1375b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -409,10 +409,11 @@ public class CacheContinuousQueryEventBuffer { if (e.isFiltered()) filtered++; else { - flushEntry = new CacheContinuousQueryEntry(e.cacheId(), + flushEntry = new CacheContinuousQueryEntry( + e.cacheId(), e.eventType(), e.key(), - e.value(), + e.newValue(), e.oldValue(), e.isKeepBinary(), e.partition(), diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 16d71f80152..2a506e1e895 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.P2PClassLoadingIssues; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -84,6 +85,8 @@ import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static javax.cache.event.EventType.EXPIRED; +import static javax.cache.event.EventType.REMOVED; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.toCountersMap; @@ -450,7 +453,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler boolean primary, final boolean recordIgniteEvt, GridDhtAtomicAbstractUpdateFuture fut) { - if (ignoreExpired && evt.getEventType() == EventType.EXPIRED) + if (ignoreExpired && evt.getEventType() == EXPIRED) return; if (log.isDebugEnabled()) @@ -1620,11 +1623,27 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler CacheContinuousQueryEvent<? extends K, ? extends V> evt) { Object transVal = transform(trans, evt); + CacheObject cacheObj = transVal == null ? null : cacheContext(ctx).toCacheObject(transVal); + + EventType type = evt.entry().eventType(); + + CacheObject oldValue; + CacheObject newValue; + + if (type == EXPIRED || type == REMOVED) { + newValue = null; + oldValue = cacheObj; + } + else { + newValue = cacheObj; + oldValue = null; + } + return new CacheContinuousQueryEntry(evt.entry().cacheId(), evt.entry().eventType(), null, - transVal == null ? null : cacheContext(ctx).toCacheObject(transVal), - null, + newValue, + oldValue, evt.entry().isKeepBinary(), evt.entry().partition(), evt.entry().updateCounter(), diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 8fbd4c979ce..e21dc846545 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -434,7 +434,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K cctx.cacheId(), evtType, key, - (!internal && evtType == REMOVED && lsnr.oldValueRequired()) ? oldVal : newVal, + newVal, lsnr.oldValueRequired() ? oldVal : null, lsnr.keepBinary(), partId, @@ -496,7 +496,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K cctx.cacheId(), EXPIRED, key, - lsnr.oldValueRequired() ? oldVal : null, + null, lsnr.oldValueRequired() ? oldVal : null, lsnr.keepBinary(), e.partition(), @@ -1433,18 +1433,10 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K } /** {@inheritDoc} */ - @Override public Object getValue() { + @Override public Object getNewValue() { return val; } - /** {@inheritDoc} */ - @Override public Object unwrap(Class cls) { - if (cls.isAssignableFrom(getClass())) - return cls.cast(this); - - throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls); - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheEntryEventImpl.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheEntryEventNotification.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheEntryEventNotification.java index 084689cf4cd..8e9d5676da8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheEntryEventNotification.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheEntryEventNotification.java @@ -22,6 +22,9 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; import org.apache.ignite.internal.processors.platform.client.ClientNotification; +import static javax.cache.event.EventType.EXPIRED; +import static javax.cache.event.EventType.REMOVED; + /** * Continuous query notification. */ @@ -53,7 +56,7 @@ public class ClientCacheEntryEventNotification extends ClientNotification { for (CacheEntryEvent evt : evts) { writer.writeObjectDetached(evt.getKey()); writer.writeObjectDetached(evt.getOldValue()); - writer.writeObjectDetached(evt.getValue()); + writer.writeObjectDetached(evt.getEventType() == EXPIRED || evt.getEventType() == REMOVED ? null : evt.getValue()); switch (evt.getEventType()) { case CREATED: diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheEntryListenersTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheEntryListenersTest.java index e654b312e37..12ea362bf84 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheEntryListenersTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheEntryListenersTest.java @@ -26,8 +26,8 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import javax.cache.Cache; @@ -60,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; import org.junit.Test; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; @@ -138,6 +139,32 @@ public class CacheEntryListenersTest extends AbstractThinClientTest { } } + /** */ + @Test + public void testEventReceivedData() throws Exception { + try (IgniteClient client = startClient(0, 1, 2)) { + ClientCache<Integer, Integer> cache = client.getOrCreateCache("testEventReceivedData"); + + ContinuousQueryListener<Integer, Integer> lsnr = new ContinuousQueryListener<>(); + + cache.query(new ContinuousQuery<Integer, Integer>().setLocalListener(lsnr).setIncludeExpired(true)); + + cache.put(0, 0); + + lsnr.assertNextCacheEvent(EventType.CREATED, evt -> assertNull(evt.getOldValue())); + + cache.remove(0); + + lsnr.assertNextCacheEvent(EventType.REMOVED, evt -> assertSame(evt.getValue(), evt.getOldValue())); + + cache.withExpirePolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100))).put(1, 1); + + lsnr.assertNextCacheEvent(EventType.CREATED, evt -> assertNull(evt.getOldValue())); + + lsnr.assertNextCacheEvent(EventType.EXPIRED, evt -> assertSame(evt.getValue(), evt.getOldValue())); + } + } + /** Test continuous queries with initial query. */ @Test public void testContinuousQueriesWithInitialQuery() throws Exception { @@ -188,7 +215,7 @@ public class CacheEntryListenersTest extends AbstractThinClientTest { cache.query(qry1); cache.query(qry2); - cache = cache.withExpirePolicy(new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, 1))); + cache = cache.withExpirePolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 1))); for (int i = 0; i < 100; i++) cache.put(i, i); @@ -330,7 +357,7 @@ public class CacheEntryListenersTest extends AbstractThinClientTest { cache.registerCacheEntryListener(new MutableCacheEntryListenerConfiguration<>( () -> lsnr, null, true, false)); - cache = cache.withExpirePolicy(new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, 1))); + cache = cache.withExpirePolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 1))); for (int i = 0; i < 100; i++) cache.put(i, i); @@ -738,7 +765,7 @@ public class CacheEntryListenersTest extends AbstractThinClientTest { if (failure != null) throw failure; - CacheEntryEvent<? extends K, ? extends V> evt = evtsQ.poll(timeout, TimeUnit.MILLISECONDS); + CacheEntryEvent<? extends K, ? extends V> evt = evtsQ.poll(timeout, MILLISECONDS); assertNotNull(evt); @@ -771,6 +798,18 @@ public class CacheEntryListenersTest extends AbstractThinClientTest { assertEquals(expVal, evt.getValue()); } + /** */ + public void assertNextCacheEvent( + EventType expType, + Consumer<CacheEntryEvent<? extends K, ? extends V>> checker + ) throws Exception { + CacheEntryEvent<? extends K, ? extends V> evt = poll(); + + assertEquals(expType, evt.getEventType()); + + checker.accept(evt); + } + /** */ public boolean isDisconnected() { return disconnected; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java index 52cbafa3efb..79f65a6740d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java @@ -172,8 +172,8 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst assertNotNull(e0.key()); assertNull(e1.oldValue()); assertNotNull(e0.oldValue()); - assertNull(e1.value()); - assertNotNull(e0.value()); + assertNull(e1.newValue()); + assertNotNull(e0.newValue()); } /** diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/cache/event/cache_entry_event.h b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/event/cache_entry_event.h index 57f69b2e518..c246470eff0 100644 --- a/modules/platforms/cpp/thin-client/include/ignite/thin/cache/event/cache_entry_event.h +++ b/modules/platforms/cpp/thin-client/include/ignite/thin/cache/event/cache_entry_event.h @@ -203,6 +203,11 @@ namespace ignite int8_t eventTypeByte = reader.ReadInt8(); this->eventType = CacheEntryEventType::FromInt8(eventTypeByte); + + if ((eventType == CacheEntryEventType::EXPIRED || eventType == CacheEntryEventType::REMOVED) && hasOldValue) { + this->hasValue = true; + this->val = oldVal; + } } /** Old value. */