Without pooling options I get no errors and everything works correctly (at a low throughput).
When I try to raise the throughput, Flink gives me a "pool busy" error from Cassandra, so I used pooling options. Now when I start the program I get the problem described here.

On 6 Nov 2017 at 9:48, "Nicolas Guyomar" <nicolas.guyo...@gmail.com> wrote:

> Hi Andrea,
>
> Do you have the error using the builder?
>
> PoolingOptions poolingOptions = new PoolingOptions();
> poolingOptions
>     .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
>     .setMaxRequestsPerConnection(HostDistance.REMOTE, 10000);
>
> Builder builder = Cluster.builder();
> builder.addContactPoint(CASSANDRA_ADDRESS);
> builder.withPort(CASSANDRA_PORT);
> builder.withPoolingOptions(poolingOptions);
>
> sinkBuilderNormalStream
>     .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
>         + " (user, sensor, timestamp, rdf_stream, observed_value, value)"
>         + " VALUES (?, ?, ?, ?, ?, ?);")
>     .setClusterBuilder(builder)
>     .build();
>
> On 4 November 2017 at 19:27, Andrea Giordano <andrea.giordano....@gmail.com> wrote:
>
>> Hi,
>> I'm using the DataStax driver to use Cassandra as a sink for some data streams
>> with Apache Flink.
>> My application fails with an error about a full queue. I discovered that the
>> default value is 256, probably too low for my load, so I raised it using
>> PoolingOptions, setting maxRequestsPerConnection as suggested here:
>> http://docs.datastax.com/en/developer/java-driver/3.1/manual/pooling/.
>>
>> Unfortunately, with the following code I get this error when I launch it:
>>
>> The implementation of the ClusterBuilder is not serializable.
>> The object probably contains or references non serializable fields.
>>
>> My code:
>>
>> PoolingOptions poolingOptions = new PoolingOptions();
>> poolingOptions
>>     .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
>>     .setMaxRequestsPerConnection(HostDistance.REMOTE, 10000);
>>
>> ClusterBuilder cassandraBuilder = new ClusterBuilder() {
>>     private static final long serialVersionUID = 1L;
>>
>>     @Override
>>     public Cluster buildCluster(Cluster.Builder builder) {
>>         return builder.addContactPoint(CASSANDRA_ADDRESS)
>>             .withPort(CASSANDRA_PORT)
>>             .withPoolingOptions(poolingOptions)
>>             .build();
>>     }
>> };
>>
>> sinkBuilderNormalStream
>>     .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
>>         + " (user, sensor, timestamp, rdf_stream, observed_value, value)"
>>         + " VALUES (?, ?, ?, ?, ?, ?);")
>>     .setClusterBuilder(cassandraBuilder)
>>     .build();
>>
>> How can I deal with it?
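
For what it's worth, the error message in the quoted code can be reproduced with the JDK alone. The sketch below is only an illustration under assumptions: FakePoolingOptions and FakeClusterBuilder are hypothetical stand-ins (not the real driver or Flink classes), and it assumes the real PoolingOptions is not Serializable. It shows that an anonymous Serializable class which captures a non-serializable local variable cannot be serialized, while one that creates the object inside its own method can.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSketch {

    // Hypothetical stand-in for the driver's PoolingOptions (assumed NOT Serializable).
    static class FakePoolingOptions {
        int maxRequestsPerConnection = 32768;
    }

    // Hypothetical stand-in for a Serializable builder base class.
    static abstract class FakeClusterBuilder implements Serializable {
        private static final long serialVersionUID = 1L;
        abstract String buildCluster();
    }

    // The anonymous class captures the local variable, so it gets a hidden
    // field of type FakePoolingOptions, which is not serializable.
    static FakeClusterBuilder capturing() {
        final FakePoolingOptions options = new FakePoolingOptions();
        return new FakeClusterBuilder() {
            @Override
            String buildCluster() {
                return "max=" + options.maxRequestsPerConnection;
            }
        };
    }

    // The options object is created inside the method, so the anonymous
    // class has no non-serializable fields.
    static FakeClusterBuilder selfContained() {
        return new FakeClusterBuilder() {
            @Override
            String buildCluster() {
                FakePoolingOptions options = new FakePoolingOptions();
                return "max=" + options.maxRequestsPerConnection;
            }
        };
    }

    // Returns true if Java serialization of the object succeeds.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(capturing()));     // prints "false"
        System.out.println(canSerialize(selfContained())); // prints "true"
    }
}
```

If the same reasoning applies to the real classes, the equivalent change in the quoted code would be to create the PoolingOptions inside buildCluster(...) rather than outside the anonymous ClusterBuilder.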