Is this because I am calling a transformation function on an RDD from
inside another transformation function?

Is it not allowed?
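
If nested RDD operations are not allowed, one workaround I am considering
is to collect the reference table to the driver, broadcast it, and do the
lookup against the broadcast map inside mapValues. A rough, untested
sketch (it assumes the reference table is small enough to collect, and
getDept() is a placeholder for whatever accessor ReferenceData really
has):

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.broadcast.Broadcast;

// Build a driver-side lookup map from the Cassandra reference table.
Map<String, ReferenceData> refByDept = new HashMap<String, ReferenceData>();
for (ReferenceData rd : referenceTable.collect()) {
    refByDept.put(rd.getDept(), rd); // getDept() is hypothetical
}

// Broadcast the map so each executor gets one read-only copy.
final Broadcast<Map<String, ReferenceData>> refBroadcast = sc.broadcast(refByDept);

JavaPairRDD<String, Employee> joinedRdd = rdd.mapValues(
        new Function<Employee, Employee>() {
            @Override
            public Employee call(Employee employee) throws Exception {
                if (employee.getDepartment() != null) {
                    ReferenceData data =
                            refBroadcast.value().get(employee.getDepartment());
                    if (data != null) {
                        // call setters on employee
                    }
                }
                return employee;
            }
        });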

Thanks
Ankur
On Oct 21, 2014 1:59 PM, "Ankur Srivastava" <ankur.srivast...@gmail.com>
wrote:

> Hi Gerard,
>
> Here is the actual code; I hope it is helpful.
>
> public class ReferenceDataJoin implements Serializable {
>
>     private static final long serialVersionUID = 1039084794799135747L;
>
>     JavaPairRDD<String, Employee> rdd;
>     CassandraJavaRDD<ReferenceData> referenceTable;
>
>     public ReferenceDataJoin(List<Tuple2<String, Employee>> employees) {
>         JavaSparkContext sc =
>                 SparkContextFactory.getSparkContextFactory().getSparkContext();
>         this.rdd = sc.parallelizePairs(employees);
>         this.referenceTable = javaFunctions(sc).cassandraTable(
>                 "reference_data", "dept_reference_data", ReferenceData.class);
>     }
>
>     public JavaPairRDD<String, Employee> execute() {
>         JavaPairRDD<String, Employee> joinedRdd = rdd
>                 .mapValues(new Function<Employee, Employee>() {
>                     private static final long serialVersionUID = -226016490083377260L;
>
>                     @Override
>                     public Employee call(Employee employee) throws Exception {
>                         ReferenceData data = null;
>                         if (employee.getDepartment() != null) {
>                             data = referenceTable.where("dept=?",
>                                     employee.getDepartment()).first();
>                             System.out.println(employee.getDepartment()
>                                     + "---->" + data);
>                         }
>                         if (data != null) {
>                             // setters on employee
>                         }
>                         return employee;
>                     }
>                 });
>         return joinedRdd;
>     }
> }
>
>
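> Another shape I am considering is a plain RDD join instead of calling
> where() inside mapValues: key both RDDs by department and join them.
> Again only a rough, untested sketch (it assumes the usual Spark Java
> imports, scala.Tuple2, and a hypothetical getDept() on ReferenceData):
>
> JavaPairRDD<String, ReferenceData> refByDept = referenceTable.mapToPair(
>         new PairFunction<ReferenceData, String, ReferenceData>() {
>             @Override
>             public Tuple2<String, ReferenceData> call(ReferenceData rd) {
>                 return new Tuple2<String, ReferenceData>(rd.getDept(), rd);
>             }
>         });
>
> // Re-key the employees by department so the join keys line up.
> JavaPairRDD<String, Employee> empByDept = rdd.values().mapToPair(
>         new PairFunction<Employee, String, Employee>() {
>             @Override
>             public Tuple2<String, Employee> call(Employee e) {
>                 return new Tuple2<String, Employee>(e.getDepartment(), e);
>             }
>         });
>
> // Each result value pairs an Employee with its matching ReferenceData.
> JavaPairRDD<String, Tuple2<Employee, ReferenceData>> joined =
>         empByDept.join(refByDept);
>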
> Thanks
> Ankur
>
> On Tue, Oct 21, 2014 at 11:11 AM, Gerard Maas <gerard.m...@gmail.com>
> wrote:
>
>> Looks like that code does not correspond to the problem you're facing. I
>> doubt it would even compile.
>> Could you post the actual code?
>>
>> -kr, Gerard
>> On Oct 21, 2014 7:27 PM, "Ankur Srivastava" <ankur.srivast...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am creating a Cassandra Java RDD and transforming it using a where
>>> clause.
>>>
>>> It works fine when I run it outside of mapValues, but when I put the
>>> same code inside mapValues I get an error while the transformation is
>>> being created.
>>>
>>> Below is my sample code:
>>>
>>> CassandraJavaRDD<ReferenceData> cassandraRefTable = javaFunctions(sc)
>>>         .cassandraTable("reference_data", "dept_reference_data",
>>>                 ReferenceData.class);
>>>
>>> JavaPairRDD<String, Employee> joinedRdd = rdd.mapValues(
>>>         new Function<IPLocation, IPLocation>() {
>>>
>>>     public Employee call(Employee employee) throws Exception {
>>>         ReferenceData data = null;
>>>         if (employee.getDepartment() != null) {
>>>             data = referenceTable.where("postal_plus=?",
>>>                     location.getPostalPlus()).first();
>>>             System.out.println(data.toCSV());
>>>         }
>>>         if (data != null) {
>>>             // call setters on employee
>>>         }
>>>         return employee;
>>>     }
>>> }
>>>
>>> I get this error:
>>>
>>> java.lang.NullPointerException
>>> at org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
>>> at com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:47)
>>> at com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:70)
>>> at com.datastax.spark.connector.rdd.CassandraRDD.where(CassandraRDD.scala:77)
>>> at com.datastax.spark.connector.rdd.CassandraJavaRDD.where(CassandraJavaRDD.java:54)
>>>
>>>
>>> Thanks for the help!!
>>>
>>>
>>>
>>> Regards
>>>
>>> Ankur
>>>
>>
>
