This is an automated email from the ASF dual-hosted git repository.

shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c4d0a8dbae IGNITE-28478 Refactor GridContinuousProcessor (#13065)
3c4d0a8dbae is described below

commit 3c4d0a8dbae69a90a73d515678d29cb9ac834b65
Author: Aleksandr Chesnokov <[email protected]>
AuthorDate: Wed Apr 29 17:40:22 2026 +0300

    IGNITE-28478 Refactor GridContinuousProcessor (#13065)
---
 .../continuous/CacheContinuousQueryHandler.java    | 197 +++++++++++++++++++--
 .../continuous/CacheContinuousQueryHandlerV2.java  | 184 -------------------
 .../continuous/CacheContinuousQueryHandlerV3.java  | 188 --------------------
 .../continuous/CacheContinuousQueryManager.java    |   6 +-
 .../continuous/GridContinuousProcessor.java        |  65 ++++---
 .../main/resources/META-INF/classnames.properties  |   2 -
 6 files changed, 218 insertions(+), 424 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 2c5ce3ba634..200a40cb796 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryListener;
@@ -159,6 +160,30 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     /** Deployable object for filter. */
     private CacheContinuousQueryDeployableObject rmtFilterDep;
 
+    /** Remote filter factory. */
+    private Factory<? extends CacheEntryEventFilter> rmtFilterFactory;
+
+    /** Deployable object for filter factory. */
+    private CacheContinuousQueryDeployableObject rmtFilterFactoryDep;
+
+    /** Remote filter created by {@link #rmtFilterFactory}. */
+    private transient CacheEntryEventFilter rmtFilterFromFactory;
+
+    /** Event types for JCache API. */
+    private byte types;
+
+    /** Remote transformer factory. */
+    private Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?>> rmtTransFactory;
+
+    /** Deployable object for transformer factory. */
+    private CacheContinuousQueryDeployableObject rmtTransFactoryDep;
+
+    /** Remote transformer created by {@link #rmtTransFactory}. */
+    private transient IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> rmtTrans;
+
+    /** Local listener for transformed events. */
+    private transient EventListener<?> locTransLsnr;
+
     /** Internal flag. */
     private boolean internal;
 
@@ -248,6 +273,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
      * @param oldValRequired Old value required flag.
      * @param sync Synchronous flag.
      * @param ignoreExpired Ignore expired events flag.
+     * @param ignoreClsNotFound Ignore class not found flag.
      */
     public CacheContinuousQueryHandler(
         String cacheName,
@@ -272,6 +298,82 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         cacheId = CU.cacheId(cacheName);
     }
 
+    /**
+     * @param cacheName Cache name.
+     * @param topic Topic for ordered messages.
+     * @param locLsnr Local listener.
+     * @param rmtFilterFactory Remote filter factory.
+     * @param oldValRequired Old value required flag.
+     * @param sync Synchronous flag.
+     * @param ignoreExpired Ignore expired events flag.
+     * @param ignoreClsNotFound Ignore class not found flag.
+     * @param types Event types for JCache API, {@code null} for non-JCache queries.
+     */
+    public CacheContinuousQueryHandler(
+        String cacheName,
+        Object topic,
+        @Nullable CacheEntryUpdatedListener<K, V> locLsnr,
+        @Nullable Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory,
+        boolean oldValRequired,
+        boolean sync,
+        boolean ignoreExpired,
+        boolean ignoreClsNotFound,
+        @Nullable Byte types) {
+        this(cacheName,
+            topic,
+            locLsnr,
+            null,
+            oldValRequired,
+            sync,
+            ignoreExpired,
+            ignoreClsNotFound);
+
+        this.rmtFilterFactory = rmtFilterFactory;
+
+        if (types != null) {
+            assert types != 0;
+
+            this.types = types;
+        }
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param topic Topic.
+     * @param locTransLsnr Local listener of transformed events
+     * @param rmtFilterFactory Remote filter factory.
+     * @param rmtTransFactory Remote transformer factory.
+     * @param oldValRequired OldValRequired flag.
+     * @param sync Sync flag.
+     * @param ignoreExpired IgnoreExpired flag.
+     * @param ignoreClsNotFound IgnoreClassNotFoundException flag.
+     */
+    public CacheContinuousQueryHandler(
+        String cacheName,
+        Object topic,
+        EventListener<?> locTransLsnr,
+        @Nullable Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory,
+        Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?>> rmtTransFactory,
+        boolean oldValRequired,
+        boolean sync,
+        boolean ignoreExpired,
+        boolean ignoreClsNotFound) {
+        this(cacheName,
+            topic,
+            null,
+            rmtFilterFactory,
+            oldValRequired,
+            sync,
+            ignoreExpired,
+            ignoreClsNotFound,
+            null);
+
+        assert rmtTransFactory != null;
+
+        this.locTransLsnr = locTransLsnr;
+        this.rmtTransFactory = rmtTransFactory;
+    }
+
     /**
      * @param internal Internal query.
      */
@@ -379,6 +481,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         assert routineId != null;
         assert ctx != null;
 
+        if (locTransLsnr != null) {
+            ctx.resource().injectGeneric(locTransLsnr);
+
+            asyncCb = U.hasAnnotation(locTransLsnr, IgniteAsyncCallback.class);
+        }
+
         initLocalListener(locLsnr, ctx);
 
         if (initFut == null) {
@@ -772,6 +880,17 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
      * @return Cache entry event filter.
      */
     protected CacheEntryEventFilter getEventFilter0() {
+        if (rmtFilterFactory != null) {
+            if (rmtFilterFromFactory == null) {
+                rmtFilterFromFactory = rmtFilterFactory.create();
+
+                if (types != 0)
+                    rmtFilterFromFactory = new JCacheQueryRemoteFilter(rmtFilterFromFactory, types);
+            }
+
+            return rmtFilterFromFactory;
+        }
+
         return rmtFilter;
     }
 
@@ -788,14 +907,17 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
      * @return Cache entry event transformer.
      */
     public IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> getTransformer0() {
-        return null;
+        if (rmtTrans == null && rmtTransFactory != null)
+            rmtTrans = rmtTransFactory.create();
+
+        return rmtTrans;
     }
 
     /**
      * @return Local listener of transformed events.
      */
     @Nullable public EventListener<?> localTransformedEventListener() {
-        return null;
+        return locTransLsnr;
     }
 
     /**
@@ -1265,8 +1387,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         assert ctx != null;
         assert ctx.config().isPeerClassLoadingEnabled();
 
-        if (rmtFilter != null && !U.isGrid(rmtFilter.getClass()))
+        if (requiresDeployment(rmtFilter))
             rmtFilterDep = new CacheContinuousQueryDeployableObject(rmtFilter, ctx);
+
+        if (requiresDeployment(rmtFilterFactory))
+            rmtFilterFactoryDep = new CacheContinuousQueryDeployableObject(rmtFilterFactory, ctx);
+
+        if (requiresDeployment(rmtTransFactory))
+            rmtTransFactoryDep = new CacheContinuousQueryDeployableObject(rmtTransFactory, ctx);
     }
 
     /** {@inheritDoc} */
@@ -1278,6 +1406,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (rmtFilterDep != null)
             rmtFilter = p2pUnmarshal(rmtFilterDep, nodeId, ctx);
 
+        if (rmtFilterFactoryDep != null)
+            rmtFilterFactory = p2pUnmarshal(rmtFilterFactoryDep, nodeId, ctx);
+
+        if (rmtTransFactoryDep != null)
+            rmtTransFactory = p2pUnmarshal(rmtTransFactoryDep, nodeId, ctx);
+
         if (!p2pUnmarshalFut.isDone())
             ((GridFutureAdapter)p2pUnmarshalFut).onDone();
     }
@@ -1286,7 +1420,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
      * @return Whether the handler is marshalled for peer class loading.
      */
     public boolean isMarshalled() {
-        return rmtFilter == null || U.isGrid(rmtFilter.getClass()) || rmtFilterDep != null;
+        return (!requiresDeployment(rmtFilter) || rmtFilterDep != null)
+            && (!requiresDeployment(rmtFilterFactory) || rmtFilterFactoryDep != null)
+            && (!requiresDeployment(rmtTransFactory) || rmtTransFactoryDep != null);
     }
 
     /**
@@ -1418,14 +1554,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         U.writeString(out, cacheName);
         out.writeObject(topic);
 
-        boolean b = rmtFilterDep != null;
-
-        out.writeBoolean(b);
-
-        if (b)
-            out.writeObject(rmtFilterDep);
-        else
-            out.writeObject(rmtFilter);
+        writeDeployable(out, rmtFilter, rmtFilterDep);
 
         out.writeBoolean(internal);
         out.writeBoolean(notifyExisting);
@@ -1434,6 +1563,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         out.writeBoolean(ignoreExpired);
         out.writeInt(taskHash);
         out.writeBoolean(keepBinary);
+
+        writeDeployable(out, rmtFilterFactory, rmtFilterFactoryDep);
+
+        out.writeByte(types);
+
+        writeDeployable(out, rmtTransFactory, rmtTransFactoryDep);
     }
 
     /** {@inheritDoc} */
@@ -1459,9 +1594,42 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         taskHash = in.readInt();
         keepBinary = in.readBoolean();
 
+        b = in.readBoolean();
+
+        if (b) {
+            rmtFilterFactoryDep = (CacheContinuousQueryDeployableObject)in.readObject();
+
+            if (p2pUnmarshalFut.isDone())
+                p2pUnmarshalFut = new GridFutureAdapter<>();
+        }
+        else
+            rmtFilterFactory = (Factory)in.readObject();
+
+        types = in.readByte();
+
+        b = in.readBoolean();
+
+        if (b) {
+            rmtTransFactoryDep = (CacheContinuousQueryDeployableObject)in.readObject();
+
+            if (p2pUnmarshalFut.isDone())
+                p2pUnmarshalFut = new GridFutureAdapter<>();
+        }
+        else
+            rmtTransFactory = (Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?>>)in.readObject();
+
         cacheId = CU.cacheId(cacheName);
     }
 
+    /** */
+    private static void writeDeployable(ObjectOutput out, Object obj, CacheContinuousQueryDeployableObject dep) throws IOException {
+        boolean b = dep != null;
+
+        out.writeBoolean(b);
+
+        out.writeObject(b ? dep : obj);
+    }
+
     /**
      * @param ctx Kernal context.
      * @return Cache context.
@@ -1639,4 +1807,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     Map<Integer, CacheContinuousQueryEventBuffer> partitionContinuesQueryEntryBuffers() {
         return Collections.unmodifiableMap(entryBufs);
     }
+
+    /** */
+    private static boolean requiresDeployment(@Nullable Object obj) {
+        return obj != null && !U.isGrid(obj.getClass());
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
deleted file mode 100644
index 06ad511207d..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.UUID;
-import javax.cache.configuration.Factory;
-import javax.cache.event.CacheEntryEventFilter;
-import javax.cache.event.CacheEntryUpdatedListener;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridKernalContext;
-import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
-import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Continuous query handler V2 version. Contains {@link Factory} for remote 
listener.
- */
-public class CacheContinuousQueryHandlerV2<K, V> extends 
CacheContinuousQueryHandler<K, V> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Remote filter factory. */
-    Factory<? extends CacheEntryEventFilter> rmtFilterFactory;
-
-    /** Deployable object for filter factory. */
-    private CacheContinuousQueryDeployableObject rmtFilterFactoryDep;
-
-    /** Event types for JCache API. */
-    private byte types;
-
-    /** */
-    protected transient CacheEntryEventFilter filter;
-
-    /**
-     * Required by {@link Externalizable}.
-     */
-    public CacheContinuousQueryHandlerV2() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param cacheName Cache name.
-     * @param topic Topic for ordered messages.
-     * @param locLsnr Local listener.
-     * @param rmtFilterFactory Remote filter factory.
-     * @param oldValRequired Old value required flag.
-     * @param sync Synchronous flag.
-     * @param ignoreExpired Ignore expired events flag.
-     * @param types Event types.
-     */
-    public CacheContinuousQueryHandlerV2(
-        String cacheName,
-        Object topic,
-        @Nullable CacheEntryUpdatedListener<K, V> locLsnr,
-        @Nullable Factory<? extends CacheEntryEventFilter<K, V>> 
rmtFilterFactory,
-        boolean oldValRequired,
-        boolean sync,
-        boolean ignoreExpired,
-        boolean ignoreClsNotFound,
-        @Nullable Byte types) {
-        super(cacheName,
-            topic,
-            locLsnr,
-            null,
-            oldValRequired,
-            sync,
-            ignoreExpired,
-            ignoreClsNotFound);
-        this.rmtFilterFactory = rmtFilterFactory;
-
-        if (types != null) {
-            assert types != 0;
-
-            this.types = types;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected CacheEntryEventFilter getEventFilter0() {
-        if (filter == null) {
-            assert rmtFilterFactory != null;
-
-            Factory<? extends CacheEntryEventFilter> factory = 
rmtFilterFactory;
-
-            filter = factory.create();
-
-            if (types != 0)
-                filter = new JCacheQueryRemoteFilter(filter, types);
-        }
-
-        return filter;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void p2pMarshal(GridKernalContext ctx) throws 
IgniteCheckedException {
-        super.p2pMarshal(ctx);
-
-        if (rmtFilterFactory != null && !U.isGrid(rmtFilterFactory.getClass()))
-            rmtFilterFactoryDep = new 
CacheContinuousQueryDeployableObject(rmtFilterFactory, ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) 
throws IgniteCheckedException {
-        if (rmtFilterFactoryDep != null)
-            rmtFilterFactory = p2pUnmarshal(rmtFilterFactoryDep, nodeId, ctx);
-
-        super.p2pUnmarshal(nodeId, ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isMarshalled() {
-        return super.isMarshalled() &&
-            (rmtFilterFactory == null || U.isGrid(rmtFilterFactory.getClass()) 
|| rmtFilterFactoryDep != null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridContinuousHandler clone() {
-        return super.clone();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(CacheContinuousQueryHandlerV2.class, this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        super.writeExternal(out);
-
-        boolean b = rmtFilterFactoryDep != null;
-
-        out.writeBoolean(b);
-
-        if (b)
-            out.writeObject(rmtFilterFactoryDep);
-        else
-            out.writeObject(rmtFilterFactory);
-
-        out.writeByte(types);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-        super.readExternal(in);
-
-        boolean b = in.readBoolean();
-
-        if (b) {
-            rmtFilterFactoryDep = 
(CacheContinuousQueryDeployableObject)in.readObject();
-
-            if (p2pUnmarshalFut.isDone())
-                p2pUnmarshalFut = new GridFutureAdapter<>();
-        }
-        else
-            rmtFilterFactory = (Factory)in.readObject();
-
-        types = in.readByte();
-    }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
deleted file mode 100644
index 451081313a1..00000000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.UUID;
-import javax.cache.configuration.Factory;
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryEventFilter;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
-import 
org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteAsyncCallback;
-import org.apache.ignite.lang.IgniteClosure;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Continuous query handler V3 version.
- * Contains {@link Factory} for remote transformer and {@link EventListener}.
- *
- * @see ContinuousQueryWithTransformer
- */
-public class CacheContinuousQueryHandlerV3<K, V> extends 
CacheContinuousQueryHandlerV2<K, V> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Remote transformer. */
-    private Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? 
extends V>, ?>> rmtTransFactory;
-
-    /** Deployable object for transformer. */
-    private CacheContinuousQueryDeployableObject rmtTransFactoryDep;
-
-    /** Remote transformer. */
-    private transient IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, 
?> rmtTrans;
-
-    /** Local listener for transformed events. */
-    private transient EventListener<?> locTransLsnr;
-
-    /**
-     * Empty constructor.
-     */
-    public CacheContinuousQueryHandlerV3() {
-        super();
-    }
-
-    /**
-     * @param cacheName Cache name.
-     * @param topic Topic.
-     * @param locTransLsnr Local listener of transformed events
-     * @param rmtFilterFactory Remote filter factory.
-     * @param rmtTransFactory Remote transformer factory.
-     * @param oldValRequired OldValRequired flag.
-     * @param sync Sync flag.
-     * @param ignoreExpired IgnoreExpired flag.
-     * @param ignoreClsNotFound IgnoreClassNotFoundException flag.
-     */
-    public CacheContinuousQueryHandlerV3(
-        String cacheName,
-        Object topic,
-        EventListener<?> locTransLsnr,
-        @Nullable Factory<? extends CacheEntryEventFilter<K, V>> 
rmtFilterFactory,
-        Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends 
V>, ?>> rmtTransFactory,
-        boolean oldValRequired,
-        boolean sync,
-        boolean ignoreExpired,
-        boolean ignoreClsNotFound) {
-        super(
-            cacheName,
-            topic,
-            null,
-            rmtFilterFactory,
-            oldValRequired,
-            sync,
-            ignoreExpired,
-            ignoreClsNotFound,
-            null);
-
-        assert rmtTransFactory != null;
-
-        this.locTransLsnr = locTransLsnr;
-        this.rmtTransFactory = rmtTransFactory;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, 
?> getTransformer0() {
-        if (rmtTrans == null && rmtTransFactory != null)
-            rmtTrans = rmtTransFactory.create();
-
-        return rmtTrans;
-    }
-
-    /** {@inheritDoc} */
-    @Override public EventListener<?> localTransformedEventListener() {
-        return locTransLsnr;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected CacheEntryEventFilter getEventFilter0() {
-        if (rmtFilterFactory == null)
-            return null;
-
-        return super.getEventFilter0();
-    }
-
-    /** {@inheritDoc} */
-    @Override public RegisterStatus register(UUID nodeId, UUID routineId,
-        GridKernalContext ctx) throws IgniteCheckedException {
-        if (locTransLsnr != null) {
-            ctx.resource().injectGeneric(locTransLsnr);
-
-            asyncCb = U.hasAnnotation(locTransLsnr, IgniteAsyncCallback.class);
-        }
-
-        return super.register(nodeId, routineId, ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void p2pMarshal(GridKernalContext ctx) throws 
IgniteCheckedException {
-        super.p2pMarshal(ctx);
-
-        if (rmtTransFactory != null && !U.isGrid(rmtTransFactory.getClass()))
-            rmtTransFactoryDep = new 
CacheContinuousQueryDeployableObject(rmtTransFactory, ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) 
throws IgniteCheckedException {
-        if (rmtTransFactoryDep != null)
-            rmtTransFactory = p2pUnmarshal(rmtTransFactoryDep, nodeId, ctx);
-
-        super.p2pUnmarshal(nodeId, ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isMarshalled() {
-        return super.isMarshalled() &&
-            (rmtTransFactory == null || U.isGrid(rmtTransFactory.getClass()) 
|| rmtTransFactoryDep != null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        super.writeExternal(out);
-
-        boolean b = rmtTransFactoryDep != null;
-
-        out.writeBoolean(b);
-
-        if (b)
-            out.writeObject(rmtTransFactoryDep);
-        else
-            out.writeObject(rmtTransFactory);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-        super.readExternal(in);
-
-        boolean b = in.readBoolean();
-
-        if (b) {
-            rmtTransFactoryDep = 
(CacheContinuousQueryDeployableObject)in.readObject();
-
-            if (p2pUnmarshalFut.isDone())
-                p2pUnmarshalFut = new GridFutureAdapter<>();
-        }
-        else
-            rmtTransFactory = (Factory<? extends 
IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?>>)in.readObject();
-    }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 922e4203af3..ed362129e52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -529,7 +529,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
         if (rmtTransFactory != null) {
             clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() {
                 @Override public CacheContinuousQueryHandler apply() {
-                    return new CacheContinuousQueryHandlerV3(
+                    return new CacheContinuousQueryHandler(
                         cctx.name(),
                         TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
                         locTransLsnr,
@@ -545,7 +545,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
         else if (rmtFilterFactory != null) {
             clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() {
                 @Override public CacheContinuousQueryHandler apply() {
-                    return new CacheContinuousQueryHandlerV2(
+                    return new CacheContinuousQueryHandler(
                         cctx.name(),
                         TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
                         locLsnr,
@@ -1109,7 +1109,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
                         Factory<CacheEntryEventFilter<K, V>> rmtFilterFactory = cfg.getCacheEntryEventFilterFactory();
 
                         if (rmtFilterFactory != null)
-                            hnd = new CacheContinuousQueryHandlerV2(
+                            hnd = new CacheContinuousQueryHandler(
                                 cctx.name(),
                                 TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
                                 locLsnr,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 1786bdf4003..7509a88fa17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -171,8 +171,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     /** */
     private ContinuousRoutinesInfo routinesInfo;
 
-    /** */
-    private int discoProtoVer;
+    /** Whether Discovery SPI uses immutable custom messages. */
+    private boolean immutableDiscoCustomMsg;
 
     /**
      * @param ctx Kernal context.
@@ -188,9 +188,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             new ReadOnlyCollectionView2X<>(rmtInfos.entrySet(), locInfos.entrySet()),
             e -> new ContinuousQueryView(e.getKey(), e.getValue()));
 
-        discoProtoVer = ctx.discovery().mutableCustomMessages() ? 1 : 2;
+        immutableDiscoCustomMsg = !ctx.discovery().mutableCustomMessages();
 
-        if (discoProtoVer == 2)
+        if (immutableDiscoCustomMsg)
             routinesInfo = new ContinuousRoutinesInfo();
 
         retryDelay = ctx.config().getNetworkSendRetryDelay();
@@ -211,12 +211,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 @Override public void onCustomEvent(AffinityTopologyVersion topVer,
                     ClusterNode snd,
                     StartRoutineDiscoveryMessage msg) {
-                    assert discoProtoVer == 1 : discoProtoVer;
+                    assert !immutableDiscoCustomMsg;
 
                     if (ctx.isStopping())
                         return;
 
-                    processStartRequest(snd, msg);
+                    processStartRequestMutable(snd, msg);
                 }
             });
 
@@ -225,12 +225,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 @Override public void onCustomEvent(AffinityTopologyVersion topVer,
                     ClusterNode snd,
                     StartRoutineDiscoveryMessageV2 msg) {
-                    assert discoProtoVer == 2 : discoProtoVer;
+                    assert immutableDiscoCustomMsg;
 
                     if (ctx.isStopping())
                         return;
 
-                    processStartRequestV2(topVer, snd, msg);
+                    processStartRequestImmutable(topVer, snd, msg);
                 }
             });
 
@@ -251,7 +251,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 @Override public void onCustomEvent(AffinityTopologyVersion topVer,
                     ClusterNode snd,
                     StopRoutineDiscoveryMessage msg) {
-                    if (discoProtoVer == 2)
+                    if (immutableDiscoCustomMsg)
                         routinesInfo.removeRoutine(msg.routineId);
 
                     if (ctx.isStopping())
@@ -408,7 +408,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
-        if (discoProtoVer == 2) {
+        if (immutableDiscoCustomMsg) {
             routinesInfo.collectJoiningNodeData(dataBag);
 
             return;
@@ -422,7 +422,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
-        if (discoProtoVer == 2) {
+        if (immutableDiscoCustomMsg) {
             routinesInfo.collectGridNodeData(dataBag);
 
             return;
@@ -519,7 +519,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 ']');
         }
 
-        if (discoProtoVer == 2) {
+        if (immutableDiscoCustomMsg) {
             if (data.hasJoiningNodeData()) {
                 ContinuousRoutinesJoiningNodeDiscoveryData nodeData = (ContinuousRoutinesJoiningNodeDiscoveryData)
                     data.joiningNodeData();
@@ -527,19 +527,19 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 for (ContinuousRoutineInfo routineInfo : nodeData.startedRoutines) {
                     routinesInfo.addRoutineInfo(routineInfo);
 
-                    onDiscoveryDataReceivedV2(routineInfo);
+                    onDiscoveryDataReceivedImmutable(routineInfo);
                 }
             }
         }
         else {
             if (data.hasJoiningNodeData())
-                onDiscoveryDataReceivedV1((DiscoveryData)data.joiningNodeData());
+                onDiscoveryDataReceivedMutable((DiscoveryData)data.joiningNodeData());
         }
     }
 
     /** {@inheritDoc} */
     @Override public void onGridDataReceived(GridDiscoveryData data) {
-        if (discoProtoVer == 2) {
+        if (immutableDiscoCustomMsg) {
             if (data.commonData() != null) {
                 ContinuousRoutinesCommonDiscoveryData commonData =
                     (ContinuousRoutinesCommonDiscoveryData)data.commonData();
@@ -550,7 +550,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                     routinesInfo.addRoutineInfo(routineInfo);
 
-                    onDiscoveryDataReceivedV2(routineInfo);
+                    onDiscoveryDataReceivedImmutable(routineInfo);
                 }
             }
         }
@@ -559,18 +559,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
             if (nodeSpecData != null) {
                 for (Map.Entry<UUID, Serializable> e : nodeSpecData.entrySet())
-                    onDiscoveryDataReceivedV1((DiscoveryData)e.getValue());
+                    onDiscoveryDataReceivedMutable((DiscoveryData)e.getValue());
             }
         }
     }
 
     /**
      * Processes data received in a discovery message.
-     * Used with protocol version 1.
+     * Used when Discovery SPI supports mutable custom messages.
      *
      * @param data received discovery data.
      */
-    private void onDiscoveryDataReceivedV1(DiscoveryData data) {
+    private void onDiscoveryDataReceivedMutable(DiscoveryData data) {
         if (data != null) {
             for (DiscoveryDataItem item : data.items) {
                 if (!locInfos.containsKey(item.routineId)) {
@@ -610,11 +610,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
     /**
      * Processes data received in a discovery message.
-     * Used with protocol version 2.
+     * Used when Discovery SPI doesn't support mutable custom messages.
      *
      * @param routineInfo Routine info.
      */
-    private void onDiscoveryDataReceivedV2(ContinuousRoutineInfo routineInfo) {
+    private void onDiscoveryDataReceivedImmutable(ContinuousRoutineInfo routineInfo) {
         IgnitePredicate<ClusterNode> nodeFilter;
 
         try {
@@ -792,7 +792,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         LocalRoutineInfo routineInfo = new LocalRoutineInfo(ctx.localNodeId(), prjPred, hnd, 1, 0, true);
 
-        if (discoProtoVer == 2) {
+        if (immutableDiscoCustomMsg) {
             routinesInfo.addRoutineInfo(createRoutineInfo(
                 ctx.localNodeId(),
                 routineId,
@@ -991,7 +991,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         reqData.prepareMarshal(ctx);
 
-        if (discoProtoVer == 1) {
+        if (!immutableDiscoCustomMsg) {
             StartRoutineDiscoveryMessage msg = new StartRoutineDiscoveryMessage(
                     routineId,
                     reqData);
@@ -1001,13 +1001,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
             return msg;
         }
-        else {
-            assert discoProtoVer == 2 : discoProtoVer;
-
-            return new StartRoutineDiscoveryMessageV2(
-                routineId,
-                reqData);
-        }
+        else
+            return new StartRoutineDiscoveryMessageV2(routineId, reqData);
     }
 
     /**
@@ -1077,7 +1072,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                     unregisterHandler(routineId, routine.hnd, true);
                 }
 
-                if (!stop && discoProtoVer == 2)
+                if (!stop && immutableDiscoCustomMsg)
                     stop = routinesInfo.routineExists(routineId);
 
                 // Finish if routine is not found (wrong ID is provided).
@@ -1265,7 +1260,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         clientInfos.clear();
 
-        if (discoProtoVer == 2)
+        if (immutableDiscoCustomMsg)
             routinesInfo.onClientDisconnected(locInfos.keySet());
 
         if (log.isDebugEnabled()) {
@@ -1352,7 +1347,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * @param node Sender.
      * @param req Start request.
      */
-    private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage req) {
+    private void processStartRequestMutable(ClusterNode node, StartRoutineDiscoveryMessage req) {
         if (node.id().equals(ctx.localNodeId()))
             return;
 
@@ -1471,7 +1466,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * @param snd Sender.
      * @param msg Start request.
      */
-    private void processStartRequestV2(final AffinityTopologyVersion topVer,
+    private void processStartRequestImmutable(final AffinityTopologyVersion topVer,
         final ClusterNode snd,
         final StartRoutineDiscoveryMessageV2 msg) {
         StartRequestData reqData = msg.startRequestData();
@@ -1913,7 +1908,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
             UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
 
-            if (discoProtoVer == 2) {
+            if (immutableDiscoCustomMsg) {
                 routinesInfo.onNodeFail(nodeId);
 
                 for (StartFuture fut : startFuts.values())
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 370d18525b1..954755b375a 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1387,8 +1387,6 @@ org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQuer
 org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler
 org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler$1
 org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler$ContinuousQueryAsyncClosure$1
-org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandlerV2
-org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandlerV3
 org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager$1
 org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager$2
 org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager$3


Reply via email to