Repository: ignite
Updated Branches:
  refs/heads/ignite-5658 e9b851ae4 -> 7adcd6feb


ignite-5658 added pool sizes to attributes


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7adcd6fe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7adcd6fe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7adcd6fe

Branch: refs/heads/ignite-5658
Commit: 7adcd6feb4db354b48b63efaafc26ea76940708e
Parents: e9b851a
Author: Yakov Zhdanov <yzhda...@gridgain.com>
Authored: Tue Aug 1 13:50:41 2017 +0300
Committer: Yakov Zhdanov <yzhda...@gridgain.com>
Committed: Tue Aug 1 13:50:41 2017 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/IgniteDataStreamer.java   |  2 +-
 .../apache/ignite/IgniteSystemProperties.java   |  7 +++++
 .../apache/ignite/internal/IgniteKernal.java    |  5 ++++
 .../ignite/internal/IgniteNodeAttributes.java   |  6 +++++
 .../datastreamer/DataStreamerImpl.java          | 28 ++++++++++++++++----
 5 files changed, 42 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7adcd6fe/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
index 0e84e36..3b0a3d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -104,7 +104,7 @@ public interface IgniteDataStreamer<K, V> extends 
AutoCloseable {
     public static final int DFLT_PARALLEL_OPS_MULTIPLIER = 20;
 
     /** Default per node buffer size. */
-    public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024;
+    public static final int DFLT_PER_NODE_BUFFER_SIZE = 512;
 
     /** Default timeout for streamer's operations. */
     public static final long DFLT_UNLIMIT_TIMEOUT = -1;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7adcd6fe/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 264fb4b..d3e64cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -697,6 +697,13 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_ENABLE_FORCIBLE_NODE_KILL = 
"IGNITE_ENABLE_FORCIBLE_NODE_KILL";
 
     /**
+     * If this property is set to {@code true}, the data streamer will use the 
striped pool for processing requests
+     * when the isolated updater is used as the receiver (default is {@code false}).
+     */
+    public static final String 
IGNITE_DATA_STREAMER_USE_STRIPED_POOL_WHEN_ISOLATED =
+        "IGNITE_DATA_STREAMER_USE_STRIPED_POOL_WHEN_ISOLATED";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7adcd6fe/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 5186409..88246ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -213,6 +213,7 @@ import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CLIENT_MODE;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DAEMON;
+import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS;
@@ -234,6 +235,7 @@ import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PREFIX;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_RESTART_ENABLED;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_REST_PORT_RANGE;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SPI_CLASS;
+import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_STRIPED_POOL_SIZE;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_USER_NAME;
 import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
 import static org.apache.ignite.internal.IgniteVersionUtils.BUILD_TSTAMP_STR;
@@ -1426,6 +1428,9 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
      */
     @SuppressWarnings({"SuspiciousMethodCalls", "unchecked", 
"TypeMayBeWeakened"})
     private void fillNodeAttributes(boolean notifyEnabled) throws 
IgniteCheckedException {
+        ctx.addNodeAttribute(ATTR_STRIPED_POOL_SIZE, 
configuration().getStripedPoolSize());
+        ctx.addNodeAttribute(ATTR_DATA_STREAMER_POOL_SIZE, 
configuration().getDataStreamerThreadPoolSize());
+
         final String[] incProps = cfg.getIncludeProperties();
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7adcd6fe/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
index a45f991..be5230d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
@@ -177,6 +177,12 @@ public final class IgniteNodeAttributes {
     /** Ignite security compatibility mode. */
     public static final String ATTR_SECURITY_COMPATIBILITY_MODE = ATTR_PREFIX 
+ ".security.compatibility.enabled";
 
+    /** Node attribute holding the configured data streamer thread pool size. */
+    public static final String ATTR_DATA_STREAMER_POOL_SIZE = ATTR_PREFIX + 
".data.streamer.pool.size";
+
+    /** Node attribute holding the configured striped pool size. */
+    public static final String ATTR_STRIPED_POOL_SIZE = ATTR_PREFIX + 
".striped.pool.size";
+
     /**
      * Enforces singleton.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7adcd6fe/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 9d56668..877c9f4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -50,6 +50,7 @@ import org.apache.ignite.IgniteDataStreamerTimeoutException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
@@ -61,6 +62,7 @@ import 
org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
@@ -114,6 +116,7 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.LongAdder8;
 
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_DATA_STREAMER_USE_STRIPED_POOL_WHEN_ISOLATED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM;
@@ -129,6 +132,10 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
     /** Amount of permissions should be available to continue new data 
processing. */
     private static final int REMAP_SEMAPHORE_PERMISSIONS_COUNT = 
Integer.MAX_VALUE;
 
+    /** Cached value of the IGNITE_DATA_STREAMER_USE_STRIPED_POOL_WHEN_ISOLATED system property. */
+    private static final boolean USE_STRIPED_POOL_WHEN_ISOLATED =
+        
IgniteSystemProperties.getBoolean(IGNITE_DATA_STREAMER_USE_STRIPED_POOL_WHEN_ISOLATED);
+
     /** Cache receiver. */
     private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER;
 
@@ -1358,9 +1365,18 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
             // Cache local node flag.
             isLocNode = node.equals(ctx.discovery().localNode());
 
+            Integer attrStripedPoolSize = 
node.attribute(IgniteNodeAttributes.ATTR_STRIPED_POOL_SIZE);
+
+            int stripedPoolSize = attrStripedPoolSize != null ? 
attrStripedPoolSize : node.metrics().getTotalCpus();
+
+            Integer attrStreamerPoolSize = 
node.attribute(IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE);
+
+            int streamerPoolSize = attrStreamerPoolSize != null ? 
attrStreamerPoolSize : node.metrics().getTotalCpus();
+
             perNodeParallelOps = parallelOps != 0 ?
                 parallelOps :
-                node.metrics().getTotalCpus() * DFLT_PARALLEL_OPS_MULTIPLIER;
+                (USE_STRIPED_POOL_WHEN_ISOLATED && rcvr == ISOLATED_UPDATER) ?
+                    stripedPoolSize : streamerPoolSize;
 
             sem = new Semaphore(perNodeParallelOps);
 
@@ -1679,7 +1695,8 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                             cache.context().deploy().onEnter();
                     }
                     catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to deploy class (request will not 
be sent): " + jobPda0.deployClass(), e);
+                        U.error(log, "Failed to deploy class (request will not 
be sent): " +
+                            jobPda0.deployClass(), e);
 
                         return;
                     }
@@ -1713,11 +1730,12 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                     dep != null ? dep.classLoaderId() : null,
                     dep == null,
                     topVer,
-                    rcvr == ISOLATED_UPDATER ? partId : 
GridIoMessage.STRIPE_DISABLED_PART);
+                    (rcvr == ISOLATED_UPDATER && 
USE_STRIPED_POOL_WHEN_ISOLATED) ?
+                        partId : GridIoMessage.STRIPE_DISABLED_PART);
 
                 try {
-                    ctx.io().sendToGridTopic(node, TOPIC_DATASTREAM, req, 
plc); // TODO
-//                        req.partition() == 
GridIoMessage.STRIPE_DISABLED_PART ? plc : GridIoPolicy.SYSTEM_POOL);
+                    ctx.io().sendToGridTopic(node, TOPIC_DATASTREAM, req,
+                        req.partition() == GridIoMessage.STRIPE_DISABLED_PART 
? plc : GridIoPolicy.SYSTEM_POOL);
 
                     if (log.isDebugEnabled())
                         log.debug("Sent request to node [nodeId=" + node.id() 
+ ", req=" + req + ']');

Reply via email to