Is this because I am calling a transformation function on an rdd from inside another transformation function?
Is it not allowed?

Thanks
Ankur

On Oct 21, 2014 1:59 PM, "Ankur Srivastava" <ankur.srivast...@gmail.com> wrote:

> Hi Gerard,
>
> This is the code that may be helpful:
>
> public class ReferenceDataJoin implements Serializable {
>
>     private static final long serialVersionUID = 1039084794799135747L;
>
>     JavaPairRDD<String, Employee> rdd;
>     CassandraJavaRDD<ReferenceData> referenceTable;
>
>     public ReferenceDataJoin(List<Tuple2<String, Employee>> employees) {
>         JavaSparkContext sc =
>             SparkContextFactory.getSparkContextFactory().getSparkContext();
>         this.rdd = sc.parallelizePairs(employees);
>         this.referenceTable = javaFunctions(sc).cassandraTable(
>             "reference_data", "dept_reference_data", ReferenceData.class);
>     }
>
>     public JavaPairRDD<String, Employee> execute() {
>         JavaPairRDD<String, Employee> joinedRdd = rdd
>             .mapValues(new Function<Employee, Employee>() {
>                 private static final long serialVersionUID = -226016490083377260L;
>
>                 @Override
>                 public Employee call(Employee employee) throws Exception {
>                     ReferenceData data = null;
>                     if (employee.getDepartment() != null) {
>                         data = referenceTable.where("dept=?",
>                             employee.getDepartment()).first();
>                         System.out.println(employee.getDepartment() + "---->" + data);
>                     }
>                     if (data != null) {
>                         // setters on employee
>                     }
>                     return employee;
>                 }
>             });
>         return joinedRdd;
>     }
> }
>
> Thanks
> Ankur
>
> On Tue, Oct 21, 2014 at 11:11 AM, Gerard Maas <gerard.m...@gmail.com> wrote:
>
>> Looks like that code does not correspond to the problem you're facing. I
>> doubt it would even compile.
>> Could you post the actual code?
>>
>> -kr, Gerard
>>
>> On Oct 21, 2014 7:27 PM, "Ankur Srivastava" <ankur.srivast...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am creating a Cassandra Java RDD and transforming it using the where
>>> clause.
>>>
>>> It works fine when I run it outside the mapValues, but when I put the
>>> code in mapValues I get an error while creating the transformation.
>>>
>>> Below is my sample code:
>>>
>>> CassandraJavaRDD<ReferenceData> cassandraRefTable = javaFunctions(sc)
>>>     .cassandraTable("reference_data", "dept_reference_data",
>>>         ReferenceData.class);
>>>
>>> JavaPairRDD<String, Employee> joinedRdd =
>>>     rdd.mapValues(new Function<IPLocation, IPLocation>() {
>>>
>>>     public Employee call(Employee employee) throws Exception {
>>>         ReferenceData data = null;
>>>         if (employee.getDepartment() != null) {
>>>             data = referenceTable.where("postal_plus=?",
>>>                 location.getPostalPlus()).first();
>>>             System.out.println(data.toCSV());
>>>         }
>>>         if (data != null) {
>>>             // call setters on employee
>>>         }
>>>         return employee;
>>>     }
>>> }
>>>
>>> I get this error:
>>>
>>> java.lang.NullPointerException
>>>     at org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
>>>     at com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:47)
>>>     at com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:70)
>>>     at com.datastax.spark.connector.rdd.CassandraRDD.where(CassandraRDD.scala:77)
>>>     at com.datastax.spark.connector.rdd.CassandraJavaRDD.where(CassandraJavaRDD.java:54)
>>>
>>> Thanks for the help!!
>>>
>>> Regards
>>> Ankur
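For context on the question at the top of the thread: this is indeed not allowed. The CassandraJavaRDD captured by the mapValues closure carries a SparkContext that exists only on the driver, so calling where() on an executor dereferences a null context, which matches the NullPointerException raised in RDD.<init> above. Below is a minimal sketch of one common workaround, not necessarily the poster's eventual fix: collect the reference table on the driver and broadcast it as a plain Map. It assumes the reference table is small enough to broadcast; Employee and ReferenceData are the thread's own domain classes, and the getDept() accessor is hypothetical.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastReferenceDataJoin {

    public static JavaPairRDD<String, Employee> execute(
            JavaSparkContext sc,
            JavaPairRDD<String, Employee> rdd,
            List<ReferenceData> referenceRows) { // e.g. from referenceTable.collect()

        // Build a driver-side lookup keyed by department; this runs on the
        // driver, where the reference rows have already been collected.
        Map<String, ReferenceData> byDept = new HashMap<String, ReferenceData>();
        for (ReferenceData row : referenceRows) {
            byDept.put(row.getDept(), row); // getDept() is an assumed accessor
        }

        // Broadcast the map once; each executor receives a read-only copy, so
        // no RDD (or SparkContext) is referenced inside the closure below.
        final Broadcast<Map<String, ReferenceData>> refData = sc.broadcast(byDept);

        return rdd.mapValues(new Function<Employee, Employee>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Employee call(Employee employee) throws Exception {
                if (employee.getDepartment() != null) {
                    ReferenceData data = refData.value().get(employee.getDepartment());
                    if (data != null) {
                        // setters on employee, as in the original code
                    }
                }
                return employee;
            }
        });
    }
}

If the reference data is too large to broadcast, the usual alternative is to express the lookup as an RDD-to-RDD join (for example, keying both datasets by department and calling join()), so that no RDD is ever used inside another RDD's transformation.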