Repository: ignite
Updated Branches:
  refs/heads/master 4c56adcf0 -> ececcbcd1


IGNITE-10564 Added tasks to collect rebalance metrics.

Co-authored-by: Alexey Kuznetsov <akuznet...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ececcbcd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ececcbcd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ececcbcd

Branch: refs/heads/master
Commit: ececcbcd1883a2cda5a2eb12e0f0deffddf6b829
Parents: 4c56adc
Author: Vasiliy Sisko <vsi...@gridgain.com>
Authored: Thu Dec 6 11:30:08 2018 +0700
Committer: Alexey Kuznetsov <akuznet...@apache.org>
Committed: Thu Dec 6 11:30:08 2018 +0700

----------------------------------------------------------------------
 .../VisorCacheRebalanceCollectorJobResult.java  |  91 +++++++++
 .../node/VisorCacheRebalanceCollectorTask.java  | 194 +++++++++++++++++++
 .../VisorCacheRebalanceCollectorTaskArg.java    |  54 ++++++
 .../VisorCacheRebalanceCollectorTaskResult.java |  92 +++++++++
 .../visor/node/VisorNodeBaselineStatus.java     |  45 +++++
 .../visor/node/VisorNodeDataCollectorJob.java   |  74 ++++---
 .../node/VisorNodeDataCollectorTaskResult.java  |   3 +-
 .../internal/visor/util/VisorTaskUtils.java     |  43 ++++
 8 files changed, 564 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ececcbcd/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorJobResult.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorJobResult.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorJobResult.java
new file mode 100644
index 0000000..5bd818d
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorJobResult.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.node;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Result object for cache rebalance job.
+ */
+public class VisorCacheRebalanceCollectorJobResult extends 
IgniteDataTransferObject {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Rebalance progress as a fraction (not a percent); negative values are markers, not progress. */
+    private double rebalance;
+
+    /** Node baseline state. */
+    private VisorNodeBaselineStatus baseline;
+
+    /**
+     * Default constructor.
+     */
+    public VisorCacheRebalanceCollectorJobResult() {
+        // No-op.
+    }
+
+    /**
+     * @return Rebalance progress.
+     */
+    public double getRebalance() {
+        return rebalance;
+    }
+
+    /**
+     * @param rebalance Rebalance progress.
+     */
+    public void setRebalance(double rebalance) {
+        this.rebalance = rebalance;
+    }
+
+    /**
+     * @return Node baseline status.
+     */
+    public VisorNodeBaselineStatus getBaseline() {
+        return baseline;
+    }
+
+    /**
+     * @param baseline Node baseline status.
+     */
+    public void setBaseline(VisorNodeBaselineStatus baseline) {
+        this.baseline = baseline;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeExternalData(ObjectOutput out) throws 
IOException {
+        out.writeDouble(rebalance);
+        U.writeEnum(out, baseline); // NOTE(review): read back below as one byte ordinal - confirm U.writeEnum writes exactly one byte.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void readExternalData(byte protoVer, ObjectInput in) 
throws IOException, ClassNotFoundException {
+        rebalance = in.readDouble();
+        baseline = VisorNodeBaselineStatus.fromOrdinal(in.readByte()); // Out-of-range ordinal yields null.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(VisorCacheRebalanceCollectorJobResult.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ececcbcd/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTask.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTask.java
new file mode 100644
index 0000000..eda9f94
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTask.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.node;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.cache.CacheMetrics;
+import org.apache.ignite.cluster.BaselineNode;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.internal.cluster.IgniteClusterEx;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorMultiNodeTask;
+import org.jetbrains.annotations.Nullable;
+
+import static 
org.apache.ignite.internal.visor.node.VisorNodeBaselineStatus.BASELINE_NOT_AVAILABLE;
+import static 
org.apache.ignite.internal.visor.node.VisorNodeBaselineStatus.NODE_IN_BASELINE;
+import static 
org.apache.ignite.internal.visor.node.VisorNodeBaselineStatus.NODE_NOT_IN_BASELINE;
+import static 
org.apache.ignite.internal.visor.util.VisorTaskUtils.MINIMAL_REBALANCE;
+import static 
org.apache.ignite.internal.visor.util.VisorTaskUtils.NOTHING_TO_REBALANCE;
+import static 
org.apache.ignite.internal.visor.util.VisorTaskUtils.REBALANCE_COMPLETE;
+import static 
org.apache.ignite.internal.visor.util.VisorTaskUtils.REBALANCE_NOT_AVAILABLE;
+import static 
org.apache.ignite.internal.visor.util.VisorTaskUtils.isProxyCache;
+import static 
org.apache.ignite.internal.visor.util.VisorTaskUtils.isRestartingCache;
+import static org.apache.ignite.internal.visor.util.VisorTaskUtils.log;
+
+/**
+ * Collects topology rebalance metrics.
+ */
+@GridInternal
+public class VisorCacheRebalanceCollectorTask extends 
VisorMultiNodeTask<VisorCacheRebalanceCollectorTaskArg,
+    VisorCacheRebalanceCollectorTaskResult, 
VisorCacheRebalanceCollectorJobResult> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override protected VisorCacheRebalanceCollectorJob 
job(VisorCacheRebalanceCollectorTaskArg arg) {
+        return new VisorCacheRebalanceCollectorJob(arg, debug);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override protected VisorCacheRebalanceCollectorTaskResult 
reduce0(List<ComputeJobResult> results) {
+        return reduce(new VisorCacheRebalanceCollectorTaskResult(), results);
+    }
+
+    /**
+     * @param taskRes Task result.
+     * @param results Results.
+     * @return Topology rebalance metrics collector task result.
+     */
+    protected VisorCacheRebalanceCollectorTaskResult reduce(
+        VisorCacheRebalanceCollectorTaskResult taskRes,
+        List<ComputeJobResult> results
+    ) {
+        for (ComputeJobResult res : results) {
+            VisorCacheRebalanceCollectorJobResult jobRes = res.getData();
+
+            if (jobRes != null) {
+                if (res.getException() == null)
+                    taskRes.getRebalance().put(res.getNode().id(), 
jobRes.getRebalance());
+
+                taskRes.getBaseline().put(res.getNode().id(), 
jobRes.getBaseline());
+            }
+        }
+
+        return taskRes;
+    }
+
+    /**
+     * Job that collects rebalance metrics.
+     */
+    private static class VisorCacheRebalanceCollectorJob extends 
VisorJob<VisorCacheRebalanceCollectorTaskArg, 
VisorCacheRebalanceCollectorJobResult> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Create job with given argument.
+         *
+         * @param arg Job argument.
+         * @param debug Debug flag.
+         */
+        private 
VisorCacheRebalanceCollectorJob(VisorCacheRebalanceCollectorTaskArg arg, 
boolean debug) {
+            super(arg, debug);
+        }
+
+        /** {@inheritDoc} Collects local rebalance progress and baseline status of the node running the job. */
+        @Override protected VisorCacheRebalanceCollectorJobResult run(VisorCacheRebalanceCollectorTaskArg arg) {
+            VisorCacheRebalanceCollectorJobResult res = new VisorCacheRebalanceCollectorJobResult();
+
+            long start0 = U.currentTimeMillis();
+
+            try {
+                int partitions = 0;
+                double total = 0;
+                double ready = 0;
+
+                GridCacheProcessor cacheProc = ignite.context().cache();
+
+                boolean rebalanceInProgress = false;
+
+                for (CacheGroupContext grp : cacheProc.cacheGroups()) {
+                    String cacheName = grp.config().getName();
+
+                    if (isProxyCache(ignite, cacheName) || isRestartingCache(ignite, cacheName))
+                        continue;
+
+                    try {
+                        GridCacheAdapter ca = cacheProc.internalCache(cacheName);
+
+                        if (ca == null || !ca.context().started())
+                            continue;
+
+                        CacheMetrics cm = ca.localMetrics();
+
+                        partitions += cm.getTotalPartitionsCount();
+
+                        long keysTotal = cm.getEstimatedRebalancingKeys();
+                        long keysReady = cm.getRebalancedKeys();
+                        // Cap ready keys below the estimate so that key counts alone never produce a ratio of 1.
+                        if (keysReady >= keysTotal)
+                            keysReady = Math.max(keysTotal - 1, 0);
+
+                        total += keysTotal;
+                        ready += keysReady;
+
+                        if (cm.getRebalancingPartitionsCount() > 0)
+                            rebalanceInProgress = true;
+                    }
+                    catch (IllegalStateException | IllegalArgumentException e) {
+                        if (debug && ignite.log() != null)
+                            ignite.log().error("Ignored cache group: " + grp.cacheOrGroupName(), e);
+                    }
+                }
+                // partitions == 0 means that no started, non-proxy cache contributed any partitions.
+                if (partitions == 0)
+                    res.setRebalance(NOTHING_TO_REBALANCE);
+                else if (total == 0 && rebalanceInProgress)
+                    res.setRebalance(MINIMAL_REBALANCE);
+                else
+                    res.setRebalance(total > 0 ? Math.max(ready / total, MINIMAL_REBALANCE) : REBALANCE_COMPLETE);
+            }
+            catch (Exception e) {
+                res.setRebalance(REBALANCE_NOT_AVAILABLE);
+
+                ignite.log().error("Failed to collect rebalance metrics", e);
+            }
+
+            if (GridCacheUtils.isPersistenceEnabled(ignite.configuration())) {
+                IgniteClusterEx cluster = ignite.cluster();
+
+                Object consistentId = ignite.localNode().consistentId();
+
+                Collection<? extends BaselineNode> baseline = cluster.currentBaselineTopology();
+                // Baseline topology is null until it has been set; report such a node as not in baseline instead of NPE.
+                boolean inBaseline = baseline != null && baseline.stream().anyMatch(n -> consistentId.equals(n.consistentId()));
+
+                res.setBaseline(inBaseline ? NODE_IN_BASELINE : NODE_NOT_IN_BASELINE);
+            }
+            else
+                res.setBaseline(BASELINE_NOT_AVAILABLE);
+
+            if (debug)
+                log(ignite.log(), "Collected rebalance metrics", getClass(), start0);
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(VisorCacheRebalanceCollectorJob.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ececcbcd/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTaskArg.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTaskArg.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTaskArg.java
new file mode 100644
index 0000000..d97fd50
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTaskArg.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.node;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.visor.VisorDataTransferObject;
+
+/**
+ * Argument for {@link VisorCacheRebalanceCollectorTask} task.
+ */
+public class VisorCacheRebalanceCollectorTaskArg extends 
VisorDataTransferObject {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Default constructor.
+     */
+    public VisorCacheRebalanceCollectorTaskArg() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeExternalData(ObjectOutput out) throws 
IOException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void readExternalData(byte protoVer, ObjectInput in) 
throws IOException, ClassNotFoundException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(VisorCacheRebalanceCollectorTaskArg.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ececcbcd/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTaskResult.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTaskResult.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTaskResult.java
new file mode 100644
index 0000000..1305cd2
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorCacheRebalanceCollectorTaskResult.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.node;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Result object for {@link VisorCacheRebalanceCollectorTask} task.
+ */
+public class VisorCacheRebalanceCollectorTaskResult extends 
IgniteDataTransferObject {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Rebalance progress keyed by node ID. */
+    private Map<UUID, Double> rebalance = new HashMap<>();
+
+    /** Baseline status keyed by node ID. */
+    private Map<UUID, VisorNodeBaselineStatus> baseline = new HashMap<>();
+
+    /**
+     * Default constructor.
+     */
+    public VisorCacheRebalanceCollectorTaskResult() {
+        // No-op.
+    }
+
+    /**
+     * @return Rebalance progress keyed by node ID (the internal map itself, not a copy).
+     */
+    public Map<UUID, Double> getRebalance() {
+        return rebalance;
+    }
+
+    /**
+     * @return Baseline status keyed by node ID (the internal map itself, not a copy).
+     */
+    public Map<UUID, VisorNodeBaselineStatus> getBaseline() {
+        return baseline;
+    }
+
+    /**
+     * Merges specified results into this one; entries for node IDs already present are overwritten.
+     *
+     * @param res Results to add.
+     */
+    public void add(VisorCacheRebalanceCollectorTaskResult res) {
+        assert res != null;
+
+        rebalance.putAll(res.getRebalance());
+        baseline.putAll(res.getBaseline());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeExternalData(ObjectOutput out) throws 
IOException {
+        U.writeMap(out, rebalance);
+        U.writeMap(out, baseline);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void readExternalData(byte protoVer, ObjectInput in) 
throws IOException, ClassNotFoundException {
+        rebalance = U.readMap(in);
+        baseline = U.readMap(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(VisorCacheRebalanceCollectorTaskResult.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ececcbcd/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeBaselineStatus.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeBaselineStatus.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeBaselineStatus.java
new file mode 100644
index 0000000..ea90be3
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeBaselineStatus.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.node;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Node baseline status.
+ */
+public enum VisorNodeBaselineStatus {
+    /** Persistence is enabled and the node's consistent ID is found in the current baseline topology. */
+    NODE_IN_BASELINE,
+    /** Persistence is enabled but the node's consistent ID is not found in the current baseline topology. */
+    NODE_NOT_IN_BASELINE,
+    /** Persistence is not enabled on the node, so baseline membership is not evaluated. */
+    BASELINE_NOT_AVAILABLE;
+
+    /** Enumerated values, cached once and indexed by ordinal. */
+    private static final VisorNodeBaselineStatus[] VALS = values();
+
+    /**
+     * Efficiently gets enumerated value from its ordinal.
+     *
+     * @param ord Ordinal value.
+     * @return Enumerated value or {@code null} if ordinal out of range.
+     */
+    @Nullable public static VisorNodeBaselineStatus fromOrdinal(int ord) {
+        return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ececcbcd/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
index 9025ed0..9a7d2b1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
@@ -28,8 +28,9 @@ import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
@@ -50,9 +51,15 @@ import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.isIgfsC
 import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.isSystemCache;
 import static 
org.apache.ignite.internal.visor.compute.VisorComputeMonitoringHolder.COMPUTE_MONITORING_HOLDER_KEY;
 import static org.apache.ignite.internal.visor.util.VisorTaskUtils.EVT_MAPPER;
+import static 
org.apache.ignite.internal.visor.util.VisorTaskUtils.MINIMAL_REBALANCE;
+import static 
org.apache.ignite.internal.visor.util.VisorTaskUtils.NOTHING_TO_REBALANCE;
+import static 
org.apache.ignite.internal.visor.util.VisorTaskUtils.REBALANCE_COMPLETE;
+import static 
org.apache.ignite.internal.visor.util.VisorTaskUtils.REBALANCE_NOT_AVAILABLE;
 import static 
org.apache.ignite.internal.visor.util.VisorTaskUtils.VISOR_TASK_EVTS;
 import static 
org.apache.ignite.internal.visor.util.VisorTaskUtils.checkExplicitTaskMonitoring;
 import static 
org.apache.ignite.internal.visor.util.VisorTaskUtils.collectEvents;
+import static 
org.apache.ignite.internal.visor.util.VisorTaskUtils.isProxyCache;
+import static 
org.apache.ignite.internal.visor.util.VisorTaskUtils.isRestartingCache;
 import static org.apache.ignite.internal.visor.util.VisorTaskUtils.log;
 
 /**
@@ -141,18 +148,6 @@ public class VisorNodeDataCollectorJob extends 
VisorJob<VisorNodeDataCollectorTa
     }
 
     /**
-     * @param cacheName Cache name to check.
-     * @return {@code true} if cache on local node is not a data cache or near 
cache disabled.
-     */
-    private boolean proxyCache(String cacheName) {
-        GridDiscoveryManager discovery = ignite.context().discovery();
-
-        ClusterNode locNode = ignite.localNode();
-
-        return !(discovery.cacheAffinityNode(locNode, cacheName) || 
discovery.cacheNearNode(locNode, cacheName));
-    }
-
-    /**
      * Collect memory metrics.
      *
      * @param res Job result.
@@ -194,38 +189,51 @@ public class VisorNodeDataCollectorJob extends 
VisorJob<VisorNodeDataCollectorTa
 
             List<VisorCache> resCaches = res.getCaches();
 
-            for (String cacheName : cacheProc.cacheNames()) {
-                if (proxyCache(cacheName))
-                    continue;
+            boolean rebalanceInProgress = false;
 
-                boolean sysCache = isSystemCache(cacheName);
+            for (CacheGroupContext grp : cacheProc.cacheGroups()) {
+                boolean first = true;
 
-                if (arg.getSystemCaches() || !(sysCache || isIgfsCache(cfg, 
cacheName))) {
+                for (GridCacheContext cache : grp.caches()) {
                     long start0 = U.currentTimeMillis();
 
+                    String cacheName = cache.name();
+
                     try {
+                        if (isProxyCache(ignite, cacheName) || 
isRestartingCache(ignite, cacheName))
+                            continue;
+
                         GridCacheAdapter ca = 
cacheProc.internalCache(cacheName);
 
                         if (ca == null || !ca.context().started())
                             continue;
 
-                        CacheMetrics cm = ca.localMetrics();
+                        if (first) {
+                            CacheMetrics cm = ca.localMetrics();
+
+                            partitions += cm.getTotalPartitionsCount();
+
+                            long keysTotal = cm.getEstimatedRebalancingKeys();
+                            long keysReady = cm.getRebalancedKeys();
+
+                            if (keysReady >= keysTotal)
+                                keysReady = Math.max(keysTotal - 1, 0);
 
-                        partitions += cm.getTotalPartitionsCount();
+                            total += keysTotal;
+                            ready += keysReady;
 
-                        long partTotal = cm.getEstimatedRebalancingKeys();
-                        long partReady = cm.getRebalancedKeys();
+                            if (!rebalanceInProgress && 
cm.getRebalancingPartitionsCount() > 0)
+                                rebalanceInProgress = true;
 
-                        if (partReady >= partTotal)
-                            partReady = Math.max(partTotal - 1, 0);
+                            first = false;
+                        }
 
-                        total += partTotal;
-                        ready += partReady;
+                        boolean addToRes = arg.getSystemCaches() || 
!(isSystemCache(cacheName) || isIgfsCache(cfg, cacheName));
 
-                        if (all || 
cacheGrps.contains(ca.configuration().getGroupName()))
+                        if (addToRes && (all || 
cacheGrps.contains(ca.configuration().getGroupName())))
                             resCaches.add(new VisorCache(ignite, ca, 
arg.isCollectCacheMetrics()));
                     }
-                    catch(IllegalStateException | IllegalArgumentException e) {
+                    catch (IllegalStateException | IllegalArgumentException e) 
{
                         if (debug && ignite.log() != null)
                             ignite.log().error("Ignored cache: " + cacheName, 
e);
                     }
@@ -237,11 +245,14 @@ public class VisorNodeDataCollectorJob extends 
VisorJob<VisorNodeDataCollectorTa
             }
 
             if (partitions == 0)
-                res.setRebalance(-1);
+                res.setRebalance(NOTHING_TO_REBALANCE);
+            else if (total == 0 && rebalanceInProgress)
+                res.setRebalance(MINIMAL_REBALANCE);
             else
-                res.setRebalance(total > 0 ? ready / total : 1);
+                res.setRebalance(total > 0 ? Math.max(ready / total, 
MINIMAL_REBALANCE) : REBALANCE_COMPLETE);
         }
         catch (Exception e) {
+            res.setRebalance(REBALANCE_NOT_AVAILABLE);
             res.setCachesEx(new VisorExceptionWrapper(e));
         }
     }
@@ -260,7 +271,8 @@ public class VisorNodeDataCollectorJob extends 
VisorJob<VisorNodeDataCollectorTa
 
                 FileSystemConfiguration igfsCfg = igfs.configuration();
 
-                if (proxyCache(igfsCfg.getDataCacheConfiguration().getName()) 
|| proxyCache(igfsCfg.getMetaCacheConfiguration().getName()))
+                if (isProxyCache(ignite, 
igfsCfg.getDataCacheConfiguration().getName()) ||
+                    isProxyCache(ignite, 
igfsCfg.getMetaCacheConfiguration().getName()))
                     continue;
 
                 try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ececcbcd/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java
index eb161f8..f8eb869 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java
@@ -131,7 +131,8 @@ public class VisorNodeDataCollectorTaskResult extends 
VisorDataTransferObject {
             readyTopVers.isEmpty() &&
             pendingExchanges.isEmpty() &&
             persistenceMetrics.isEmpty() &&
-            persistenceMetricsEx.isEmpty();
+            persistenceMetricsEx.isEmpty() &&
+            rebalance.isEmpty();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ececcbcd/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
index fda9ba1..7ab1ffc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
@@ -58,6 +58,10 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.eviction.AbstractEvictionPolicyFactory;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl;
 import org.apache.ignite.internal.processors.igfs.IgfsEx;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -110,6 +114,19 @@ public class VisorTaskUtils {
     public static final int LOG_FILES_COUNT_LIMIT = 5000;
 
     /** */
+    public static final int NOTHING_TO_REBALANCE = -1; // Reported when the collected caches have no partitions.
+
+    /** Rebalance value reported when collecting rebalance metrics failed with an exception. */
+    public static final int REBALANCE_NOT_AVAILABLE = -2;
+
+    /** Lower bound of the reported progress when there are keys to rebalance or rebalance is in progress. */
+    public static final double MINIMAL_REBALANCE = 0.01;
+
+    /** Rebalance value reported when there are no keys to rebalance and no partitions are rebalancing. */
+    public static final int REBALANCE_COMPLETE = 1;
+
+
+    /** */
     private static final int DFLT_BUFFER_SIZE = 4096;
 
     /** Only task event types that Visor should collect. */
@@ -1248,4 +1265,30 @@ public class VisorTaskUtils {
 
         return Arrays.asList(addrs);
     }
+
+    /**
+     * @param ignite Ignite.
+     * @param cacheName Cache name to check.
+     * @return {@code true} if cache on local node is not a data cache or near 
cache disabled.
+     */
+    public static boolean isProxyCache(IgniteEx ignite, String cacheName) {
+        GridDiscoveryManager discovery = ignite.context().discovery();
+
+        ClusterNode locNode = ignite.localNode();
+
+        return !(discovery.cacheAffinityNode(locNode, cacheName) || 
discovery.cacheNearNode(locNode, cacheName));
+    }
+
+    /**
+     * Check whether cache restarting in progress.
+     *
+     * @param ignite Grid.
+     * @param cacheName Cache name to check.
+     * @return {@code true} when cache restarting in progress.
+     */
+    public static boolean isRestartingCache(IgniteEx ignite, String cacheName) 
 {
+        IgniteCacheProxy<Object, Object> proxy = 
ignite.context().cache().jcache(cacheName);
+
+        return proxy instanceof IgniteCacheProxyImpl && 
((IgniteCacheProxyImpl) proxy).isRestarting();
+    }
 }

Reply via email to