This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 3c4d0a8dbae IGNITE-28478 Refactor GridContinuousProcessor (#13065)
3c4d0a8dbae is described below
commit 3c4d0a8dbae69a90a73d515678d29cb9ac834b65
Author: Aleksandr Chesnokov <[email protected]>
AuthorDate: Wed Apr 29 17:40:22 2026 +0300
IGNITE-28478 Refactor GridContinuousProcessor (#13065)
---
.../continuous/CacheContinuousQueryHandler.java | 197 +++++++++++++++++++--
.../continuous/CacheContinuousQueryHandlerV2.java | 184 -------------------
.../continuous/CacheContinuousQueryHandlerV3.java | 188 --------------------
.../continuous/CacheContinuousQueryManager.java | 6 +-
.../continuous/GridContinuousProcessor.java | 65 ++++---
.../main/resources/META-INF/classnames.properties | 2 -
6 files changed, 218 insertions(+), 424 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 2c5ce3ba634..200a40cb796 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -32,6 +32,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryListener;
@@ -159,6 +160,30 @@ public class CacheContinuousQueryHandler<K, V> implements
GridContinuousHandler
/** Deployable object for filter. */
private CacheContinuousQueryDeployableObject rmtFilterDep;
+ /** Remote filter factory. */
+ private Factory<? extends CacheEntryEventFilter> rmtFilterFactory;
+
+ /** Deployable object for filter factory. */
+ private CacheContinuousQueryDeployableObject rmtFilterFactoryDep;
+
+ /** Remote filter created by {@link #rmtFilterFactory}. */
+ private transient CacheEntryEventFilter rmtFilterFromFactory;
+
+ /** Event types for JCache API. */
+ private byte types;
+
+ /** Remote transformer factory. */
+ private Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ?
extends V>, ?>> rmtTransFactory;
+
+ /** Deployable object for transformer factory. */
+ private CacheContinuousQueryDeployableObject rmtTransFactoryDep;
+
+ /** Remote transformer created by {@link #rmtTransFactory}. */
+ private transient IgniteClosure<CacheEntryEvent<? extends K, ? extends V>,
?> rmtTrans;
+
+ /** Local listener for transformed events. */
+ private transient EventListener<?> locTransLsnr;
+
/** Internal flag. */
private boolean internal;
@@ -248,6 +273,7 @@ public class CacheContinuousQueryHandler<K, V> implements
GridContinuousHandler
* @param oldValRequired Old value required flag.
* @param sync Synchronous flag.
* @param ignoreExpired Ignore expired events flag.
+ * @param ignoreClsNotFound Ignore class not found flag.
*/
public CacheContinuousQueryHandler(
String cacheName,
@@ -272,6 +298,82 @@ public class CacheContinuousQueryHandler<K, V> implements
GridContinuousHandler
cacheId = CU.cacheId(cacheName);
}
+ /**
+ * @param cacheName Cache name.
+ * @param topic Topic for ordered messages.
+ * @param locLsnr Local listener.
+ * @param rmtFilterFactory Remote filter factory.
+ * @param oldValRequired Old value required flag.
+ * @param sync Synchronous flag.
+ * @param ignoreExpired Ignore expired events flag.
+ * @param ignoreClsNotFound Ignore class not found flag.
+ * @param types Event types for JCache API, {@code null} for non-JCache
queries.
+ */
+ public CacheContinuousQueryHandler(
+ String cacheName,
+ Object topic,
+ @Nullable CacheEntryUpdatedListener<K, V> locLsnr,
+ @Nullable Factory<? extends CacheEntryEventFilter<K, V>>
rmtFilterFactory,
+ boolean oldValRequired,
+ boolean sync,
+ boolean ignoreExpired,
+ boolean ignoreClsNotFound,
+ @Nullable Byte types) {
+ this(cacheName,
+ topic,
+ locLsnr,
+ null,
+ oldValRequired,
+ sync,
+ ignoreExpired,
+ ignoreClsNotFound);
+
+ this.rmtFilterFactory = rmtFilterFactory;
+
+ if (types != null) {
+ assert types != 0;
+
+ this.types = types;
+ }
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @param topic Topic.
+ * @param locTransLsnr Local listener of transformed events.
+ * @param rmtFilterFactory Remote filter factory.
+ * @param rmtTransFactory Remote transformer factory.
+ * @param oldValRequired Old value required flag.
+ * @param sync Synchronous flag.
+ * @param ignoreExpired Ignore expired events flag.
+ * @param ignoreClsNotFound Ignore class not found flag.
+ */
+ public CacheContinuousQueryHandler(
+ String cacheName,
+ Object topic,
+ EventListener<?> locTransLsnr,
+ @Nullable Factory<? extends CacheEntryEventFilter<K, V>>
rmtFilterFactory,
+ Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends
V>, ?>> rmtTransFactory,
+ boolean oldValRequired,
+ boolean sync,
+ boolean ignoreExpired,
+ boolean ignoreClsNotFound) {
+ this(cacheName,
+ topic,
+ null,
+ rmtFilterFactory,
+ oldValRequired,
+ sync,
+ ignoreExpired,
+ ignoreClsNotFound,
+ null);
+
+ assert rmtTransFactory != null;
+
+ this.locTransLsnr = locTransLsnr;
+ this.rmtTransFactory = rmtTransFactory;
+ }
+
/**
* @param internal Internal query.
*/
@@ -379,6 +481,12 @@ public class CacheContinuousQueryHandler<K, V> implements
GridContinuousHandler
assert routineId != null;
assert ctx != null;
+ if (locTransLsnr != null) {
+ ctx.resource().injectGeneric(locTransLsnr);
+
+ asyncCb = U.hasAnnotation(locTransLsnr, IgniteAsyncCallback.class);
+ }
+
initLocalListener(locLsnr, ctx);
if (initFut == null) {
@@ -772,6 +880,17 @@ public class CacheContinuousQueryHandler<K, V> implements
GridContinuousHandler
* @return Cache entry event filter.
*/
protected CacheEntryEventFilter getEventFilter0() {
+ if (rmtFilterFactory != null) {
+ if (rmtFilterFromFactory == null) {
+ rmtFilterFromFactory = rmtFilterFactory.create();
+
+ if (types != 0)
+ rmtFilterFromFactory = new
JCacheQueryRemoteFilter(rmtFilterFromFactory, types);
+ }
+
+ return rmtFilterFromFactory;
+ }
+
return rmtFilter;
}
@@ -788,14 +907,17 @@ public class CacheContinuousQueryHandler<K, V> implements
GridContinuousHandler
* @return Cache entry event transformer.
*/
public IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?>
getTransformer0() {
- return null;
+ if (rmtTrans == null && rmtTransFactory != null)
+ rmtTrans = rmtTransFactory.create();
+
+ return rmtTrans;
}
/**
* @return Local listener of transformed events.
*/
@Nullable public EventListener<?> localTransformedEventListener() {
- return null;
+ return locTransLsnr;
}
/**
@@ -1265,8 +1387,14 @@ public class CacheContinuousQueryHandler<K, V>
implements GridContinuousHandler
assert ctx != null;
assert ctx.config().isPeerClassLoadingEnabled();
- if (rmtFilter != null && !U.isGrid(rmtFilter.getClass()))
+ if (requiresDeployment(rmtFilter))
rmtFilterDep = new CacheContinuousQueryDeployableObject(rmtFilter,
ctx);
+
+ if (requiresDeployment(rmtFilterFactory))
+ rmtFilterFactoryDep = new
CacheContinuousQueryDeployableObject(rmtFilterFactory, ctx);
+
+ if (requiresDeployment(rmtTransFactory))
+ rmtTransFactoryDep = new
CacheContinuousQueryDeployableObject(rmtTransFactory, ctx);
}
/** {@inheritDoc} */
@@ -1278,6 +1406,12 @@ public class CacheContinuousQueryHandler<K, V>
implements GridContinuousHandler
if (rmtFilterDep != null)
rmtFilter = p2pUnmarshal(rmtFilterDep, nodeId, ctx);
+ if (rmtFilterFactoryDep != null)
+ rmtFilterFactory = p2pUnmarshal(rmtFilterFactoryDep, nodeId, ctx);
+
+ if (rmtTransFactoryDep != null)
+ rmtTransFactory = p2pUnmarshal(rmtTransFactoryDep, nodeId, ctx);
+
if (!p2pUnmarshalFut.isDone())
((GridFutureAdapter)p2pUnmarshalFut).onDone();
}
@@ -1286,7 +1420,9 @@ public class CacheContinuousQueryHandler<K, V> implements
GridContinuousHandler
* @return Whether the handler is marshalled for peer class loading.
*/
public boolean isMarshalled() {
- return rmtFilter == null || U.isGrid(rmtFilter.getClass()) ||
rmtFilterDep != null;
+ return (!requiresDeployment(rmtFilter) || rmtFilterDep != null)
+ && (!requiresDeployment(rmtFilterFactory) || rmtFilterFactoryDep
!= null)
+ && (!requiresDeployment(rmtTransFactory) || rmtTransFactoryDep !=
null);
}
/**
@@ -1418,14 +1554,7 @@ public class CacheContinuousQueryHandler<K, V>
implements GridContinuousHandler
U.writeString(out, cacheName);
out.writeObject(topic);
- boolean b = rmtFilterDep != null;
-
- out.writeBoolean(b);
-
- if (b)
- out.writeObject(rmtFilterDep);
- else
- out.writeObject(rmtFilter);
+ writeDeployable(out, rmtFilter, rmtFilterDep);
out.writeBoolean(internal);
out.writeBoolean(notifyExisting);
@@ -1434,6 +1563,12 @@ public class CacheContinuousQueryHandler<K, V>
implements GridContinuousHandler
out.writeBoolean(ignoreExpired);
out.writeInt(taskHash);
out.writeBoolean(keepBinary);
+
+ writeDeployable(out, rmtFilterFactory, rmtFilterFactoryDep);
+
+ out.writeByte(types);
+
+ writeDeployable(out, rmtTransFactory, rmtTransFactoryDep);
}
/** {@inheritDoc} */
@@ -1459,9 +1594,42 @@ public class CacheContinuousQueryHandler<K, V>
implements GridContinuousHandler
taskHash = in.readInt();
keepBinary = in.readBoolean();
+ b = in.readBoolean();
+
+ if (b) {
+ rmtFilterFactoryDep =
(CacheContinuousQueryDeployableObject)in.readObject();
+
+ if (p2pUnmarshalFut.isDone())
+ p2pUnmarshalFut = new GridFutureAdapter<>();
+ }
+ else
+ rmtFilterFactory = (Factory)in.readObject();
+
+ types = in.readByte();
+
+ b = in.readBoolean();
+
+ if (b) {
+ rmtTransFactoryDep =
(CacheContinuousQueryDeployableObject)in.readObject();
+
+ if (p2pUnmarshalFut.isDone())
+ p2pUnmarshalFut = new GridFutureAdapter<>();
+ }
+ else
+ rmtTransFactory = (Factory<? extends
IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?>>)in.readObject();
+
cacheId = CU.cacheId(cacheName);
}
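+ /** Writes a boolean marker ({@code true} if {@code dep} is set) followed by {@code dep} when set, otherwise {@code obj}. */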
+ private static void writeDeployable(ObjectOutput out, Object obj,
CacheContinuousQueryDeployableObject dep) throws IOException {
+ boolean b = dep != null;
+
+ out.writeBoolean(b);
+
+ out.writeObject(b ? dep : obj);
+ }
+
/**
* @param ctx Kernal context.
* @return Cache context.
@@ -1639,4 +1807,9 @@ public class CacheContinuousQueryHandler<K, V> implements
GridContinuousHandler
Map<Integer, CacheContinuousQueryEventBuffer>
partitionContinuesQueryEntryBuffers() {
return Collections.unmodifiableMap(entryBufs);
}
+
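+ /** Returns {@code true} if {@code obj} is not {@code null} and {@code U.isGrid} is {@code false} for its class. */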
+ private static boolean requiresDeployment(@Nullable Object obj) {
+ return obj != null && !U.isGrid(obj.getClass());
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
deleted file mode 100644
index 06ad511207d..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.UUID;
-import javax.cache.configuration.Factory;
-import javax.cache.event.CacheEntryEventFilter;
-import javax.cache.event.CacheEntryUpdatedListener;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridKernalContext;
-import
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
-import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Continuous query handler V2 version. Contains {@link Factory} for remote
listener.
- */
-public class CacheContinuousQueryHandlerV2<K, V> extends
CacheContinuousQueryHandler<K, V> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Remote filter factory. */
- Factory<? extends CacheEntryEventFilter> rmtFilterFactory;
-
- /** Deployable object for filter factory. */
- private CacheContinuousQueryDeployableObject rmtFilterFactoryDep;
-
- /** Event types for JCache API. */
- private byte types;
-
- /** */
- protected transient CacheEntryEventFilter filter;
-
- /**
- * Required by {@link Externalizable}.
- */
- public CacheContinuousQueryHandlerV2() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param cacheName Cache name.
- * @param topic Topic for ordered messages.
- * @param locLsnr Local listener.
- * @param rmtFilterFactory Remote filter factory.
- * @param oldValRequired Old value required flag.
- * @param sync Synchronous flag.
- * @param ignoreExpired Ignore expired events flag.
- * @param types Event types.
- */
- public CacheContinuousQueryHandlerV2(
- String cacheName,
- Object topic,
- @Nullable CacheEntryUpdatedListener<K, V> locLsnr,
- @Nullable Factory<? extends CacheEntryEventFilter<K, V>>
rmtFilterFactory,
- boolean oldValRequired,
- boolean sync,
- boolean ignoreExpired,
- boolean ignoreClsNotFound,
- @Nullable Byte types) {
- super(cacheName,
- topic,
- locLsnr,
- null,
- oldValRequired,
- sync,
- ignoreExpired,
- ignoreClsNotFound);
- this.rmtFilterFactory = rmtFilterFactory;
-
- if (types != null) {
- assert types != 0;
-
- this.types = types;
- }
- }
-
- /** {@inheritDoc} */
- @Override protected CacheEntryEventFilter getEventFilter0() {
- if (filter == null) {
- assert rmtFilterFactory != null;
-
- Factory<? extends CacheEntryEventFilter> factory =
rmtFilterFactory;
-
- filter = factory.create();
-
- if (types != 0)
- filter = new JCacheQueryRemoteFilter(filter, types);
- }
-
- return filter;
- }
-
- /** {@inheritDoc} */
- @Override public void p2pMarshal(GridKernalContext ctx) throws
IgniteCheckedException {
- super.p2pMarshal(ctx);
-
- if (rmtFilterFactory != null && !U.isGrid(rmtFilterFactory.getClass()))
- rmtFilterFactoryDep = new
CacheContinuousQueryDeployableObject(rmtFilterFactory, ctx);
- }
-
- /** {@inheritDoc} */
- @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx)
throws IgniteCheckedException {
- if (rmtFilterFactoryDep != null)
- rmtFilterFactory = p2pUnmarshal(rmtFilterFactoryDep, nodeId, ctx);
-
- super.p2pUnmarshal(nodeId, ctx);
- }
-
- /** {@inheritDoc} */
- @Override public boolean isMarshalled() {
- return super.isMarshalled() &&
- (rmtFilterFactory == null || U.isGrid(rmtFilterFactory.getClass())
|| rmtFilterFactoryDep != null);
- }
-
- /** {@inheritDoc} */
- @Override public GridContinuousHandler clone() {
- return super.clone();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(CacheContinuousQueryHandlerV2.class, this);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- super.writeExternal(out);
-
- boolean b = rmtFilterFactoryDep != null;
-
- out.writeBoolean(b);
-
- if (b)
- out.writeObject(rmtFilterFactoryDep);
- else
- out.writeObject(rmtFilterFactory);
-
- out.writeByte(types);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
- super.readExternal(in);
-
- boolean b = in.readBoolean();
-
- if (b) {
- rmtFilterFactoryDep =
(CacheContinuousQueryDeployableObject)in.readObject();
-
- if (p2pUnmarshalFut.isDone())
- p2pUnmarshalFut = new GridFutureAdapter<>();
- }
- else
- rmtFilterFactory = (Factory)in.readObject();
-
- types = in.readByte();
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
deleted file mode 100644
index 451081313a1..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.UUID;
-import javax.cache.configuration.Factory;
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryEventFilter;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
-import
org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteAsyncCallback;
-import org.apache.ignite.lang.IgniteClosure;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Continuous query handler V3 version.
- * Contains {@link Factory} for remote transformer and {@link EventListener}.
- *
- * @see ContinuousQueryWithTransformer
- */
-public class CacheContinuousQueryHandlerV3<K, V> extends
CacheContinuousQueryHandlerV2<K, V> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Remote transformer. */
- private Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ?
extends V>, ?>> rmtTransFactory;
-
- /** Deployable object for transformer. */
- private CacheContinuousQueryDeployableObject rmtTransFactoryDep;
-
- /** Remote transformer. */
- private transient IgniteClosure<CacheEntryEvent<? extends K, ? extends V>,
?> rmtTrans;
-
- /** Local listener for transformed events. */
- private transient EventListener<?> locTransLsnr;
-
- /**
- * Empty constructor.
- */
- public CacheContinuousQueryHandlerV3() {
- super();
- }
-
- /**
- * @param cacheName Cache name.
- * @param topic Topic.
- * @param locTransLsnr Local listener of transformed events
- * @param rmtFilterFactory Remote filter factory.
- * @param rmtTransFactory Remote transformer factory.
- * @param oldValRequired OldValRequired flag.
- * @param sync Sync flag.
- * @param ignoreExpired IgnoreExpired flag.
- * @param ignoreClsNotFound IgnoreClassNotFoundException flag.
- */
- public CacheContinuousQueryHandlerV3(
- String cacheName,
- Object topic,
- EventListener<?> locTransLsnr,
- @Nullable Factory<? extends CacheEntryEventFilter<K, V>>
rmtFilterFactory,
- Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends
V>, ?>> rmtTransFactory,
- boolean oldValRequired,
- boolean sync,
- boolean ignoreExpired,
- boolean ignoreClsNotFound) {
- super(
- cacheName,
- topic,
- null,
- rmtFilterFactory,
- oldValRequired,
- sync,
- ignoreExpired,
- ignoreClsNotFound,
- null);
-
- assert rmtTransFactory != null;
-
- this.locTransLsnr = locTransLsnr;
- this.rmtTransFactory = rmtTransFactory;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteClosure<CacheEntryEvent<? extends K, ? extends V>,
?> getTransformer0() {
- if (rmtTrans == null && rmtTransFactory != null)
- rmtTrans = rmtTransFactory.create();
-
- return rmtTrans;
- }
-
- /** {@inheritDoc} */
- @Override public EventListener<?> localTransformedEventListener() {
- return locTransLsnr;
- }
-
- /** {@inheritDoc} */
- @Override protected CacheEntryEventFilter getEventFilter0() {
- if (rmtFilterFactory == null)
- return null;
-
- return super.getEventFilter0();
- }
-
- /** {@inheritDoc} */
- @Override public RegisterStatus register(UUID nodeId, UUID routineId,
- GridKernalContext ctx) throws IgniteCheckedException {
- if (locTransLsnr != null) {
- ctx.resource().injectGeneric(locTransLsnr);
-
- asyncCb = U.hasAnnotation(locTransLsnr, IgniteAsyncCallback.class);
- }
-
- return super.register(nodeId, routineId, ctx);
- }
-
- /** {@inheritDoc} */
- @Override public void p2pMarshal(GridKernalContext ctx) throws
IgniteCheckedException {
- super.p2pMarshal(ctx);
-
- if (rmtTransFactory != null && !U.isGrid(rmtTransFactory.getClass()))
- rmtTransFactoryDep = new
CacheContinuousQueryDeployableObject(rmtTransFactory, ctx);
- }
-
- /** {@inheritDoc} */
- @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx)
throws IgniteCheckedException {
- if (rmtTransFactoryDep != null)
- rmtTransFactory = p2pUnmarshal(rmtTransFactoryDep, nodeId, ctx);
-
- super.p2pUnmarshal(nodeId, ctx);
- }
-
- /** {@inheritDoc} */
- @Override public boolean isMarshalled() {
- return super.isMarshalled() &&
- (rmtTransFactory == null || U.isGrid(rmtTransFactory.getClass())
|| rmtTransFactoryDep != null);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- super.writeExternal(out);
-
- boolean b = rmtTransFactoryDep != null;
-
- out.writeBoolean(b);
-
- if (b)
- out.writeObject(rmtTransFactoryDep);
- else
- out.writeObject(rmtTransFactory);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
- super.readExternal(in);
-
- boolean b = in.readBoolean();
-
- if (b) {
- rmtTransFactoryDep =
(CacheContinuousQueryDeployableObject)in.readObject();
-
- if (p2pUnmarshalFut.isDone())
- p2pUnmarshalFut = new GridFutureAdapter<>();
- }
- else
- rmtTransFactory = (Factory<? extends
IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?>>)in.readObject();
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 922e4203af3..ed362129e52 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -529,7 +529,7 @@ public class CacheContinuousQueryManager<K, V> extends
GridCacheManagerAdapter<K
if (rmtTransFactory != null) {
clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() {
@Override public CacheContinuousQueryHandler apply() {
- return new CacheContinuousQueryHandlerV3(
+ return new CacheContinuousQueryHandler(
cctx.name(),
TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(),
seq.getAndIncrement()),
locTransLsnr,
@@ -545,7 +545,7 @@ public class CacheContinuousQueryManager<K, V> extends
GridCacheManagerAdapter<K
else if (rmtFilterFactory != null) {
clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() {
@Override public CacheContinuousQueryHandler apply() {
- return new CacheContinuousQueryHandlerV2(
+ return new CacheContinuousQueryHandler(
cctx.name(),
TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(),
seq.getAndIncrement()),
locLsnr,
@@ -1109,7 +1109,7 @@ public class CacheContinuousQueryManager<K, V> extends
GridCacheManagerAdapter<K
Factory<CacheEntryEventFilter<K, V>> rmtFilterFactory
= cfg.getCacheEntryEventFilterFactory();
if (rmtFilterFactory != null)
- hnd = new CacheContinuousQueryHandlerV2(
+ hnd = new CacheContinuousQueryHandler(
cctx.name(),
TOPIC_CACHE.topic(topicPrefix,
cctx.localNodeId(), seq.getAndIncrement()),
locLsnr,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 1786bdf4003..7509a88fa17 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -171,8 +171,8 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
/** */
private ContinuousRoutinesInfo routinesInfo;
- /** */
- private int discoProtoVer;
+ /** Whether Discovery SPI uses immutable custom messages. */
+ private boolean immutableDiscoCustomMsg;
/**
* @param ctx Kernal context.
@@ -188,9 +188,9 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
new ReadOnlyCollectionView2X<>(rmtInfos.entrySet(),
locInfos.entrySet()),
e -> new ContinuousQueryView(e.getKey(), e.getValue()));
- discoProtoVer = ctx.discovery().mutableCustomMessages() ? 1 : 2;
+ immutableDiscoCustomMsg = !ctx.discovery().mutableCustomMessages();
- if (discoProtoVer == 2)
+ if (immutableDiscoCustomMsg)
routinesInfo = new ContinuousRoutinesInfo();
retryDelay = ctx.config().getNetworkSendRetryDelay();
@@ -211,12 +211,12 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
@Override public void onCustomEvent(AffinityTopologyVersion
topVer,
ClusterNode snd,
StartRoutineDiscoveryMessage msg) {
- assert discoProtoVer == 1 : discoProtoVer;
+ assert !immutableDiscoCustomMsg;
if (ctx.isStopping())
return;
- processStartRequest(snd, msg);
+ processStartRequestMutable(snd, msg);
}
});
@@ -225,12 +225,12 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
@Override public void onCustomEvent(AffinityTopologyVersion
topVer,
ClusterNode snd,
StartRoutineDiscoveryMessageV2 msg) {
- assert discoProtoVer == 2 : discoProtoVer;
+ assert immutableDiscoCustomMsg;
if (ctx.isStopping())
return;
- processStartRequestV2(topVer, snd, msg);
+ processStartRequestImmutable(topVer, snd, msg);
}
});
@@ -251,7 +251,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
@Override public void onCustomEvent(AffinityTopologyVersion
topVer,
ClusterNode snd,
StopRoutineDiscoveryMessage msg) {
- if (discoProtoVer == 2)
+ if (immutableDiscoCustomMsg)
routinesInfo.removeRoutine(msg.routineId);
if (ctx.isStopping())
@@ -408,7 +408,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
- if (discoProtoVer == 2) {
+ if (immutableDiscoCustomMsg) {
routinesInfo.collectJoiningNodeData(dataBag);
return;
@@ -422,7 +422,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
- if (discoProtoVer == 2) {
+ if (immutableDiscoCustomMsg) {
routinesInfo.collectGridNodeData(dataBag);
return;
@@ -519,7 +519,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
']');
}
- if (discoProtoVer == 2) {
+ if (immutableDiscoCustomMsg) {
if (data.hasJoiningNodeData()) {
ContinuousRoutinesJoiningNodeDiscoveryData nodeData =
(ContinuousRoutinesJoiningNodeDiscoveryData)
data.joiningNodeData();
@@ -527,19 +527,19 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
for (ContinuousRoutineInfo routineInfo :
nodeData.startedRoutines) {
routinesInfo.addRoutineInfo(routineInfo);
- onDiscoveryDataReceivedV2(routineInfo);
+ onDiscoveryDataReceivedImmutable(routineInfo);
}
}
}
else {
if (data.hasJoiningNodeData())
-
onDiscoveryDataReceivedV1((DiscoveryData)data.joiningNodeData());
+
onDiscoveryDataReceivedMutable((DiscoveryData)data.joiningNodeData());
}
}
/** {@inheritDoc} */
@Override public void onGridDataReceived(GridDiscoveryData data) {
- if (discoProtoVer == 2) {
+ if (immutableDiscoCustomMsg) {
if (data.commonData() != null) {
ContinuousRoutinesCommonDiscoveryData commonData =
(ContinuousRoutinesCommonDiscoveryData)data.commonData();
@@ -550,7 +550,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
routinesInfo.addRoutineInfo(routineInfo);
- onDiscoveryDataReceivedV2(routineInfo);
+ onDiscoveryDataReceivedImmutable(routineInfo);
}
}
}
@@ -559,18 +559,18 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
if (nodeSpecData != null) {
for (Map.Entry<UUID, Serializable> e : nodeSpecData.entrySet())
- onDiscoveryDataReceivedV1((DiscoveryData)e.getValue());
+
onDiscoveryDataReceivedMutable((DiscoveryData)e.getValue());
}
}
}
/**
* Processes data received in a discovery message.
- * Used with protocol version 1.
+ * Used when Discovery SPI supports mutable custom messages.
*
* @param data received discovery data.
*/
- private void onDiscoveryDataReceivedV1(DiscoveryData data) {
+ private void onDiscoveryDataReceivedMutable(DiscoveryData data) {
if (data != null) {
for (DiscoveryDataItem item : data.items) {
if (!locInfos.containsKey(item.routineId)) {
@@ -610,11 +610,11 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
/**
* Processes data received in a discovery message.
- * Used with protocol version 2.
+ * Used when Discovery SPI doesn't support mutable custom messages.
*
* @param routineInfo Routine info.
*/
- private void onDiscoveryDataReceivedV2(ContinuousRoutineInfo routineInfo) {
+ private void onDiscoveryDataReceivedImmutable(ContinuousRoutineInfo
routineInfo) {
IgnitePredicate<ClusterNode> nodeFilter;
try {
@@ -792,7 +792,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
LocalRoutineInfo routineInfo = new LocalRoutineInfo(ctx.localNodeId(),
prjPred, hnd, 1, 0, true);
- if (discoProtoVer == 2) {
+ if (immutableDiscoCustomMsg) {
routinesInfo.addRoutineInfo(createRoutineInfo(
ctx.localNodeId(),
routineId,
@@ -991,7 +991,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
reqData.prepareMarshal(ctx);
- if (discoProtoVer == 1) {
+ if (!immutableDiscoCustomMsg) {
StartRoutineDiscoveryMessage msg = new
StartRoutineDiscoveryMessage(
routineId,
reqData);
@@ -1001,13 +1001,8 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
return msg;
}
- else {
- assert discoProtoVer == 2 : discoProtoVer;
-
- return new StartRoutineDiscoveryMessageV2(
- routineId,
- reqData);
- }
+ else
+ return new StartRoutineDiscoveryMessageV2(routineId, reqData);
}
/**
@@ -1077,7 +1072,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
unregisterHandler(routineId, routine.hnd, true);
}
- if (!stop && discoProtoVer == 2)
+ if (!stop && immutableDiscoCustomMsg)
stop = routinesInfo.routineExists(routineId);
// Finish if routine is not found (wrong ID is provided).
@@ -1265,7 +1260,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
clientInfos.clear();
- if (discoProtoVer == 2)
+ if (immutableDiscoCustomMsg)
routinesInfo.onClientDisconnected(locInfos.keySet());
if (log.isDebugEnabled()) {
@@ -1352,7 +1347,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
* @param node Sender.
* @param req Start request.
*/
- private void processStartRequest(ClusterNode node,
StartRoutineDiscoveryMessage req) {
+ private void processStartRequestMutable(ClusterNode node,
StartRoutineDiscoveryMessage req) {
if (node.id().equals(ctx.localNodeId()))
return;
@@ -1471,7 +1466,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
* @param snd Sender.
* @param msg Start request.
*/
- private void processStartRequestV2(final AffinityTopologyVersion topVer,
+ private void processStartRequestImmutable(final AffinityTopologyVersion
topVer,
final ClusterNode snd,
final StartRoutineDiscoveryMessageV2 msg) {
StartRequestData reqData = msg.startRequestData();
@@ -1913,7 +1908,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
- if (discoProtoVer == 2) {
+ if (immutableDiscoCustomMsg) {
routinesInfo.onNodeFail(nodeId);
for (StartFuture fut : startFuts.values())
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties
b/modules/core/src/main/resources/META-INF/classnames.properties
index 370d18525b1..954755b375a 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1387,8 +1387,6 @@
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQuer
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler$1
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler$ContinuousQueryAsyncClosure$1
-org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandlerV2
-org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandlerV3
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager$1
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager$2
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager$3