IGNITE-2700: Closures are now written using binary marshaller. This closes #518.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/215e8a1e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/215e8a1e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/215e8a1e Branch: refs/heads/ignite-2791 Commit: 215e8a1e3143bf22d792fdefbd4e6a65b372ae24 Parents: 8cf71d4 Author: Ilya Lantukh <ilant...@gridgain.com> Authored: Thu Mar 10 15:37:14 2016 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Thu Mar 10 15:37:14 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/binary/BinaryContext.java | 29 +- .../internal/binary/BinaryFieldAccessor.java | 1 + .../closure/GridClosureProcessor.java | 375 ++++++++++++++++- .../resources/META-INF/classnames.properties | 6 + .../ignite/internal/GridAffinitySelfTest.java | 4 +- ...omputationBinarylizableClosuresSelfTest.java | 413 +++++++++++++++++++ .../binary/BinaryMarshallerSelfTest.java | 50 +++ ...IgniteBinaryObjectsComputeGridTestSuite.java | 7 +- 8 files changed, 858 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/215e8a1e/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java index 4df9ba2..b9b633f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java @@ -61,6 +61,7 @@ import org.apache.ignite.cache.affinity.AffinityKeyMapped; import org.apache.ignite.configuration.BinaryConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.processors.cache.binary.BinaryMetadataKey; +import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.datastructures.CollocatedQueueItemKey; import org.apache.ignite.internal.processors.datastructures.CollocatedSetItemKey; import org.apache.ignite.internal.util.IgniteUtils; @@ -89,6 +90,23 @@ public class BinaryContext { static final BinaryInternalMapper SIMPLE_NAME_LOWER_CASE_MAPPER = new BinaryInternalMapper(new BinaryBasicNameMapper(true), new BinaryBasicIdMapper(true), false); + /** Set of system classes that should be marshalled with BinaryMarshaller. */ + private static final Set<String> BINARYLIZABLE_SYS_CLSS; + + /** Binarylizable system classes set initialization. */ + static { + Set<String> sysClss = new HashSet<>(); + + sysClss.add(GridClosureProcessor.C1V2.class.getName()); + sysClss.add(GridClosureProcessor.C1MLAV2.class.getName()); + sysClss.add(GridClosureProcessor.C2V2.class.getName()); + sysClss.add(GridClosureProcessor.C2MLAV2.class.getName()); + sysClss.add(GridClosureProcessor.C4V2.class.getName()); + sysClss.add(GridClosureProcessor.C4MLAV2.class.getName()); + + BINARYLIZABLE_SYS_CLSS = Collections.unmodifiableSet(sysClss); + } + /** */ private final ConcurrentMap<Class<?>, BinaryClassDescriptor> descByCls = new ConcurrentHashMap8<>(); @@ -255,7 +273,7 @@ public class BinaryContext { /** * @return Ignite configuration. */ - public IgniteConfiguration configuration(){ + public IgniteConfiguration configuration() { return igniteCfg; } @@ -587,6 +605,11 @@ public class BinaryContext { String clsName = cls.getName(); if (marshCtx.isSystemType(clsName)) { + BinarySerializer serializer = null; + + if (BINARYLIZABLE_SYS_CLSS.contains(clsName)) + serializer = new BinaryReflectiveSerializer(); + desc = new BinaryClassDescriptor(this, cls, false, @@ -594,7 +617,7 @@ public class BinaryContext { clsName, null, SIMPLE_NAME_LOWER_CASE_MAPPER, - null, + serializer, false, true /* registered */ ); @@ -775,7 +798,7 @@ public class BinaryContext { if (prevMap != null && !mapper.equals(prevMap)) throw new IgniteException("Different mappers [clsName=" + clsName + ", newMapper=" + mapper - + ", prevMap=" + prevMap + "]"); + + ", prevMap=" + prevMap + "]"); prevMap = typeId2Mapper.putIfAbsent(mapper.typeId(clsName), mapper); http://git-wip-us.apache.org/repos/asf/ignite/blob/215e8a1e/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java index 8c8bf27..af33b63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java @@ -619,6 +619,7 @@ public abstract class BinaryFieldAccessor { case BINARY: case OBJECT: + case PROXY: writer.writeObjectField(val); break; http://git-wip-us.apache.org/repos/asf/ignite/blob/215e8a1e/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index 043f754..c6883dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -31,6 +31,12 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.binary.BinaryRawReader; +import org.apache.ignite.binary.BinaryRawWriter; +import org.apache.ignite.binary.BinaryReader; +import org.apache.ignite.binary.BinaryWriter; +import org.apache.ignite.binary.Binarylizable; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeJobMasterLeaveAware; @@ -61,6 +67,7 @@ import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.internal.util.worker.GridWorkerFuture; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.resources.LoadBalancerResource; @@ -76,6 +83,9 @@ import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKe * */ public class GridClosureProcessor extends GridProcessorAdapter { + /** Ignite version in which binarylizable versions of closures were introduced. */ + public static final IgniteProductVersion BINARYLIZABLE_CLOSURES_SINCE = IgniteProductVersion.fromString("1.6.0"); + /** */ private final Executor sysPool; @@ -254,7 +264,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { case BROADCAST: { for (ClusterNode n : nodes) for (Runnable r : jobs) - mapper.map(job(r), n); + mapper.map(downgradeJobIfNeeded(job(r), n), n); break; } @@ -263,7 +273,9 @@ public class GridClosureProcessor extends GridProcessorAdapter { for (Runnable r : jobs) { ComputeJob job = job(r); - mapper.map(job, lb.getBalancedNode(job, null)); + ClusterNode n = lb.getBalancedNode(job, null); + + mapper.map(downgradeJobIfNeeded(job, n), n); } break; @@ -306,7 +318,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { case BROADCAST: { for (ClusterNode n : nodes) for (Callable<R> c : jobs) - mapper.map(job(c), n); + mapper.map(downgradeJobIfNeeded(job(c), n), n); break; } @@ -315,7 +327,9 @@ public class GridClosureProcessor extends GridProcessorAdapter { for (Callable<R> c : jobs) { ComputeJob job = job(c); - mapper.map(job, lb.getBalancedNode(job, null)); + ClusterNode n = lb.getBalancedNode(job, null); + + mapper.map(downgradeJobIfNeeded(job, n), n); } break; @@ -1025,7 +1039,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { private static <T, R> ComputeJob job(final IgniteClosure<T, R> job, @Nullable final T arg) { A.notNull(job, "job"); - return job instanceof ComputeJobMasterLeaveAware ? new C1MLA<>(job, arg) : new C1<>(job, arg); + return job instanceof ComputeJobMasterLeaveAware ? new C1MLAV2<>(job, arg) : new C1V2<>(job, arg); } /** @@ -1037,7 +1051,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { private static <R> ComputeJob job(final Callable<R> c) { A.notNull(c, "job"); - return c instanceof ComputeJobMasterLeaveAware ? new C2MLA<>(c) : new C2<>(c); + return c instanceof ComputeJobMasterLeaveAware ? new C2MLAV2<>(c) : new C2V2<>(c); } /** @@ -1049,7 +1063,46 @@ public class GridClosureProcessor extends GridProcessorAdapter { private static ComputeJob job(final Runnable r) { A.notNull(r, "job"); - return r instanceof ComputeJobMasterLeaveAware ? new C4MLA(r) : new C4(r); + return r instanceof ComputeJobMasterLeaveAware ? new C4MLAV2(r) : new C4V2(r); + } + + /** + * Downgrades provided job to older version if target does not support it. + * + * @param job Job. + * @param node Node. + * @return Provided or downgraded job. + */ + private static ComputeJob downgradeJobIfNeeded(ComputeJob job, ClusterNode node) { + A.notNull(job, "job"); + + assert node != null; + + IgniteProductVersion nodeVer = node.version(); + + if (nodeVer.compareTo(BINARYLIZABLE_CLOSURES_SINCE) >= 0) + return job; + + if (job instanceof C1V2) { + if (job instanceof C1MLAV2) + return new C1MLA<>(((C1MLAV2)job).job, ((C1MLAV2)job).arg); + else + return new C1<>(((C1V2)job).job, ((C1V2)job).arg); + } + else if (job instanceof C2V2) { + if (job instanceof C2MLAV2) + return new C2MLA<>(((C2MLAV2)job).c); + else + return new C2<>(((C2V2)job).c); + } + else if (job instanceof C4V2) { + if (job instanceof C4MLAV2) + return new C4MLA(((C4MLAV2)job).r); + else + return new C4(((C4V2)job).r); + } + + return job; } /** @@ -1294,9 +1347,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) { - ComputeJob job = job(this.job); - - return Collections.singletonMap(job, node); + return Collections.singletonMap(downgradeJobIfNeeded(job(this.job), node), node); } /** {@inheritDoc} */ @@ -1348,9 +1399,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) { - ComputeJob job = job(this.job); - - return Collections.singletonMap(job, node); + return Collections.singletonMap(downgradeJobIfNeeded(job(this.job), node), node); } /** {@inheritDoc} */ @@ -1488,7 +1537,9 @@ public class GridClosureProcessor extends GridProcessorAdapter { @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) { ComputeJob job = job(this.job, this.arg); - return Collections.singletonMap(job, lb.getBalancedNode(job, null)); + ClusterNode node = lb.getBalancedNode(job, null); + + return Collections.singletonMap(downgradeJobIfNeeded(job, node), node); } /** {@inheritDoc} */ @@ -1537,7 +1588,9 @@ public class GridClosureProcessor extends GridProcessorAdapter { for (T jobArg : args) { ComputeJob job = job(this.job, jobArg); - mapper.map(job, lb.getBalancedNode(job, null)); + ClusterNode node = lb.getBalancedNode(job, null); + + mapper.map(downgradeJobIfNeeded(job, node), node); } return mapper.map(); @@ -1593,7 +1646,9 @@ public class GridClosureProcessor extends GridProcessorAdapter { for (T jobArg : args) { ComputeJob job = job(this.job, jobArg); - mapper.map(job, lb.getBalancedNode(job, null)); + ClusterNode node = lb.getBalancedNode(job, null); + + mapper.map(downgradeJobIfNeeded(job, node), node); } return mapper.map(); @@ -1607,7 +1662,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { ComputeJobResultPolicy resPlc = super.result(res, rcvd); - if (res.getException() == null && resPlc != FAILOVER && !rdc.collect((R1) res.getData())) + if (res.getException() == null && resPlc != FAILOVER && !rdc.collect((R1)res.getData())) resPlc = REDUCE; // If reducer returned false - reduce right away. return resPlc; @@ -1647,7 +1702,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { JobMapper mapper = new JobMapper(subgrid.size()); for (ClusterNode n : subgrid) - mapper.map(job(job, arg), n); + mapper.map(downgradeJobIfNeeded(job(job, arg), n), n); return mapper.map(); } @@ -1680,7 +1735,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** * */ - public C1(){ + public C1() { // No-op. } @@ -1729,6 +1784,72 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** * */ + public static class C1V2<T, R> implements ComputeJob, Binarylizable, GridNoImplicitInjection, + GridInternalWrapper<IgniteClosure> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + protected IgniteClosure<T, R> job; + + /** */ + @GridToStringInclude + protected T arg; + + /** + * + */ + public C1V2() { + // No-op. + } + + /** + * @param job Job. + * @param arg Argument. + */ + C1V2(IgniteClosure<T, R> job, T arg) { + this.job = job; + this.arg = arg; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object execute() { + return job.apply(arg); + } + + /** {@inheritDoc} */ + @Override public void cancel() { + // No-op. + } + + @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException { + BinaryRawWriter rawWriter = writer.rawWriter(); + + rawWriter.writeObject(job); + rawWriter.writeObject(arg); + } + + @Override public void readBinary(BinaryReader reader) throws BinaryObjectException { + BinaryRawReader rawReader = reader.rawReader(); + + job = rawReader.readObject(); + arg = rawReader.readObject(); + } + + /** {@inheritDoc} */ + @Override public IgniteClosure userObject() { + return job; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(C1V2.class, this); + } + } + + /** + * + */ private static class C1MLA<T, R> extends C1<T, R> implements ComputeJobMasterLeaveAware { /** */ private static final long serialVersionUID = 0L; @@ -1762,6 +1883,39 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** * */ + public static class C1MLAV2<T, R> extends C1V2<T, R> implements ComputeJobMasterLeaveAware { + /** */ + private static final long serialVersionUID = 0L; + + /** + * + */ + public C1MLAV2() { + // No-op. + } + + /** + * @param job Job. + * @param arg Argument. + */ + private C1MLAV2(IgniteClosure<T, R> job, T arg) { + super(job, arg); + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) { + ((ComputeJobMasterLeaveAware)job).onMasterNodeLeft(ses); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(C1MLAV2.class, this, super.toString()); + } + } + + /** + * + */ private static class C2<R> implements ComputeJob, Externalizable, GridNoImplicitInjection, GridInternalWrapper<Callable> { /** */ private static final long serialVersionUID = 0L; @@ -1772,7 +1926,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** * */ - public C2(){ + public C2() { // No-op. } @@ -1822,7 +1976,66 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** * */ - private static class C2MLA<R> extends C2<R> implements ComputeJobMasterLeaveAware{ + public static class C2V2<R> implements ComputeJob, Binarylizable, GridNoImplicitInjection, + GridInternalWrapper<Callable> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + protected Callable<R> c; + + /** + * + */ + public C2V2() { + // No-op. + } + + /** + * @param c Callable. + */ + private C2V2(Callable<R> c) { + this.c = c; + } + + /** {@inheritDoc} */ + @Override public Object execute() { + try { + return c.call(); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public void cancel() { + // No-op. + } + + @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException { + writer.rawWriter().writeObject(c); + } + + @Override public void readBinary(BinaryReader reader) throws BinaryObjectException { + c = reader.rawReader().readObject(); + } + + /** {@inheritDoc} */ + @Override public Callable userObject() { + return c; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(C2V2.class, this); + } + } + + /** + * + */ + private static class C2MLA<R> extends C2<R> implements ComputeJobMasterLeaveAware { /** */ private static final long serialVersionUID = 0L; @@ -1852,6 +2065,38 @@ public class GridClosureProcessor extends GridProcessorAdapter { } /** + * + */ + public static class C2MLAV2<R> extends C2V2<R> implements ComputeJobMasterLeaveAware { + /** */ + private static final long serialVersionUID = 0L; + + /** + * + */ + public C2MLAV2() { + // No-op. + } + + /** + * @param c Callable. + */ + private C2MLAV2(Callable<R> c) { + super(c); + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) { + ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(C2MLAV2.class, this, super.toString()); + } + } + + /** */ private static class C4 implements ComputeJob, Externalizable, GridNoImplicitInjection, GridInternalWrapper<Runnable> { /** */ @@ -1863,7 +2108,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** * */ - public C4(){ + public C4() { // No-op. } @@ -1908,6 +2153,60 @@ public class GridClosureProcessor extends GridProcessorAdapter { } /** + */ + public static class C4V2 implements ComputeJob, Binarylizable, GridNoImplicitInjection, GridInternalWrapper<Runnable> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + protected Runnable r; + + /** + * + */ + public C4V2() { + // No-op. + } + + /** + * @param r Runnable. + */ + private C4V2(Runnable r) { + this.r = r; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object execute() { + r.run(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void cancel() { + // No-op. + } + + @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException { + writer.rawWriter().writeObject(r); + } + + @Override public void readBinary(BinaryReader reader) throws BinaryObjectException { + r = reader.rawReader().readObject(); + } + + /** {@inheritDoc} */ + @Override public Runnable userObject() { + return r; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(C4V2.class, this); + } + } + + /** * */ private static class C4MLA extends C4 implements ComputeJobMasterLeaveAware { @@ -1938,4 +2237,36 @@ public class GridClosureProcessor extends GridProcessorAdapter { return S.toString(C4MLA.class, this, super.toString()); } } + + /** + * + */ + public static class C4MLAV2 extends C4V2 implements ComputeJobMasterLeaveAware { + /** */ + private static final long serialVersionUID = 0L; + + /** + * + */ + public C4MLAV2() { + // No-op. + } + + /** + * @param r Runnable. + */ + private C4MLAV2(Runnable r) { + super(r); + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) { + ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(C4MLAV2.class, this, super.toString()); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/215e8a1e/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 8c3ad88..9728d9c 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -900,11 +900,17 @@ org.apache.ignite.internal.processors.clock.GridClockDeltaSnapshotMessage org.apache.ignite.internal.processors.clock.GridClockDeltaVersion org.apache.ignite.internal.processors.closure.GridClosurePolicy org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1 +org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1V2 org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1MLA +org.apache.ignite.internal.processors.closure.GridClosureProcessor$C1MLAV2 org.apache.ignite.internal.processors.closure.GridClosureProcessor$C2 +org.apache.ignite.internal.processors.closure.GridClosureProcessor$C2V2 org.apache.ignite.internal.processors.closure.GridClosureProcessor$C2MLA +org.apache.ignite.internal.processors.closure.GridClosureProcessor$C2MLAV2 org.apache.ignite.internal.processors.closure.GridClosureProcessor$C4 +org.apache.ignite.internal.processors.closure.GridClosureProcessor$C4V2 org.apache.ignite.internal.processors.closure.GridClosureProcessor$C4MLA +org.apache.ignite.internal.processors.closure.GridClosureProcessor$C4MLAV2 org.apache.ignite.internal.processors.closure.GridClosureProcessor$T1 org.apache.ignite.internal.processors.closure.GridClosureProcessor$T10 org.apache.ignite.internal.processors.closure.GridClosureProcessor$T11 http://git-wip-us.apache.org/repos/asf/ignite/blob/215e8a1e/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java index a75023f..9e438e9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinitySelfTest.java @@ -84,13 +84,15 @@ public class GridAffinitySelfTest extends GridCommonAbstractTest { /** * @throws IgniteCheckedException If failed. */ - public void testAffinity() throws IgniteCheckedException { + public void testAffinity() throws Exception { Ignite g1 = grid(1); Ignite g2 = grid(2); assert caches(g1).size() == 0; assert F.first(caches(g2)).getCacheMode() == PARTITIONED; + awaitPartitionMapExchange(); + Map<ClusterNode, Collection<String>> map = g1.<String>affinity(null).mapKeysToNodes(F.asList("1")); assertNotNull(map); http://git-wip-us.apache.org/repos/asf/ignite/blob/215e8a1e/modules/core/src/test/java/org/apache/ignite/internal/GridComputationBinarylizableClosuresSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridComputationBinarylizableClosuresSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridComputationBinarylizableClosuresSelfTest.java new file mode 100644 index 0000000..96f0277 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridComputationBinarylizableClosuresSelfTest.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.ignite.internal; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.binary.BinaryReader; +import org.apache.ignite.binary.BinaryWriter; +import org.apache.ignite.binary.Binarylizable; +import org.apache.ignite.compute.ComputeJobMasterLeaveAware; +import org.apache.ignite.compute.ComputeTaskSession; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Test ensuring that correct closures are serialized. + */ +public class GridComputationBinarylizableClosuresSelfTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setMarshaller(new BinaryMarshaller()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + TestBinarylizableClosure.writeCalled.set(false); + TestBinarylizableClosure.readCalled.set(false); + TestBinarylizableClosure.executed.set(false); + + TestBinarylizableMasterLeaveAwareClosure.writeCalled.set(false); + TestBinarylizableMasterLeaveAwareClosure.readCalled.set(false); + + TestBinarylizableCallable.writeCalled.set(false); + TestBinarylizableCallable.readCalled.set(false); + TestBinarylizableCallable.executed.set(false); + + TestBinarylizableMasterLeaveAwareCallable.writeCalled.set(false); + TestBinarylizableMasterLeaveAwareCallable.readCalled.set(false); + + TestBinarylizableRunnable.writeCalled.set(false); + TestBinarylizableRunnable.readCalled.set(false); + TestBinarylizableRunnable.executed.set(false); + + TestBinarylizableMasterLeaveAwareRunnable.writeCalled.set(false); + TestBinarylizableMasterLeaveAwareRunnable.readCalled.set(false); + + TestBinarylizableObject.writeCalled.set(false); + TestBinarylizableObject.readCalled.set(false); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * Test that Binarylizable IgniteClosure is serialized using BinaryMarshaller. + * + * @throws Exception If failed. + */ + public void testJob() throws Exception { + Ignite ignite = startGrid(1); + startGrid(2); + + final TestBinarylizableClosure closure = new TestBinarylizableClosure(); + + ignite.compute(ignite.cluster().forRemotes()).apply(closure, new TestBinarylizableObject()); + + assert TestBinarylizableClosure.executed.get(); + assert TestBinarylizableClosure.writeCalled.get(); + assert TestBinarylizableClosure.readCalled.get(); + + assert TestBinarylizableObject.writeCalled.get(); + assert TestBinarylizableObject.readCalled.get(); + } + + /** + * Test that Binarylizable IgniteClosure with ComputeJobMasterLeaveAware interface is serialized + * using BinaryMarshaller. + * + * @throws Exception If failed. + */ + public void testMasterLeaveAwareJob() throws Exception { + Ignite ignite = startGrid(1); + startGrid(2); + + final TestBinarylizableMasterLeaveAwareClosure job = new TestBinarylizableMasterLeaveAwareClosure(); + + ignite.compute(ignite.cluster().forRemotes()).apply(job, new TestBinarylizableObject()); + + assert TestBinarylizableClosure.executed.get(); + assert TestBinarylizableClosure.writeCalled.get(); + assert TestBinarylizableClosure.readCalled.get(); + + assert TestBinarylizableMasterLeaveAwareClosure.writeCalled.get(); + assert TestBinarylizableMasterLeaveAwareClosure.readCalled.get(); + + assert TestBinarylizableObject.writeCalled.get(); + assert TestBinarylizableObject.readCalled.get(); + } + + /** + * Test that Binarylizable IgniteCallable is serialized using BinaryMarshaller. + * + * @throws Exception If failed. + */ + public void testCallable() throws Exception { + Ignite ignite = startGrid(1); + startGrid(2); + + final TestBinarylizableCallable callable = new TestBinarylizableCallable(); + + ignite.compute(ignite.cluster().forRemotes()).call(callable); + + assert TestBinarylizableCallable.executed.get(); + assert TestBinarylizableCallable.writeCalled.get(); + assert TestBinarylizableCallable.readCalled.get(); + } + + /** + * Test that Binarylizable IgniteCallable with ComputeJobMasterLeaveAware interface is serialized + * using BinaryMarshaller. + * + * @throws Exception If failed. + */ + public void testMasterLeaveAwareCallable() throws Exception { + Ignite ignite = startGrid(1); + startGrid(2); + + final TestBinarylizableMasterLeaveAwareCallable callable = new TestBinarylizableMasterLeaveAwareCallable(); + + ignite.compute(ignite.cluster().forRemotes()).call(callable); + + assert TestBinarylizableCallable.executed.get(); + assert TestBinarylizableCallable.writeCalled.get(); + assert TestBinarylizableCallable.readCalled.get(); + + assert TestBinarylizableMasterLeaveAwareCallable.writeCalled.get(); + assert TestBinarylizableMasterLeaveAwareCallable.readCalled.get(); + } + + /** + * Test that Binarylizable IgniteRunnable is serialized using BinaryMarshaller. + * + * @throws Exception If failed. + */ + public void testRunnable() throws Exception { + Ignite ignite = startGrid(1); + startGrid(2); + + final TestBinarylizableRunnable runnable = new TestBinarylizableRunnable(); + + ignite.compute(ignite.cluster().forRemotes()).run(runnable); + + assert TestBinarylizableRunnable.executed.get(); + assert TestBinarylizableRunnable.writeCalled.get(); + assert TestBinarylizableRunnable.readCalled.get(); + } + + /** + * Test that Binarylizable IgniteRunnable with ComputeJobMasterLeaveAware interface is serialized + * using BinaryMarshaller. + * + * @throws Exception If failed. + */ + public void testMasterLeaveAwareRunnable() throws Exception { + Ignite ignite = startGrid(1); + startGrid(2); + + final TestBinarylizableMasterLeaveAwareRunnable runnable = new TestBinarylizableMasterLeaveAwareRunnable(); + + ignite.compute(ignite.cluster().forRemotes()).run(runnable); + + assert TestBinarylizableRunnable.executed.get(); + assert TestBinarylizableRunnable.writeCalled.get(); + assert TestBinarylizableRunnable.readCalled.get(); + + assert TestBinarylizableMasterLeaveAwareRunnable.writeCalled.get(); + assert TestBinarylizableMasterLeaveAwareRunnable.readCalled.get(); + } + + /** + * Test Binarylizable IgniteClosure. + */ + private static class TestBinarylizableClosure implements IgniteClosure, Binarylizable { + + /** Tracks {@link TestBinarylizableClosure::writeBinary(BinaryWriter writer)} calls. */ + private static AtomicBoolean writeCalled = new AtomicBoolean(); + + /** Tracks {@link TestBinarylizableClosure::readBinary(BinaryReader reader)} calls. */ + private static AtomicBoolean readCalled = new AtomicBoolean(); + + /** Tracks {@link TestBinarylizableClosure::apply(Object o)} calls. */ + private static AtomicBoolean executed = new AtomicBoolean(); + + /** {@inheritDoc} */ + @Override public Object apply(Object o) { + executed.set(true); + return null; + } + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException { + writeCalled.set(true); + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReader reader) throws BinaryObjectException { + readCalled.set(true); + } + } + + /** + * Test Binarylizable IgniteClosure with ComputeJobMasterLeaveAware interface. + */ + private static class TestBinarylizableMasterLeaveAwareClosure extends TestBinarylizableClosure + implements ComputeJobMasterLeaveAware { + + /** Tracks {@link TestBinarylizableMasterLeaveAwareClosure::writeBinary(BinaryWriter writer)} calls. */ + private static AtomicBoolean writeCalled = new AtomicBoolean(); + + /** Tracks {@link TestBinarylizableMasterLeaveAwareClosure::readBinary(BinaryReader reader)} calls. */ + private static AtomicBoolean readCalled = new AtomicBoolean(); + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException { + super.writeBinary(writer); + writeCalled.set(true); + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReader reader) throws BinaryObjectException { + super.readBinary(reader); + readCalled.set(true); + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteException { + } + } + + /** + * Test Binarylizable object. + */ + private static class TestBinarylizableObject implements Binarylizable { + + /** Tracks {@link TestBinarylizableObject::writeBinary(BinaryWriter writer)} calls. */ + private static AtomicBoolean writeCalled = new AtomicBoolean(); + + /** Tracks {@link TestBinarylizableObject::readBinary(BinaryReader reader)} calls. */ + private static AtomicBoolean readCalled = new AtomicBoolean(); + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException { + writeCalled.set(true); + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReader reader) throws BinaryObjectException { + readCalled.set(true); + } + } + + /** + * Test Binarylizable Callable. + */ + private static class TestBinarylizableCallable implements IgniteCallable, Binarylizable { + + /** Tracks {@link TestBinarylizableCallable::writeBinary(BinaryWriter writer)} calls. */ + private static AtomicBoolean writeCalled = new AtomicBoolean(); + + /** Tracks {@link TestBinarylizableCallable::readBinary(BinaryReader reader)} calls. */ + private static AtomicBoolean readCalled = new AtomicBoolean(); + + /** Tracks {@link TestBinarylizableCallable::call()} calls. */ + private static AtomicBoolean executed = new AtomicBoolean(); + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + executed.set(true); + return null; + } + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException { + writeCalled.set(true); + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReader reader) throws BinaryObjectException { + readCalled.set(true); + } + } + + /** + * Test Binarylizable Callable with ComputeJobMasterLeaveAware interface. + */ + private static class TestBinarylizableMasterLeaveAwareCallable extends TestBinarylizableCallable + implements ComputeJobMasterLeaveAware { + + /** Tracks {@link TestBinarylizableMasterLeaveAwareCallable::writeBinary(BinaryWriter writer)} calls. */ + private static AtomicBoolean writeCalled = new AtomicBoolean(); + + /** Tracks {@link TestBinarylizableMasterLeaveAwareCallable::readBinary(BinaryReader reader)} calls. */ + private static AtomicBoolean readCalled = new AtomicBoolean(); + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException { + super.writeBinary(writer); + writeCalled.set(true); + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReader reader) throws BinaryObjectException { + super.readBinary(reader); + readCalled.set(true); + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteException { + } + } + + /** + * Test Binarylizable Runnable. + */ + private static class TestBinarylizableRunnable implements IgniteRunnable, Binarylizable { + + /** Tracks {@link TestBinarylizableRunnable::writeBinary(BinaryWriter writer)} calls. */ + private static AtomicBoolean writeCalled = new AtomicBoolean(); + + /** Tracks {@link TestBinarylizableRunnable::readBinary(BinaryReader reader)} calls. */ + private static AtomicBoolean readCalled = new AtomicBoolean(); + + /** Tracks {@link TestBinarylizableRunnable::run()} calls. */ + private static AtomicBoolean executed = new AtomicBoolean(); + + /** {@inheritDoc} */ + @Override public void run() { + executed.set(true); + } + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException { + writeCalled.set(true); + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReader reader) throws BinaryObjectException { + readCalled.set(true); + } + } + + /** + * Test Binarylizable Runnable with ComputeJobMasterLeaveAware interface. + */ + private static class TestBinarylizableMasterLeaveAwareRunnable extends TestBinarylizableRunnable + implements ComputeJobMasterLeaveAware { + + /** Tracks {@link TestBinarylizableMasterLeaveAwareRunnable::writeBinary(BinaryWriter writer)} calls. */ + private static AtomicBoolean writeCalled = new AtomicBoolean(); + + /** Tracks {@link TestBinarylizableMasterLeaveAwareRunnable::readBinary(BinaryReader reader)} calls. */ + private static AtomicBoolean readCalled = new AtomicBoolean(); + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException { + super.writeBinary(writer); + writeCalled.set(true); + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReader reader) throws BinaryObjectException { + super.readBinary(reader); + readCalled.set(true); + } + + /** {@inheritDoc} */ + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteException { + } + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/215e8a1e/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java index 37b908a..eefe66c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java @@ -2690,6 +2690,35 @@ public class BinaryMarshallerSelfTest extends GridCommonAbstractTest { } /** + * Test object with {@link Proxy} field. + * + * @throws Exception If fails. + */ + public void testObjectContainingProxy() throws Exception { + BinaryMarshaller marsh = binaryMarshaller(); + + SomeItf inItf = (SomeItf)Proxy.newProxyInstance( + BinaryMarshallerSelfTest.class.getClassLoader(), new Class[] {SomeItf.class}, + new InvocationHandler() { + private NonSerializable obj = new NonSerializable(null); + + @Override public Object invoke(Object proxy, Method mtd, Object[] args) throws Throwable { + if ("hashCode".equals(mtd.getName())) + return obj.hashCode(); + + obj.checkAfterUnmarshalled(); + + return 17; + } + } + ); + + SomeItf outItf = marsh.unmarshal(marsh.marshal(inItf), null); + + assertEquals(outItf.checkAfterUnmarshalled(), 17); + } + + /** * Test duplicate fields. * * @throws Exception If failed. @@ -4676,4 +4705,25 @@ public class BinaryMarshallerSelfTest extends GridCommonAbstractTest { rawValArr = rawReader.readDecimalArray(); } } + + /** + * Wrapper object. + */ + private static class Wrapper { + + /** Value. */ + private final Object value; + + /** Constructor. */ + public Wrapper(Object value) { + this.value = value; + } + + /** + * @return Value. + */ + public Object getValue() { + return value; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/215e8a1e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsComputeGridTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsComputeGridTestSuite.java index 35be98d..e659966 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsComputeGridTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsComputeGridTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.internal.GridComputationBinarylizableClosuresSelfTest; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.testframework.config.GridTestProperties; @@ -32,6 +33,10 @@ public class IgniteBinaryObjectsComputeGridTestSuite { public static TestSuite suite() throws Exception { GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName()); - return IgniteComputeGridTestSuite.suite(); + TestSuite suite = IgniteComputeGridTestSuite.suite(); + + suite.addTestSuite(GridComputationBinarylizableClosuresSelfTest.class); + + return suite; } }