Without pooling options I get no errors and everything works correctly (at a
light throughput).

When I tried to raise the throughput, Flink gave me a pool busy error from
Cassandra, so I used pooling options. Now when I start the program I get the
problem described here:
On 6 Nov 2017 at 9:48, "Nicolas Guyomar" <nicolas.guyo...@gmail.com> wrote:

> Hi Andrea,
>
> Do you get the error when using the builder?
>
> PoolingOptions poolingOptions = new PoolingOptions();
> poolingOptions
>     .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
>     .setMaxRequestsPerConnection(HostDistance.REMOTE, 10000);
>
>
> Builder builder = Cluster.builder();
> builder.addContactPoint(CASSANDRA_ADDRESS);
> builder.withPort(CASSANDRA_PORT);
> builder.withPoolingOptions(poolingOptions);
>
>
> sinkBuilderNormalStream
>     .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
>         + " (user, sensor, timestamp, rdf_stream, observed_value, value)"
>         + " VALUES (?, ?, ?, ?, ?, ?);")
>     .setClusterBuilder(builder)
>     .build();
>
>
> On 4 November 2017 at 19:27, Andrea Giordano <
> andrea.giordano....@gmail.com> wrote:
>
>> Hi,
>> I'm using the DataStax driver to use Cassandra as a sink for some data
>> streams with Apache Flink.
>> My application fails with an error about a full queue. I discovered that
>> the default value is 256, probably too low for my load, so I raised it
>> using PoolingOptions, setting maxRequestsPerConnection as suggested here:
>> http://docs.datastax.com/en/developer/java-driver/3.1/manual/pooling/.
>>
>> Unfortunately with the following code I obtain the following error when I
>> launch it:
>>
>> The implementation of the ClusterBuilder is not serializable.
>> The object probably contains or references non serializable fields.
>>
>>
>> My code:
>>
>>
>> PoolingOptions poolingOptions = new PoolingOptions();
>> poolingOptions
>>   .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
>>   .setMaxRequestsPerConnection(HostDistance.REMOTE, 10000);
>>
>>
>> ClusterBuilder cassandraBuilder = new ClusterBuilder() {
>>   private static final long serialVersionUID = 1L;
>>
>>   @Override
>>   public Cluster buildCluster(Cluster.Builder builder) {
>>     return builder
>>       .addContactPoint(CASSANDRA_ADDRESS)
>>       .withPort(CASSANDRA_PORT)
>>       .withPoolingOptions(poolingOptions)
>>       .build();
>>   }
>> };
>>
>>
>> sinkBuilderNormalStream
>>   .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
>>     + " (user, sensor, timestamp, rdf_stream, observed_value, value)"
>>     + " VALUES (?, ?, ?, ?, ?, ?);")
>>   .setClusterBuilder(cassandraBuilder)
>>   .build();
>>
>>
>> How can I deal with it?
>>
>
>

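A possible explanation for the "not serializable" error, offered as a sketch and not confirmed anywhere in this thread: an anonymous Serializable class that captures a local variable stores that variable as a hidden field, so the whole object fails to serialize if the captured value is not Serializable itself. The example below reproduces this with the plain JDK only. Options, Factory, and both method names are hypothetical stand-ins (Options for PoolingOptions, Factory for ClusterBuilder); no Flink or driver classes are used.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureSerializationSketch {

    // Hypothetical stand-in for PoolingOptions: deliberately NOT Serializable.
    static class Options {
        int maxRequests = 256;

        Options setMaxRequests(int n) {
            this.maxRequests = n;
            return this;
        }
    }

    // Hypothetical stand-in for ClusterBuilder: a Serializable factory.
    static abstract class Factory implements Serializable {
        private static final long serialVersionUID = 1L;

        abstract int build();
    }

    // Variant 1: the options object is created outside and captured by the
    // anonymous class, which therefore holds a non-serializable field.
    static Factory capturing() {
        final Options options = new Options().setMaxRequests(32768);
        return new Factory() {
            @Override
            int build() {
                return options.maxRequests;
            }
        };
    }

    // Variant 2: the options object is created inside build(), so the
    // anonymous class captures nothing from the enclosing scope.
    static Factory selfContained() {
        return new Factory() {
            @Override
            int build() {
                return new Options().setMaxRequests(32768).maxRequests;
            }
        };
    }

    // Tries Java serialization and reports whether it succeeded.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("capturing:      " + isSerializable(capturing()));
        System.out.println("self-contained: " + isSerializable(selfContained()));
    }
}
```

If the same cause applies to the code quoted above, creating the PoolingOptions inside buildCluster(...) instead of referencing the outer poolingOptions variable would be the corresponding change. That mapping is an assumption and has not been tested against Flink here.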