You should probably check with DataStax, who build the Cassandra connector for Spark.
On Sun, Oct 9, 2016 at 8:13 PM, kant kodali <kanth...@gmail.com> wrote:
>
> I tried spanBy, but it looks like there is a strange error that happens no
> matter which way I try, like the one described here for the Java solution:
>
> http://qaoverflow.com/question/how-to-use-spanby-in-java/
>
> java.lang.ClassCastException: cannot assign instance of
> scala.collection.immutable.List$SerializationProxy to field
> org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type
> scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
>
> JavaPairRDD<ByteBuffer, Iterable<CassandraRow>> cassandraRowsRDD =
>     javaFunctions(sc).cassandraTable("test", "hello")
>         .select("col1", "col2", "col3")
>         .spanBy(new Function<CassandraRow, ByteBuffer>() {
>             @Override
>             public ByteBuffer call(CassandraRow v1) {
>                 return v1.getBytes("rowkey");
>             }
>         }, ByteBuffer.class);
>
> And then, when I do this, is where the problem occurs:
>
> List<Tuple2<ByteBuffer, Iterable<CassandraRow>>> listOftuples =
>     cassandraRowsRDD.collect(); // ERROR OCCURS HERE
> Tuple2<ByteBuffer, Iterable<CassandraRow>> tuple =
>     listOftuples.iterator().next();
> ByteBuffer partitionKey = tuple._1();
> for (CassandraRow cassandraRow : tuple._2()) {
>     System.out.println(cassandraRow.getLong("col1"));
> }
>
> So I tried this, and got the same error:
>
> Iterable<Tuple2<ByteBuffer, Iterable<CassandraRow>>> listOftuples =
>     cassandraRowsRDD.collect(); // ERROR OCCURS HERE
> Tuple2<ByteBuffer, Iterable<CassandraRow>> tuple =
>     listOftuples.iterator().next();
> ByteBuffer partitionKey = tuple._1();
> for (CassandraRow cassandraRow : tuple._2()) {
>     System.out.println(cassandraRow.getLong("col1"));
> }
>
> Although I understand that ByteBuffers aren't serializable, I didn't get
> any not-serializable exception, but I still went ahead and changed
> everything to byte[] so there are no more ByteBuffers in the code.
>
> I have also tried cassandraRowsRDD.collect().forEach() and
> cassandraRowsRDD.stream().forEachPartition(), and the same exact error
> occurs.
>
> I am running everything locally and in standalone mode, so my Spark
> cluster is just running on localhost.
>
> Scala code runner version 2.11.8 // when I run scala -version or even
> ./spark-shell
>
> compile group: 'org.apache.spark' name: 'spark-core_2.11' version: '2.0.0'
> compile group: 'org.apache.spark' name: 'spark-streaming_2.11' version: '2.0.0'
> compile group: 'org.apache.spark' name: 'spark-sql_2.11' version: '2.0.0'
> compile group: 'com.datastax.spark' name: 'spark-cassandra-connector_2.11' version: '2.0.0-M3'
>
> So I don't see anything wrong with these versions.
>
> 2) I am bundling everything into one jar, and so far it has worked out well
> except for this error. I am using Java 8 and Gradle.
>
> Any ideas on how I can fix this?
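
For reference, a minimal self-contained version of the spanBy snippet you quote
would look roughly like the sketch below. It is only a sketch of the call you
describe, not a fix for the ClassCastException: the SpanByExample class name,
the local[*] master, and the spark.cassandra.connection.host value are
placeholders, and it assumes the same test.hello table with a "rowkey"
partition key column. Note that "rowkey" has to be included in select() for
getBytes("rowkey") to see it.

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

import java.nio.ByteBuffer;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import com.datastax.spark.connector.japi.CassandraRow;

import scala.Tuple2;

public class SpanByExample {

    public static void main(String[] args) {
        // Local standalone setup, matching the environment described above.
        SparkConf conf = new SparkConf()
                .setAppName("spanby-sketch")
                .setMaster("local[*]")
                .set("spark.cassandra.connection.host", "127.0.0.1");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Group rows of test.hello by the bytes of their "rowkey" column.
        // "rowkey" is included in select() so getBytes("rowkey") can find it.
        JavaPairRDD<ByteBuffer, Iterable<CassandraRow>> cassandraRowsRDD =
                javaFunctions(sc).cassandraTable("test", "hello")
                        .select("rowkey", "col1", "col2", "col3")
                        .spanBy(new Function<CassandraRow, ByteBuffer>() {
                            @Override
                            public ByteBuffer call(CassandraRow row) {
                                return row.getBytes("rowkey");
                            }
                        }, ByteBuffer.class);

        // Pull the grouped rows back to the driver and print col1 of the first group.
        List<Tuple2<ByteBuffer, Iterable<CassandraRow>>> groups = cassandraRowsRDD.collect();
        Tuple2<ByteBuffer, Iterable<CassandraRow>> first = groups.iterator().next();
        for (CassandraRow cassandraRow : first._2()) {
            System.out.println(cassandraRow.getLong("col1"));
        }

        sc.stop();
    }
}

I kept the ByteBuffer key from your first snippet rather than byte[]: as far as
I understand, spanBy groups consecutive rows whose keys compare equal, and
ByteBuffer compares by content while byte[] only compares by reference.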