Fix inconsistencies in cassandra-stress load balancing policy patch by Stefania Alborghetti; reviewed by Jake Luciani for CASSANDRA-12919
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/12c63cf4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/12c63cf4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/12c63cf4 Branch: refs/heads/cassandra-3.X Commit: 12c63cf4a44498674e5b23f011b9bd8a07f775c3 Parents: 595f1da Author: Stefania Alborghetti <stefania.alborghe...@datastax.com> Authored: Thu Nov 17 11:36:22 2016 +0800 Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com> Committed: Thu Nov 24 08:56:04 2016 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/stress/util/JavaDriverClient.java | 33 +++++++++++--------- 2 files changed, 20 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/12c63cf4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 92bf1ce..9be3e7c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.11 + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919) * Fix validation of non-frozen UDT cells (CASSANDRA-12916) * AnticompactionRequestSerializer serializedSize is incorrect (CASSANDRA-12934) http://git-wip-us.apache.org/repos/asf/cassandra/blob/12c63cf4/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java index 53d8786..e0b4262 100644 --- a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java +++ b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java @@ -25,6 +25,7 @@ import javax.net.ssl.SSLContext; import com.datastax.driver.core.*; import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.TokenAwarePolicy; import com.datastax.driver.core.policies.WhiteListPolicy; import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.Slf4JLoggerFactory; @@ -70,19 +71,8 @@ public class JavaDriverClient this.password = settings.mode.password; this.authProvider = settings.mode.authProvider; this.encryptionOptions = encryptionOptions; - - DCAwareRoundRobinPolicy.Builder policyBuilder = DCAwareRoundRobinPolicy.builder(); - if (settings.node.datacenter != null) - policyBuilder.withLocalDc(settings.node.datacenter); - - if (settings.node.isWhiteList) - loadBalancingPolicy = new WhiteListPolicy(policyBuilder.build(), settings.node.resolveAll(settings.port.nativePort)); - else if (settings.node.datacenter != null) - loadBalancingPolicy = policyBuilder.build(); - else - loadBalancingPolicy = null; - - connectionsPerHost = settings.mode.connectionsPerHost == null ? 8 : settings.mode.connectionsPerHost; + this.loadBalancingPolicy = loadBalancingPolicy(settings); + this.connectionsPerHost = settings.mode.connectionsPerHost == null ? 8 : settings.mode.connectionsPerHost; int maxThreadCount = 0; if (settings.rate.auto) @@ -97,6 +87,22 @@ public class JavaDriverClient maxPendingPerConnection = settings.mode.maxPendingPerConnection == null ? Math.max(128, requestsPerConnection ) : settings.mode.maxPendingPerConnection; } + private LoadBalancingPolicy loadBalancingPolicy(StressSettings settings) + { + DCAwareRoundRobinPolicy.Builder policyBuilder = DCAwareRoundRobinPolicy.builder(); + if (settings.node.datacenter != null) + policyBuilder.withLocalDc(settings.node.datacenter); + + LoadBalancingPolicy ret = null; + if (settings.node.datacenter != null) + ret = policyBuilder.build(); + + if (settings.node.isWhiteList) + ret = new WhiteListPolicy(ret == null ? policyBuilder.build() : ret, settings.node.resolveAll(settings.port.nativePort)); + + return new TokenAwarePolicy(ret == null ? policyBuilder.build() : ret); + } + public PreparedStatement prepare(String query) { PreparedStatement stmt = stmts.get(query); @@ -185,7 +191,6 @@ public class JavaDriverClient public ResultSet executePrepared(PreparedStatement stmt, List<Object> queryParams, org.apache.cassandra.db.ConsistencyLevel consistency) { - stmt.setConsistencyLevel(from(consistency)); BoundStatement bstmt = stmt.bind((Object[]) queryParams.toArray(new Object[queryParams.size()])); return getSession().execute(bstmt);