Hi Andrea,

Do you still get the error when using the builder?

PoolingOptions poolingOptions = new PoolingOptions();
poolingOptions
    .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
    .setMaxRequestsPerConnection(HostDistance.REMOTE, 10000);

Cluster.Builder builder = Cluster.builder()
    .addContactPoint(CASSANDRA_ADDRESS)
    .withPort(CASSANDRA_PORT)
    .withPoolingOptions(poolingOptions);


sinkBuilderNormalStream
    .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
        + " (user, sensor, timestamp, rdf_stream, observed_value, value)"
        + " VALUES (?, ?, ?, ?, ?, ?);")
    .setClusterBuilder(builder)
    .build();
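If you do need the anonymous ClusterBuilder from your original code, the serialization error most likely comes from the anonymous class capturing the non-serializable PoolingOptions from the enclosing scope. A sketch of a variant that avoids the capture (assuming, as in your code, that CASSANDRA_ADDRESS is a String constant and CASSANDRA_PORT an int): construct the PoolingOptions inside buildCluster, so the anonymous class only references serializable constants.

```java
// Sketch: build the non-serializable driver objects inside buildCluster(),
// which runs where the cluster is actually created, so nothing
// non-serializable is captured by the anonymous class.
ClusterBuilder cassandraBuilder = new ClusterBuilder() {
    private static final long serialVersionUID = 1L;

    @Override
    public Cluster buildCluster(Cluster.Builder builder) {
        // Created here rather than in the enclosing method, so it is
        // never part of the serialized closure.
        PoolingOptions poolingOptions = new PoolingOptions();
        poolingOptions
            .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
            .setMaxRequestsPerConnection(HostDistance.REMOTE, 10000);

        return builder.addContactPoint(CASSANDRA_ADDRESS)
            .withPort(CASSANDRA_PORT)
            .withPoolingOptions(poolingOptions)
            .build();
    }
};
```

The same applies to anything else non-serializable you might want to configure (e.g. SocketOptions): create it inside buildCluster instead of passing it in from outside.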


On 4 November 2017 at 19:27, Andrea Giordano <andrea.giordano....@gmail.com>
wrote:

> Hi,
> I’m using the DataStax driver to use Cassandra as a sink for some data
> streams with Apache Flink. I have a problem executing my application: it
> raises an error about the queue being full. I discovered that the default
> value is 256, probably too low for my load, so I raised it by setting
> maxRequestsPerConnection through PoolingOptions, as suggested here:
> http://docs.datastax.com/en/developer/java-driver/3.1/manual/pooling/.
>
> Unfortunately, with the following code I get this error when I launch the
> application:
>
> The implementation of the ClusterBuilder is not serializable.
> The object probably contains or references non serializable fields.
>
>
> My code:
>
>
> PoolingOptions poolingOptions = new PoolingOptions();
> poolingOptions
>   .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
>   .setMaxRequestsPerConnection(HostDistance.REMOTE, 10000);
>
>
> ClusterBuilder cassandraBuilder = new ClusterBuilder() {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public Cluster buildCluster(Cluster.Builder builder) {
>         return builder.addContactPoint(CASSANDRA_ADDRESS)
>             .withPort(CASSANDRA_PORT)
>             .withPoolingOptions(poolingOptions)
>             .build();
>     }
> };
>
>
> sinkBuilderNormalStream
>     .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
>         + " (user, sensor, timestamp, rdf_stream, observed_value, value)"
>         + " VALUES (?, ?, ?, ?, ?, ?);")
>     .setClusterBuilder(cassandraBuilder)
>     .build();
>
>
> How can I deal with it?
>
