Thanks for the reply. Much appreciated.

I should have included more detail. I am using replication factor 2, and the
code uses a token-aware method of distributing the work so that only data
that is primarily owned by a node is read on that local machine. So I guess
this points to the logic I'm using to determine what is primarily owned by a
node. This is probably verging into something that should be posted to the java
driver list, but I'll post it here in case it's useful or there's an obvious problem:

PoolingOptions poolingOpts = new PoolingOptions();
poolingOpts.setCoreConnectionsPerHost(HostDistance.REMOTE, this.coreConn);
poolingOpts.setMaxConnectionsPerHost(HostDistance.REMOTE, this.maxConn);
poolingOpts.setMaxRequestsPerConnection(HostDistance.LOCAL, 32768);
poolingOpts.setMaxRequestsPerConnection(HostDistance.REMOTE, 2000);

SocketOptions socketOptions = new SocketOptions();
socketOptions.setReadTimeoutMillis(15000);

Cluster.Builder builder = Cluster.builder();
for(String contactPoint : contactPoints){
    builder.addContactPoint(contactPoint.trim());
}
// pooling and socket options only need to be set once, not per contact point
builder.withPoolingOptions(poolingOpts);
builder.withSocketOptions(socketOptions);

builder.withLoadBalancingPolicy(getPolicy())
        .withQueryOptions(new QueryOptions()
                .setPrepareOnAllHosts(true)
                .setMetadataEnabled(true)
        );
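
// Note: getPolicy() isn't shown above. Following Ryan's point about replica
// shuffling, the policy it returns would need to be token aware with
// shuffling disabled so the primary replica is tried first - roughly this
// (a sketch only, the real method may differ):
LoadBalancingPolicy primaryFirstPolicy = new TokenAwarePolicy(
        DCAwareRoundRobinPolicy.builder().build(),
        false);  // shuffleReplicas = false: prefer the primary replica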

Cluster cluster = builder.build();       
Metadata metadata = cluster.getMetadata();
Session session = cluster.connect(keyspaceName);
Set<Host> allHosts = metadata.getAllHosts();

// find the Host entry for this machine ('local' holds its address)
Host localHost = null;
for (Host host : allHosts) {
    if(host.getAddress().getHostAddress().equalsIgnoreCase(local))
        localHost = host;
}

Map<Host, List<TokenRange>> replicaCount = new HashMap<Host, List<TokenRange>>();
TokenRange[] tokenRanges = unwrapTokenRanges(metadata.getTokenRanges()).toArray(new TokenRange[0]);

List<TokenRange> tokenRangeList = Arrays.asList(tokenRanges);
tokenRangeList.sort(new Comparator<TokenRange>() {
    @Override
    public int compare(TokenRange o1, TokenRange o2) {
        return o1.getStart().compareTo(o2.getStart());
    }
});

int numberOfHost = metadata.getAllHosts().size();
// target number of ranges to assign to each host as its "primary" share
int rangesPerHost = tokenRanges.length / numberOfHost;

for(TokenRange tokenRange : tokenRangeList){

    Set<Host> hosts = metadata.getReplicas(keyspaceName, tokenRange);

    // assign the range to the first replica that is still under its quota,
    // falling back to the last replica if all of them are already full
    Iterator<Host> iter = hosts.iterator();
    while(iter.hasNext()){
        Host host = iter.next();

        List<TokenRange> tokenRangesForHost = replicaCount.get(host);
        if(tokenRangesForHost == null){
            tokenRangesForHost = new ArrayList<TokenRange>();
        }

        if(tokenRangesForHost.size() < rangesPerHost || !iter.hasNext()){
            tokenRangesForHost.add(tokenRange);
            replicaCount.put(host, tokenRangesForHost);
            break;
        }
    }
}

for(Host replica : replicaCount.keySet()){
    List<TokenRange> allocatedRanges = replicaCount.get(replica);
    for(TokenRange tr : allocatedRanges){
        System.out.println(tr.getStart() + " to " + tr.getEnd());
    }
}

//get a list of token ranges for this host
List<TokenRange> tokenRangesForHost = replicaCount.get(localHost);
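
For completeness, the per-host ranges then drive the reads, roughly along the
lines below; the table and partition key column here (my_table, pk) are just
placeholders rather than the real schema:

PreparedStatement rangeQuery = session.prepare(
        "SELECT * FROM my_table WHERE token(pk) > ? AND token(pk) <= ?");

for(TokenRange range : tokenRangesForHost){
    // ranges were already unwrapped above, so none of them wrap around the ring
    BoundStatement bound = rangeQuery.bind()
            .setToken(0, range.getStart())
            .setToken(1, range.getEnd());
    bound.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);

    for(Row row : session.execute(bound)){
        // hand each row off to the SOLR indexing work
    }
}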

Again, any thoughts are much appreciated.

Thanks

Frank


On 2017-03-15 12:38 (-0000), Ryan Svihla <r...@foundev.pro> wrote: 
> LOCAL_ONE just means local to the datacenter. By default the token-aware
> policy will go to a replica that owns that data (primary or any replica,
> depending on the driver), and that may or may not be the node the driver
> process is running on.
> 
> So to put this more concretely: if you have RF 2 with that 4 node cluster,
> 2 nodes will be responsible for that data, and if your local process is not
> running on one of those 2 nodes it will definitely HAVE to go to another
> node.
> 
> Therefore, if you wanted to pin behavior to a local replica you'd have to
> send your work out in a token aware fashion where said work only goes to
> the primary token owner of that data, and remove any shuffling of replicas
> in the process (shuffling is on by default in the java driver, to my knowledge).
> 
> On Wed, Mar 15, 2017 at 6:38 AM, Frank Hughes <frankhughes...@gmail.com>
> wrote:
> 
> > Hi there,
> >
> > I'm running a java process on a 4 node Cassandra 3.9 cluster on EC2
> > (instance type t2.2xlarge), the process running separately on each of the
> > nodes (i.e. 4 running JVMs).
> > The process is just doing reads from Cassandra and building a SOLR index
> > and using the java driver with consistency level LOCAL_ONE.
> > However, the following exception is thrown:
> >
> > com.datastax.driver.core.exceptions.TransportException: [/10.0.0.2:9042]
> > Connection has been closed
> >         at com.datastax.driver.core.exceptions.TransportException.
> > copy(TransportException.java:38)
> >         at com.datastax.driver.core.exceptions.TransportException.
> > copy(TransportException.java:24)
> >         at com.datastax.driver.core.DriverThrowables.propagateCause(
> > DriverThrowables.java:37)
> >         at com.datastax.driver.core.ArrayBackedResultSet$
> > MultiPage.prepareNextRow(ArrayBackedResultSet.java:313)
> >         at com.datastax.driver.core.ArrayBackedResultSet$
> > MultiPage.isExhausted(ArrayBackedResultSet.java:269)
> >         at com.datastax.driver.core.ArrayBackedResultSet$1.
> > hasNext(ArrayBackedResultSet.java:143)
> >
> > where 10.0.0.2 is not the local machine. So my questions:
> >
> > - Should this happen when I'm using consistency level LOCAL_ONE and just
> > doing reads?
> > - Does this suggest non-local reads are happening?
> >
> > Many thanks for any help/ideas.
> >
> > Frank
> >
> >
> >
> 
> 
> -- 
> 
> Thanks,
> Ryan Svihla
> 
