Repository: ignite Updated Branches: refs/heads/ignite-5658 e9b851ae4 -> 7adcd6feb
ignite-5658 added pool sizes to attributes Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7adcd6fe Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7adcd6fe Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7adcd6fe Branch: refs/heads/ignite-5658 Commit: 7adcd6feb4db354b48b63efaafc26ea76940708e Parents: e9b851a Author: Yakov Zhdanov <[email protected]> Authored: Tue Aug 1 13:50:41 2017 +0300 Committer: Yakov Zhdanov <[email protected]> Committed: Tue Aug 1 13:50:41 2017 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/IgniteDataStreamer.java | 2 +- .../apache/ignite/IgniteSystemProperties.java | 7 +++++ .../apache/ignite/internal/IgniteKernal.java | 5 ++++ .../ignite/internal/IgniteNodeAttributes.java | 6 +++++ .../datastreamer/DataStreamerImpl.java | 28 ++++++++++++++++---- 5 files changed, 42 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7adcd6fe/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java index 0e84e36..3b0a3d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java @@ -104,7 +104,7 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { public static final int DFLT_PARALLEL_OPS_MULTIPLIER = 20; /** Default per node buffer size. */ - public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024; + public static final int DFLT_PER_NODE_BUFFER_SIZE = 512; /** Default timeout for streamer's operations. */ public static final long DFLT_UNLIMIT_TIMEOUT = -1; http://git-wip-us.apache.org/repos/asf/ignite/blob/7adcd6fe/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 264fb4b..d3e64cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -697,6 +697,13 @@ public final class IgniteSystemProperties { public static final String IGNITE_ENABLE_FORCIBLE_NODE_KILL = "IGNITE_ENABLE_FORCIBLE_NODE_KILL"; /** + * If this property is set data streamer will use striped pool for processing requests + * (default is {@code false}). + */ + public static final String IGNITE_DATA_STREAMER_USE_STRIPED_POOL_WHEN_ISOLATED = + "IGNITE_DATA_STREAMER_USE_STRIPED_POOL_WHEN_ISOLATED"; + + /** * Enforces singleton. */ private IgniteSystemProperties() { http://git-wip-us.apache.org/repos/asf/ignite/blob/7adcd6fe/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 5186409..88246ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -213,6 +213,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CLIENT_MODE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DAEMON; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS; @@ -234,6 +235,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PREFIX; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_RESTART_ENABLED; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_REST_PORT_RANGE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SPI_CLASS; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_STRIPED_POOL_SIZE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_USER_NAME; import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR; import static org.apache.ignite.internal.IgniteVersionUtils.BUILD_TSTAMP_STR; @@ -1426,6 +1428,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { */ @SuppressWarnings({"SuspiciousMethodCalls", "unchecked", "TypeMayBeWeakened"}) private void fillNodeAttributes(boolean notifyEnabled) throws IgniteCheckedException { + ctx.addNodeAttribute(ATTR_STRIPED_POOL_SIZE, configuration().getStripedPoolSize()); + ctx.addNodeAttribute(ATTR_DATA_STREAMER_POOL_SIZE, configuration().getDataStreamerThreadPoolSize()); + final String[] incProps = cfg.getIncludeProperties(); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/7adcd6fe/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java index a45f991..be5230d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java @@ -177,6 +177,12 @@ public final class IgniteNodeAttributes { /** Ignite security compatibility mode. */ public static final String ATTR_SECURITY_COMPATIBILITY_MODE = ATTR_PREFIX + ".security.compatibility.enabled"; + /** */ + public static final String ATTR_DATA_STREAMER_POOL_SIZE = ATTR_PREFIX + ".data.streamer.pool.size"; + + /** */ + public static final String ATTR_STRIPED_POOL_SIZE = ATTR_PREFIX + ".striped.pool.size"; + /** * Enforces singleton. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7adcd6fe/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 9d56668..877c9f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -50,6 +50,7 @@ import org.apache.ignite.IgniteDataStreamerTimeoutException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; @@ -61,6 +62,7 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.managers.communication.GridIoMessage; @@ -114,6 +116,7 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import org.jsr166.LongAdder8; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DATA_STREAMER_USE_STRIPED_POOL_WHEN_ISOLATED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridTopic.TOPIC_DATASTREAM; @@ -129,6 +132,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** Amount of permissions should be available to continue new data processing. */ private static final int REMAP_SEMAPHORE_PERMISSIONS_COUNT = Integer.MAX_VALUE; + /** */ + private static final boolean USE_STRIPED_POOL_WHEN_ISOLATED = + IgniteSystemProperties.getBoolean(IGNITE_DATA_STREAMER_USE_STRIPED_POOL_WHEN_ISOLATED); + /** Cache receiver. */ private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER; @@ -1358,9 +1365,18 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed // Cache local node flag. isLocNode = node.equals(ctx.discovery().localNode()); + Integer attrStripedPoolSize = node.attribute(IgniteNodeAttributes.ATTR_STRIPED_POOL_SIZE); + + int stripedPoolSize = attrStripedPoolSize != null ? attrStripedPoolSize : node.metrics().getTotalCpus(); + + Integer attrStreamerPoolSize = node.attribute(IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE); + + int streamerPoolSize = attrStreamerPoolSize != null ? attrStreamerPoolSize : node.metrics().getTotalCpus(); + perNodeParallelOps = parallelOps != 0 ? parallelOps : - node.metrics().getTotalCpus() * DFLT_PARALLEL_OPS_MULTIPLIER; + (USE_STRIPED_POOL_WHEN_ISOLATED && rcvr == ISOLATED_UPDATER) ? + stripedPoolSize : streamerPoolSize; sem = new Semaphore(perNodeParallelOps); @@ -1679,7 +1695,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed cache.context().deploy().onEnter(); } catch (IgniteCheckedException e) { - U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e); + U.error(log, "Failed to deploy class (request will not be sent): " + + jobPda0.deployClass(), e); return; } @@ -1713,11 +1730,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed dep != null ? dep.classLoaderId() : null, dep == null, topVer, - rcvr == ISOLATED_UPDATER ? partId : GridIoMessage.STRIPE_DISABLED_PART); + (rcvr == ISOLATED_UPDATER && USE_STRIPED_POOL_WHEN_ISOLATED) ? + partId : GridIoMessage.STRIPE_DISABLED_PART); try { - ctx.io().sendToGridTopic(node, TOPIC_DATASTREAM, req, plc); // TODO -// req.partition() == GridIoMessage.STRIPE_DISABLED_PART ? plc : GridIoPolicy.SYSTEM_POOL); + ctx.io().sendToGridTopic(node, TOPIC_DATASTREAM, req, + req.partition() == GridIoMessage.STRIPE_DISABLED_PART ? plc : GridIoPolicy.SYSTEM_POOL); if (log.isDebugEnabled()) log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
