Re: How to read from Cassandra using Apache Flink?

2018-06-06 Thread HarshithBolar
I figured out a way to solve this by writing my own code, but would love to
know if there are better, more efficient solutions. Here's the answer:
https://stackoverflow.com/questions/50697296/how-to-read-from-cassandra-using-apache-flink/50721953#50721953

@Chesnay I've been wondering about this. Where exactly do you close the
connection in a streaming job?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How to read from Cassandra using Apache Flink?

2018-06-05 Thread Chesnay Schepler
You are creating an entirely new input format for each fetch, which includes 
setting up a connection to Cassandra. It is not surprising that this is 
slow.
The Cassandra input format was written for reading large amounts of 
data, not for synchronous single-row fetches.


You can try using the DataStax driver (or some other library) directly. 
You should be able to reuse large portions of the CassandraInputFormat.
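A minimal sketch of that idea, assuming the lookup lives in a Flink `RichMapFunction`: the connection is created once in `open()`, reused for every record in `map()`, and released in `close()`. This also shows where the connection would be closed in a streaming job. To keep the example runnable without Flink or the driver on the classpath, `LookupSession`, `CassandraLookup` and the `Supplier`-based connector are hypothetical stand-ins. In a real job the class would extend `RichMapFunction`, the session would come from the driver's `cluster.connect()`, and the query would be prepared once (for example `session.prepare("SELECT * FROM test WHERE id = ?")`) and bound per record.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class LookupSketch {

    // Hypothetical stand-in for the driver's Session (e.g. from cluster.connect()).
    interface LookupSession extends AutoCloseable {
        String lookup(String id);

        @Override
        void close();
    }

    // Mirrors the open/map/close lifecycle of a Flink RichMapFunction.
    static class CassandraLookup {
        private final Supplier<LookupSession> connector; // hypothetical connection factory
        private LookupSession session;

        CassandraLookup(Supplier<LookupSession> connector) {
            this.connector = connector;
        }

        // Runs once per parallel task, before any record is processed.
        void open() {
            session = connector.get();
        }

        // Runs for every record and reuses the already-open session.
        String map(String id) {
            return session.lookup(id);
        }

        // Runs once when the task shuts down: the place to release the connection.
        void close() {
            if (session != null) {
                session.close();
                session = null;
            }
        }
    }

    // Counters so the demo can report how many connections were opened and closed.
    static int connects = 0;
    static int closes = 0;

    public static void main(String[] args) {
        // In-memory table standing in for the Cassandra table "test".
        Map<String, String> table = new HashMap<>();
        for (int i = 1; i < 5; i++) {
            table.put("hello" + i, "row" + i);
        }

        CassandraLookup lookup = new CassandraLookup(() -> {
            connects++;
            return new LookupSession() {
                public String lookup(String id) { return table.get(id); }
                public void close() { closes++; }
            };
        });

        lookup.open();
        for (int i = 1; i < 5; i++) {
            System.out.println(lookup.map("hello" + i));
        }
        lookup.close();
        System.out.println("connections opened: " + connects + ", closed: " + closes);
    }
}
```

Running `main` performs four lookups over a single connection and ends by printing `connections opened: 1, closed: 1`; the per-record cost is then just the query, not the connection setup.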


Note that you're also leaking the connection as you aren't closing the 
InputFormat.
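A minimal sketch of plugging the leak while keeping the original one-format-per-lookup structure: every `open()` is paired with a `close()` in a `finally` block, so the connection is released even if reading fails. `TrackingFormat` is a hypothetical stand-in that only counts open connections so the example runs without Flink; with the real `CassandraInputFormat`, the same try/finally would wrap `open(null)` and `nextRecord(out)`, and its `close()` releases the driver session and cluster it created. This stops the leak, but each lookup still pays the full connection cost.

```java
import java.io.IOException;

public class CloseSketch {

    // Hypothetical stand-in for CassandraInputFormat: it tracks how many
    // connections are currently open instead of talking to Cassandra.
    static class TrackingFormat {
        static int openConnections = 0;
        private final String query;
        private boolean open = false;

        TrackingFormat(String query) {
            this.query = query;
        }

        void open() throws IOException {
            openConnections++; // the real format connects a session here
            open = true;
        }

        String nextRecord() throws IOException {
            if (!open) {
                throw new IOException("format not opened");
            }
            return "result of: " + query;
        }

        void close() throws IOException {
            if (open) {
                openConnections--; // the real format closes its session and cluster here
                open = false;
            }
        }
    }

    // Runs one query and always releases the connection, even if reading fails.
    static String fetchOnce(String query) throws IOException {
        TrackingFormat format = new TrackingFormat(query);
        format.open();
        try {
            return format.nextRecord();
        } finally {
            format.close();
        }
    }

    public static void main(String[] args) throws IOException {
        for (int i = 1; i < 5; i++) {
            System.out.println(fetchOnce("SELECT * FROM test WHERE id = 'hello" + i + "'"));
        }
        System.out.println("connections still open: " + TrackingFormat.openConnections);
    }
}
```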


On 05.06.2018 12:02, HarshithBolar wrote:
> [original message snipped]





How to read from Cassandra using Apache Flink?

2018-06-05 Thread HarshithBolar
My Flink program should do a Cassandra lookup for each input record and,
based on the results, do some further processing.

I'm currently stuck on reading data from Cassandra. This is the code
snippet I've come up with so far.

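    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.ConsistencyLevel;
    import com.datastax.driver.core.QueryOptions;
    import com.datastax.driver.dse.auth.DseGSSAPIAuthProvider;
    import org.apache.flink.api.java.tuple.Tuple8;
    import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
    import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

    ClusterBuilder secureCassandraSinkClusterBuilder = new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoints(props.getCassandraClusterUrlAll().split(","))
                    .withPort(props.getCassandraPort())
                    .withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
                    .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))
                    .build();
        }
    };

    for (int i = 1; i < 5; i++) {
        // The tuple's field types were lost in the archived post; a raw Tuple8 is used here.
        CassandraInputFormat<Tuple8> cassandraInputFormat = new CassandraInputFormat<>(
                "SELECT * FROM test WHERE id = 'hello" + i + "'", // assumes id is a text column
                secureCassandraSinkClusterBuilder);
        cassandraInputFormat.configure(null); // builds the Cluster from the ClusterBuilder
        cassandraInputFormat.open(null);      // connects a session and runs the query
        Tuple8 out = new Tuple8<>();
        cassandraInputFormat.nextRecord(out);
        System.out.println(out);
    }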
The problem is that each lookup takes nearly 10 seconds; in other words,
this for loop takes 50 seconds to execute.

How do I speed up this operation? Alternatively, is there any other way of
looking up data in Cassandra from Flink?




