Restored services compatibility.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/92fff630 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/92fff630 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/92fff630 Branch: refs/heads/master Commit: 92fff630fbf36c82f93bbd9ddd53d11bed44e772 Parents: 61ab650 Author: devozerov <[email protected]> Authored: Wed Nov 2 17:50:51 2016 +0300 Committer: thatcoach <[email protected]> Committed: Wed Nov 2 17:51:06 2016 +0300 ---------------------------------------------------------------------- .../internal/processors/job/GridJobWorker.java | 10 +- .../service/GridServiceProcessor.java | 197 +++++++++++-------- .../internal/util/SerializableTransient.java | 58 ++++++ .../ignite/marshaller/MarshallerUtils.java | 22 +++ .../optimized/OptimizedClassDescriptor.java | 90 ++++++++- 5 files changed, 296 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/92fff630/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index 8169eb1..5f38b29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@ -57,6 +57,7 @@ import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.marshaller.MarshallerUtils; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_JOB_CANCELLED; @@ -421,7 +422,14 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { try { if (job == null) { - job = U.unmarshal(marsh, jobBytes, U.resolveClassLoader(dep.classLoader(), ctx.config())); + MarshallerUtils.jobSenderVersion(taskNode.version()); + + try { + job = U.unmarshal(marsh, jobBytes, U.resolveClassLoader(dep.classLoader(), ctx.config())); + } + finally { + MarshallerUtils.jobSenderVersion(null); + } // No need to hold reference any more. jobBytes = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/92fff630/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 527d360..8489875 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -20,11 +20,14 @@ package org.apache.ignite.internal.processors.service; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; @@ -87,6 +90,7 @@ import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.internal.util.SerializableTransient; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.JobContextResource; import org.apache.ignite.resources.LoggerResource; @@ -115,6 +119,9 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** */ public static final IgniteProductVersion LAZY_SERVICES_CFG_SINCE = IgniteProductVersion.fromString("1.5.22"); + /** Versions that only compatible with each other, and from 1.5.33. */ + private static final Set<IgniteProductVersion> SERVICE_TOP_CALLABLE_VER1; + /** */ private final Boolean srvcCompatibilitySysProp; @@ -162,6 +169,31 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** Topology listener. */ private GridLocalEventListener topLsnr = new TopologyListener(); + static { + Set<IgniteProductVersion> versions = new TreeSet<>(new Comparator<IgniteProductVersion>() { + @Override public int compare(final IgniteProductVersion o1, final IgniteProductVersion o2) { + return o1.compareToIgnoreTimestamp(o2); + } + }); + + versions.add(IgniteProductVersion.fromString("1.5.30")); + versions.add(IgniteProductVersion.fromString("1.5.31")); + versions.add(IgniteProductVersion.fromString("1.5.32")); + versions.add(IgniteProductVersion.fromString("1.6.3")); + versions.add(IgniteProductVersion.fromString("1.6.4")); + versions.add(IgniteProductVersion.fromString("1.6.5")); + versions.add(IgniteProductVersion.fromString("1.6.6")); + versions.add(IgniteProductVersion.fromString("1.6.7")); + versions.add(IgniteProductVersion.fromString("1.6.8")); + versions.add(IgniteProductVersion.fromString("1.6.9")); + versions.add(IgniteProductVersion.fromString("1.6.10")); + versions.add(IgniteProductVersion.fromString("1.7.0")); + versions.add(IgniteProductVersion.fromString("1.7.1")); + versions.add(IgniteProductVersion.fromString("1.7.2")); + + SERVICE_TOP_CALLABLE_VER1 = Collections.unmodifiableSet(versions); + } + /** * @param ctx Kernal context. */ @@ -668,9 +700,13 @@ public class GridServiceProcessor extends GridProcessorAdapter { ClusterNode node = cache.affinity().mapKeyToNode(name); if (node.version().compareTo(ServiceTopologyCallable.SINCE_VER) >= 0) { + final ServiceTopologyCallable call = new ServiceTopologyCallable(name); + + call.serialize = SERVICE_TOP_CALLABLE_VER1.contains(node.version()); + return ctx.closure().callAsyncNoFailover( GridClosureCallMode.BROADCAST, - new ServiceTopologyCallable(name), + call, Collections.singletonList(node), false ).get(); @@ -815,7 +851,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { } } - return new GridServiceProxy<T>(prj, name, svcItf, sticky, ctx).proxy(); + return new GridServiceProxy<>(prj, name, svcItf, sticky, ctx).proxy(); } /** @@ -868,7 +904,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { * @param topVer Topology version. * @throws IgniteCheckedException If failed. */ - private void reassign(GridServiceDeployment dep, AffinityTopologyVersion topVer) throws IgniteCheckedException { + private void reassign(GridServiceDeployment dep, long topVer) throws IgniteCheckedException { ServiceConfiguration cfg = dep.configuration(); Object nodeFilter = cfg.getNodeFilter(); @@ -882,7 +918,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { Object affKey = cfg.getAffinityKey(); while (true) { - GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer.topologyVersion()); + GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer); Collection<ClusterNode> nodes; @@ -912,7 +948,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { Map<UUID, Integer> cnts = new HashMap<>(); if (affKey != null) { - ClusterNode n = ctx.affinity().mapKeyToNode(cacheName, affKey, topVer); + ClusterNode n = ctx.affinity().mapKeyToNode(cacheName, affKey, new AffinityTopologyVersion(topVer)); if (n != null) { int cnt = maxPerNodeCnt == 0 ? totalCnt == 0 ? 1 : totalCnt : maxPerNodeCnt; @@ -1144,7 +1180,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { if (cfg instanceof LazyServiceConfiguration) { byte[] bytes = ((LazyServiceConfiguration)cfg).serviceBytes(); - Service srvc = U.unmarshal(m, bytes, U.resolveClassLoader(null, ctx.config())); + Service srvc = m.unmarshal(bytes, U.resolveClassLoader(null, ctx.config())); ctx.resource().inject(srvc); @@ -1154,9 +1190,10 @@ public class GridServiceProcessor extends GridProcessorAdapter { Service svc = cfg.getService(); try { - byte[] bytes = U.marshal(m, svc); + byte[] bytes = m.marshal(svc); - Service cp = U.unmarshal(m, bytes, U.resolveClassLoader(svc.getClass().getClassLoader(), ctx.config())); + Service cp = m.unmarshal(bytes, + U.resolveClassLoader(svc.getClass().getClassLoader(), ctx.config())); ctx.resource().inject(cp); @@ -1231,8 +1268,8 @@ public class GridServiceProcessor extends GridProcessorAdapter { ClusterNode oldestSrvNode = CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE); - if (oldestSrvNode == null) - return new GridEmptyIterator<>(); + if (oldestSrvNode == null) + return F.emptyIterator(); GridCacheQueryManager qryMgr = cache.context().queries(); @@ -1418,7 +1455,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { svcName.set(dep.configuration().getName()); // Ignore other utility cache events. - AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx(); + long topVer = ctx.discovery().topologyVersion(); ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null); @@ -1469,60 +1506,60 @@ public class GridServiceProcessor extends GridProcessorAdapter { } } - /** - * Deployment callback. - * - * @param dep Service deployment. - * @param topVer Topology version. - */ - private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) { - // Retry forever. - try { - AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx(); + /** + * Deployment callback. + * + * @param dep Service deployment. + * @param topVer Topology version. + */ + private void onDeployment(final GridServiceDeployment dep, final long topVer) { + // Retry forever. + try { + long newTopVer = ctx.discovery().topologyVersion(); - // If topology version changed, reassignment will happen from topology event. - if (newTopVer.equals(topVer)) - reassign(dep, topVer); - } - catch (IgniteCheckedException e) { - if (!(e instanceof ClusterTopologyCheckedException)) - log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e); + // If topology version changed, reassignment will happen from topology event. + if (newTopVer == topVer) + reassign(dep, topVer); + } + catch (IgniteCheckedException e) { + if (!(e instanceof ClusterTopologyCheckedException)) + log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e); - AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx(); + long newTopVer = ctx.discovery().topologyVersion(); - if (!newTopVer.equals(topVer)) { - assert newTopVer.compareTo(topVer) > 0; + if (newTopVer != topVer) { + assert newTopVer > topVer; - // Reassignment will happen from topology event. - return; - } + // Reassignment will happen from topology event. + return; + } - ctx.timeout().addTimeoutObject(new GridTimeoutObject() { - private IgniteUuid id = IgniteUuid.randomUuid(); + ctx.timeout().addTimeoutObject(new GridTimeoutObject() { + private IgniteUuid id = IgniteUuid.randomUuid(); - private long start = System.currentTimeMillis(); + private long start = System.currentTimeMillis(); - @Override public IgniteUuid timeoutId() { - return id; - } + @Override public IgniteUuid timeoutId() { + return id; + } - @Override public long endTime() { - return start + RETRY_TIMEOUT; - } + @Override public long endTime() { + return start + RETRY_TIMEOUT; + } - @Override public void onTimeout() { - if (!busyLock.enterBusy()) - return; + @Override public void onTimeout() { + if (!busyLock.enterBusy()) + return; - try { - // Try again. - onDeployment(dep, topVer); - } - finally { - busyLock.leaveBusy(); - } + try { + // Try again. + onDeployment(dep, topVer); } - }); + finally { + busyLock.leaveBusy(); + } + } + }); } } @@ -1531,28 +1568,16 @@ public class GridServiceProcessor extends GridProcessorAdapter { */ private class TopologyListener implements GridLocalEventListener { /** {@inheritDoc} */ - @Override public void onEvent(Event evt) { + @Override public void onEvent(final Event evt) { if (!busyLock.enterBusy()) return; try { - final AffinityTopologyVersion topVer; - - if (evt instanceof DiscoveryCustomEvent) { - DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage(); - - topVer = ((DiscoveryCustomEvent)evt).affinityTopologyVersion(); - - if (msg instanceof CacheAffinityChangeMessage) { - if (!((CacheAffinityChangeMessage)msg).exchangeNeeded()) - return; - } - } - else - topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0); - depExe.submit(new BusyRunnable() { @Override public void run0() { + AffinityTopologyVersion topVer = + new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion()); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer); if (oldest != null && oldest.isLocal()) { @@ -1587,7 +1612,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { ctx.cache().internalCache(UTILITY_CACHE_NAME).context().affinity(). affinityReadyFuture(topVer).get(); - reassign(dep, topVer); + reassign(dep, topVer.topologyVersion()); } catch (IgniteCheckedException ex) { if (!(e instanceof ClusterTopologyCheckedException)) @@ -1604,7 +1629,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { } if (!retries.isEmpty()) - onReassignmentFailed(topVer, retries); + onReassignmentFailed(topVer.topologyVersion(), retries); } // Clean up zombie assignments. @@ -1641,14 +1666,13 @@ public class GridServiceProcessor extends GridProcessorAdapter { * @param topVer Topology version. * @param retries Retries. */ - private void onReassignmentFailed(final AffinityTopologyVersion topVer, - final Collection<GridServiceDeployment> retries) { + private void onReassignmentFailed(final long topVer, final Collection<GridServiceDeployment> retries) { if (!busyLock.enterBusy()) return; try { // If topology changed again, let next event handle it. - if (ctx.discovery().topologyVersionEx().equals(topVer)) + if (ctx.discovery().topologyVersion() != topVer) return; for (Iterator<GridServiceDeployment> it = retries.iterator(); it.hasNext(); ) { @@ -1829,6 +1853,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** */ @GridInternal + @SerializableTransient(methodName = "serializableTransient") private static class ServiceTopologyCallable implements IgniteCallable<Map<UUID, Integer>> { /** */ private static final long serialVersionUID = 0L; @@ -1837,10 +1862,13 @@ public class GridServiceProcessor extends GridProcessorAdapter { private static final IgniteProductVersion SINCE_VER = IgniteProductVersion.fromString("1.5.7"); /** */ + private static final String[] SER_FIELDS = {"waitedCacheInit", "jCtx", "log"}; + + /** */ private final String svcName; /** */ - private boolean waitedCacheInit; + private transient boolean waitedCacheInit; /** */ @IgniteInstanceResource @@ -1848,11 +1876,14 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** */ @JobContextResource - private ComputeJobContext jCtx; + private transient ComputeJobContext jCtx; /** */ @LoggerResource - private IgniteLogger log; + private transient IgniteLogger log; + + /** */ + transient boolean serialize; /** * @param svcName Service name. @@ -1898,6 +1929,16 @@ public class GridServiceProcessor extends GridProcessorAdapter { return serviceTopology(cache, svcName); } + + /** + * @param self Instance of current class before serialization. + * @param ver Sender job version. + * @return List of serializable transient fields. + */ + @SuppressWarnings("unused") + private static String[] serializableTransient(ServiceTopologyCallable self, IgniteProductVersion ver) { + return (self != null && self.serialize) || (ver != null && SERVICE_TOP_CALLABLE_VER1.contains(ver)) ? SER_FIELDS : null; + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/92fff630/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java new file mode 100644 index 0000000..14a2f27 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import org.apache.ignite.lang.IgniteProductVersion; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marks class as it has transient fields that should be serialized. + * Annotated class must have method that returns list of transient + * fields that should be serialized. + * <p> + * Works only for jobs. For other messages node version is not available. + * </p> + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface SerializableTransient { + /** + * Name of the private static method that returns list of transient fields + * that should be serialized (String[]), and accepts itself (before serialization) + * and {@link IgniteProductVersion}, e.g. + * <pre> + * private static String[] fields(Object self, IgniteProductVersion ver){ + * return ver.compareTo("1.5.30") > 0 ? SERIALIZABLE_FIELDS : null; + * } + * </pre> + * <p> + * On serialization version argument <tt>ver</tt> is null, on deserialization - <tt>self</tt> is null. + * </p> + * <p> + * If it returns empty array or null all transient fields will be normally + * ignored. + * </p> + * + * @return Name of the method. + */ + String methodName(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/92fff630/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java index 9668baf..ad63702 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java @@ -17,6 +17,7 @@ package org.apache.ignite.marshaller; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.jetbrains.annotations.Nullable; @@ -24,6 +25,9 @@ import org.jetbrains.annotations.Nullable; * Utility marshaller methods. */ public class MarshallerUtils { + /** Job sender node version. */ + private static final ThreadLocal<IgniteProductVersion> JOB_SND_NODE_VER = new ThreadLocal<>(); + /** * Set node name to marshaller context if possible. * @@ -55,4 +59,22 @@ public class MarshallerUtils { private MarshallerUtils() { // No-op. } + + /** + * Sets thread local job sender node version. + * + * @param ver Thread local job sender node version. + */ + public static void jobSenderVersion(IgniteProductVersion ver) { + JOB_SND_NODE_VER.set(ver); + } + + /** + * Returns thread local job sender node version. + * + * @return Thread local job sender node version. + */ + public static IgniteProductVersion jobSenderVersion() { + return JOB_SND_NODE_VER.get(); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/92fff630/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java index 5a5b54d..160f2c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java @@ -47,8 +47,11 @@ import java.util.UUID; import java.util.concurrent.ConcurrentMap; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.marshaller.MarshallerContext; import org.apache.ignite.marshaller.MarshallerExclusions; +import org.apache.ignite.internal.util.SerializableTransient; +import org.apache.ignite.marshaller.MarshallerUtils; import static java.lang.reflect.Modifier.isFinal; import static java.lang.reflect.Modifier.isPrivate; @@ -166,6 +169,9 @@ class OptimizedClassDescriptor { /** Proxy interfaces. */ private Class<?>[] proxyIntfs; + /** Method returns serializable transient fields. */ + private Method serTransMtd; + /** * Creates descriptor for class. * @@ -441,6 +447,27 @@ class OptimizedClassDescriptor { readObjMtds.add(mtd); + final SerializableTransient serTransAn = c.getAnnotation(SerializableTransient.class); + + // Custom serialization policy for transient fields. + if (serTransAn != null) { + try { + serTransMtd = c.getDeclaredMethod(serTransAn.methodName(), cls, IgniteProductVersion.class); + + int mod = serTransMtd.getModifiers(); + + if (isStatic(mod) && isPrivate(mod) + && serTransMtd.getReturnType() == String[].class) + serTransMtd.setAccessible(true); + else + // Set method back to null if it has incorrect signature. + serTransMtd = null; + } + catch (NoSuchMethodException ignored) { + serTransMtd = null; + } + } + Field[] clsFields0 = c.getDeclaredFields(); Map<String, Field> fieldNames = new HashMap<>(); @@ -797,7 +824,7 @@ class OptimizedClassDescriptor { writeTypeData(out); out.writeShort(checksum); - out.writeSerializable(obj, writeObjMtds, fields); + out.writeSerializable(obj, writeObjMtds, serializableFields(obj.getClass(), obj, null)); break; @@ -807,6 +834,60 @@ class OptimizedClassDescriptor { } /** + * Gets list of serializable fields. If {@link #serTransMtd} method + * returns list of transient fields, they will be added to other fields. + * Transient fields that are not included in that list will be normally + * ignored. + * + * @param cls Class. + * @param obj Object. + * @param ver Job sender version. + * @return Serializable fields. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private Fields serializableFields(Class<?> cls, Object obj, IgniteProductVersion ver) { + if (serTransMtd == null) + return fields; + + try { + final String[] transFields = (String[])serTransMtd.invoke(cls, obj, ver); + + if (transFields == null || transFields.length == 0) + return fields; + + List<FieldInfo> clsFields = new ArrayList<>(); + + clsFields.addAll(fields.fields.get(0).fields); + + for (int i = 0; i < transFields.length; i++) { + final String fieldName = transFields[i]; + + final Field f = cls.getDeclaredField(fieldName); + + FieldInfo fieldInfo = new FieldInfo(f, f.getName(), + GridUnsafe.objectFieldOffset(f), fieldType(f.getType())); + + clsFields.add(fieldInfo); + } + + Collections.sort(clsFields, new Comparator<FieldInfo>() { + @Override public int compare(FieldInfo t1, FieldInfo t2) { + return t1.name().compareTo(t2.name()); + } + }); + + List<ClassFields> fields = new ArrayList<>(); + + fields.add(new ClassFields(clsFields)); + + return new Fields(fields); + } + catch (Exception e) { + return fields; + } + } + + /** * @param out Output stream. * @throws IOException In case of error. */ @@ -838,7 +919,12 @@ class OptimizedClassDescriptor { case SERIALIZABLE: verifyChecksum(in.readShort()); - return in.readSerializable(cls, readObjMtds, readResolveMtd, fields); + // If no serialize method, then unmarshal as usual. + if (serTransMtd != null) + return in.readSerializable(cls, readObjMtds, readResolveMtd, + serializableFields(cls, null, MarshallerUtils.jobSenderVersion())); + else + return in.readSerializable(cls, readObjMtds, readResolveMtd, fields); default: assert false : "Unexpected type: " + type;
