Thanks Ryan, appreciated again. getPolicy just had this:

LoadBalancingPolicy policy = new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build());

so I guess I need:

LoadBalancingPolicy policy = new TokenAwarePolicy(
        DCAwareRoundRobinPolicy.builder().build(), false);
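For what it's worth, the effect of that second argument can be pictured with a toy model (illustrative only, not the driver's internals; class and host names here are made up):

```java
import java.util.*;

// Toy model of TokenAwarePolicy's shuffle option. With shuffleReplicas=false
// the query plan keeps the replicas in ring order, so the primary owner is
// always tried first; with shuffleReplicas=true the replica order is
// randomized to spread load across all replicas of a partition.
public class ShuffleEffect {
    static List<String> queryPlan(List<String> replicasInRingOrder,
                                  boolean shuffleReplicas) {
        List<String> plan = new ArrayList<>(replicasInRingOrder);
        if (shuffleReplicas) {
            Collections.shuffle(plan);
        }
        return plan;
    }

    public static void main(String[] args) {
        List<String> replicas = Arrays.asList("10.0.0.1", "10.0.0.2"); // RF = 2
        // shuffle disabled: the primary replica is always first in the plan
        System.out.println(queryPlan(replicas, false).get(0)); // 10.0.0.1
    }
}
```

So with shuffle disabled, a read routed through the policy should consistently land on the primary owner (when it is up), which is what pinning work to local data relies on.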

Frank

On 2017-03-15 13:45 (-0000), Ryan Svihla <r...@foundev.pro> wrote: 
> I don't see what getPolicy is retrieving, but you want to use TokenAware
> with the shuffle false option in the ctor; it defaults to shuffle true so
> that load is spread when people have horribly fat partitions.
> 
> On Wed, Mar 15, 2017 at 9:41 AM, Frank Hughes <frankhughes...@gmail.com>
> wrote:
> 
> > Thanks for reply. Much appreciated.
> >
> > I should have included more detail. So I am using replication factor 2,
> > and the code is using a token aware method of distributing the work so that
> > only data that is primarily owned by the node is read on that local
> > machine. So I guess this points to the logic I'm using to determine what is
> > primarily owned by a node. I guess this is verging into something that
> > should be posted to the java driver list, but I'll post here in case it's
> > useful or there's an obvious problem:
> >
> > PoolingOptions poolingOpts = new PoolingOptions();
> > poolingOpts.setCoreConnectionsPerHost(HostDistance.REMOTE, this.coreConn);
> > poolingOpts.setMaxConnectionsPerHost(HostDistance.REMOTE, this.maxConn);
> > poolingOpts.setMaxRequestsPerConnection(HostDistance.LOCAL, 32768);
> > poolingOpts.setMaxRequestsPerConnection(HostDistance.REMOTE, 2000);
> >
> > SocketOptions socketOptions = new SocketOptions();
> > socketOptions.setReadTimeoutMillis(15000);
> >
> > Cluster.Builder builder = Cluster.builder();
> > for(String contactPoint : contactPoints){
> >     builder.addContactPoint(contactPoint.trim());
> > }
> > builder.withPoolingOptions(poolingOpts);
> > builder.withSocketOptions(socketOptions);
> >
> > builder.withLoadBalancingPolicy(getPolicy())
> >         .withQueryOptions(new QueryOptions()
> >                 .setPrepareOnAllHosts(true)
> >                 .setMetadataEnabled(true)
> >         );
> >
> > Cluster cluster = builder.build();
> > Metadata metadata = cluster.getMetadata();
> > Session session = cluster.connect(keyspaceName);
> > Set<Host> allHosts = metadata.getAllHosts();
> >
> > Host localHost = null;
> > for (Host host : allHosts) {
> >     if(host.getAddress().getHostAddress().equalsIgnoreCase(local))
> >         localHost = host;
> > }
> >
> > Map<Host, List<TokenRange>> replicaCount = new HashMap<Host, List<TokenRange>>();
> > TokenRange[] tokenRanges =
> >         unwrapTokenRanges(metadata.getTokenRanges()).toArray(new TokenRange[0]);
> >
> > List<TokenRange> tokenRangeList = Arrays.asList(tokenRanges);
> > tokenRangeList.sort(new Comparator<TokenRange>() {
> >     @Override
> >     public int compare(TokenRange o1, TokenRange o2) {
> >         return o1.getStart().compareTo(o2.getStart());
> >     }
> > });
> >
> > int numberOfHost = metadata.getAllHosts().size();
> > int rangesPerHost = tokenRanges.length / numberOfHost;
> >
> > for(TokenRange tokenRange : tokenRangeList){
> >
> >     Set<Host> hosts = metadata.getReplicas(keyspaceName, tokenRange);
> >
> >     String rangeHosts = "";
> >     Iterator<Host> iter = hosts.iterator();
> >     while(iter.hasNext()){
> >         Host host = iter.next();
> >
> >         List<TokenRange> tokenRangesForHost = replicaCount.get(host);
> >         if(tokenRangesForHost == null){
> >             tokenRangesForHost = new ArrayList<TokenRange>();
> >         }
> >
> >         if(tokenRangesForHost.size() < rangesPerHost || !iter.hasNext()){
> >             tokenRangesForHost.add(tokenRange);
> >             replicaCount.put(host, tokenRangesForHost);
> >             break;
> >         }
> >
> >         rangeHosts += host.getAddress().toString();
> >     }
> > }
> >
> > for(Host replica : replicaCount.keySet()){
> >     for(TokenRange tr : replicaCount.get(replica)){
> >         System.out.println(tr.getStart() + " to " + tr.getEnd());
> >     }
> > }
> >
> > //get a list of token ranges for this host
> > List<TokenRange> tokenRangesForHost = replicaCount.get(localHost);
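The allocation loop above hands each token range to the first of its replicas that is still under quota, falling back to the last replica so every range gets an owner. A minimal stdlib-only sketch of that strategy (ranges and hosts are plain values here; the real code uses the driver's TokenRange and Host types):

```java
import java.util.*;

// Sketch: assign each token range to exactly one of its replicas — the first
// replica still under its quota, or the last replica if all are at quota.
public class RangeAllocator {
    static Map<String, List<Integer>> allocate(List<Integer> ranges,
                                               Map<Integer, List<String>> replicasOf,
                                               int rangesPerHost) {
        Map<String, List<Integer>> owned = new HashMap<>();
        for (int range : ranges) {
            List<String> replicas = replicasOf.get(range);
            for (int i = 0; i < replicas.size(); i++) {
                String host = replicas.get(i);
                List<Integer> mine = owned.computeIfAbsent(host, h -> new ArrayList<>());
                // Take the range if this host is under quota, or if it is the
                // last replica (someone has to own the range).
                if (mine.size() < rangesPerHost || i == replicas.size() - 1) {
                    mine.add(range);
                    break;
                }
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // 4 ranges, RF = 2 over hosts A and B, quota of 2 ranges per host
        Map<Integer, List<String>> replicasOf = new HashMap<>();
        replicasOf.put(0, Arrays.asList("A", "B"));
        replicasOf.put(1, Arrays.asList("A", "B"));
        replicasOf.put(2, Arrays.asList("A", "B"));
        replicasOf.put(3, Arrays.asList("B", "A"));
        Map<String, List<Integer>> owned =
                allocate(Arrays.asList(0, 1, 2, 3), replicasOf, 2);
        System.out.println(owned); // each host ends up owning 2 ranges
    }
}
```

One caveat with this strategy: because replica sets come back in ring order, it favors earlier hosts until they hit quota, so a range's assigned owner is not always its primary replica; that matters if reads are pinned with shuffle disabled, which routes to the primary.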
> >
> > Again, any thoughts are much appreciated.
> >
> > Thanks
> >
> > Frank
> >
> >
> > On 2017-03-15 12:38 (-0000), Ryan Svihla <r...@foundev.pro> wrote:
> > > LOCAL_ONE just means local to the datacenter. By default the tokenaware
> > > policy will go to a replica that owns that data (primary or any replica,
> > > depending on the driver), and that may or may not be the node the driver
> > > process is running on.
> > >
> > > So to put this more concretely: if you have RF 2 with that 4 node cluster,
> > > 2 nodes will be responsible for that data, and if your local process is not
> > > running on one of those 2 nodes it will definitely HAVE to go to another
> > > node.
> > >
> > > Therefore, if you wanted to pin behavior to a local replica you'd have to
> > > send your work out in a token aware fashion where said work only goes to
> > > the primary token owner of that data, and remove any shuffling of replicas
> > > in the process (shuffling is on by default in the java driver, to my
> > > knowledge).
> > >
> > > On Wed, Mar 15, 2017 at 6:38 AM, Frank Hughes <frankhughes...@gmail.com>
> > > wrote:
> > >
> > > > Hi there,
> > > >
> > > > I'm running a java process on a 4 node cassandra 3.9 cluster on EC2
> > > > (instance type t2.2xlarge), the process running separately on each of
> > > > the nodes (i.e. 4 running JVMs).
> > > > The process is just doing reads from Cassandra and building a SOLR index
> > > > and using the java driver with consistency level LOCAL_ONE.
> > > > However, the following exception is thrown:
> > > >
> > > > com.datastax.driver.core.exceptions.TransportException: [/10.0.0.2:9042]
> > > > Connection has been closed
> > > >         at com.datastax.driver.core.exceptions.TransportException.copy(TransportException.java:38)
> > > >         at com.datastax.driver.core.exceptions.TransportException.copy(TransportException.java:24)
> > > >         at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
> > > >         at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.prepareNextRow(ArrayBackedResultSet.java:313)
> > > >         at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.isExhausted(ArrayBackedResultSet.java:269)
> > > >         at com.datastax.driver.core.ArrayBackedResultSet$1.hasNext(ArrayBackedResultSet.java:143)
> > > >
> > > > where 10.0.0.2 is not the local machine. So my questions:
> > > >
> > > > - Should this happen when I'm using consistency level LOCAL_ONE and just
> > > > doing reads ?
> > > > - Does this suggest non-local reads are happening ?
> > > >
> > > > Many thanks for any help/ideas.
> > > >
> > > > Frank
> > > >
> > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Thanks,
> > > Ryan Svihla
> > >
> >
> 
> 
> 
> -- 
> 
> Thanks,
> Ryan Svihla
> 
