Hi Reynold,
Actually, I did that well before posting my question here.
Thanks,
kant
 
On Sun, Oct 9, 2016 8:48 PM, Reynold Xin r...@databricks.com
wrote:
You should probably check with DataStax, who build the Cassandra connector for
Spark.


On Sun, Oct 9, 2016 at 8:13 PM, kant kodali <kanth...@gmail.com>  wrote:

I tried spanBy, but it looks like a strange error happens no matter which way I
try it, like the one described here for the Java solution:

http://qaoverflow.com/question/how-to-use-spanby-in-java/

java.lang.ClassCastException: cannot assign instance of
scala.collection.immutable.List$SerializationProxy to field
org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type
scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

JavaPairRDD<ByteBuffer, Iterable<CassandraRow>> cassandraRowsRDD = javaFunctions(sc)
        .cassandraTable("test", "hello")
        .select("col1", "col2", "col3")
        .spanBy(new Function<CassandraRow, ByteBuffer>() {
            @Override
            public ByteBuffer call(CassandraRow v1) {
                return v1.getBytes("rowkey");
            }
        }, ByteBuffer.class);
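(Aside: since this is all Java 8, the key extractor can also be written as a lambda, assuming the Function parameter here is a functional interface such as Spark's org.apache.spark.api.java.function.Function; this is just the same call in a shorter form, with a different variable name for illustration:)

JavaPairRDD<ByteBuffer, Iterable<CassandraRow>> sameRdd = javaFunctions(sc)
        .cassandraTable("test", "hello")
        .select("col1", "col2", "col3")
        // same key extractor as the anonymous class above
        .spanBy((CassandraRow row) -> row.getBytes("rowkey"), ByteBuffer.class);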

And then this is where the problem occurs:
List<Tuple2<ByteBuffer, Iterable<CassandraRow>>> listOftuples =
        cassandraRowsRDD.collect(); // ERROR OCCURS HERE
Tuple2<ByteBuffer, Iterable<CassandraRow>> tuple = listOftuples.iterator().next();
ByteBuffer partitionKey = tuple._1();
for (CassandraRow cassandraRow : tuple._2()) {
    System.out.println(cassandraRow.getLong("col1"));
}
So I tried this and got the same error:
Iterable<Tuple2<ByteBuffer, Iterable<CassandraRow>>> listOftuples =
        cassandraRowsRDD.collect(); // ERROR OCCURS HERE
Tuple2<ByteBuffer, Iterable<CassandraRow>> tuple = listOftuples.iterator().next();
ByteBuffer partitionKey = tuple._1();
for (CassandraRow cassandraRow : tuple._2()) {
    System.out.println(cassandraRow.getLong("col1"));
}
Although I understand that ByteBuffers aren't serializable, I didn't get a
NotSerializableException; still, I went ahead and changed everything to byte[],
so there are no more ByteBuffers in the code.
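Concretely, the byte[] version of the keying looks like this (a sketch against the same test.hello table, mirroring the spanBy call above; getBytes returns a ByteBuffer, so I copy it into a plain array):

JavaPairRDD<byte[], Iterable<CassandraRow>> rowsByKey = javaFunctions(sc)
        .cassandraTable("test", "hello")
        .select("col1", "col2", "col3")
        .spanBy(new Function<CassandraRow, byte[]>() {
            @Override
            public byte[] call(CassandraRow row) {
                // getBytes("rowkey") returns a ByteBuffer; copy its contents into a byte[]
                ByteBuffer buf = row.getBytes("rowkey").duplicate();
                byte[] key = new byte[buf.remaining()];
                buf.get(key);
                return key;
            }
        }, byte[].class);

(byte[] compares by reference rather than by content, so it isn't a great key type either, but that is a separate issue from this error.)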
I have also tried cassandraRowsRDD.collect().forEach() and
cassandraRowsRDD.stream().forEachPartition(), and the exact same error occurs.
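For example, the collect().forEach() attempt was along these lines (a sketch; collect() returns a java.util.List, so the Java 8 forEach works on it):

// Collect to the driver, then iterate with Java 8 forEach.
cassandraRowsRDD.collect().forEach(tuple -> {          // ERROR OCCURS ON collect()
    ByteBuffer partitionKey = tuple._1();
    for (CassandraRow row : tuple._2()) {
        System.out.println(row.getLong("col1"));
    }
});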
I am running everything locally in standalone mode, so my Spark cluster is
just running on localhost.
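For completeness, the driver setup is roughly this (a rough sketch; the app name, jar path, and hosts are just illustrative local values):

SparkConf conf = new SparkConf()
        .setAppName("spanby-test")
        .setMaster("spark://localhost:7077")                    // standalone master on localhost
        .set("spark.cassandra.connection.host", "127.0.0.1")    // local Cassandra node
        .setJars(new String[] { "build/libs/myapp-all.jar" });  // the single bundled jar
JavaSparkContext sc = new JavaSparkContext(conf);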
Scala code runner version 2.11.8 (this is what both scala -version and
./spark-shell report).

compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.0.0'
compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.0.0'
compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.0.0'
compile group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.0.0-M3'

So I don't see anything wrong with these versions.
I am bundling everything into one jar, and so far that has worked out well
except for this error.
I am using Java 8 and Gradle.

Any ideas on how I can fix this?
