Today is my day... Trying to go through where I can pitch in. Let me know if 
the below makes sense.

I looked at the Joda-Time Java API source code (1.2.9) and traced the call 
that throws the NPE. It looks like the AssembledChronology class is being 
used, and its iYears instance variable is declared transient.

DateTime.minusYears(int years) call trace:
long instant = getChronology().years().subtract(getMillis(), years);
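
The failure mode this trace suggests can be sketched without Joda at all. 
Everything below is a hypothetical stand-in (FakeYearsField, FakeChronology 
and FakeDateTime are not Joda classes). Plain Java serialization is used only 
to keep the sketch dependency-free; the assumption is that a field-by-field 
serializer which skips transient fields, and never rebuilds them, leaves the 
object in the same state. Real Joda classes may well restore such fields under 
Java serialization; the stand-in deliberately does not.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a duration field such as years().
class FakeYearsField extends Serializable {
  private val millisPerYear = 365L * 24 * 60 * 60 * 1000 // simplified, ignores leap years
  def subtract(instant: Long, years: Int): Long = instant - years * millisPerYear
}

// Hypothetical stand-in for a chronology: iYears is transient and nothing
// rebuilds it after deserialization.
class FakeChronology extends Serializable {
  @transient private var iYears: FakeYearsField = new FakeYearsField
  def years(): FakeYearsField = iYears
}

// Hypothetical stand-in for DateTime, mirroring the call chain in the trace.
class FakeDateTime(val millis: Long, val chronology: FakeChronology) extends Serializable {
  def minusYears(years: Int): Long = chronology.years().subtract(millis, years)
}

object TransientNpeSketch {
  // Round-trip a value through default Java serialization.
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Calling minusYears on a freshly constructed FakeDateTime works. On the copy 
returned by TransientNpeSketch.roundTrip, chronology.years() is null, so the 
same call throws a NullPointerException. Compile this as a normal source file 
rather than pasting it into a REPL, since REPL-defined classes can drag in 
extra outer references when serialized.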

Not sure how the suggested serializer would help if the variable is transient.
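
One possible answer, as a sketch: a dedicated serializer does not have to copy 
fields at all. It can write just enough state to call a constructor on the 
receiving side, and the constructor then rebuilds whatever is transient. 
Whether JodaDateTimeSerializer works this way is an assumption here, not 
something confirmed in this thread, and every name below (YearsHelper, 
RebuiltDateTime, RebuiltDateTimeSerializer) is hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical helper that plays the role of the transient years field.
class YearsHelper {
  private val millisPerYear = 365L * 24 * 60 * 60 * 1000 // simplified, ignores leap years
  def subtract(instant: Long, years: Int): Long = instant - years * millisPerYear
}

// Hypothetical stand-in for DateTime: the transient helper is always
// created by the constructor, never copied from serialized bytes.
class RebuiltDateTime(val millis: Long) {
  @transient private val yearsHelper = new YearsHelper
  def minusYears(years: Int): RebuiltDateTime =
    new RebuiltDateTime(yearsHelper.subtract(millis, years))
}

// Hypothetical serializer: writes only the instant and reads it back
// through the constructor, so the transient state is rebuilt on read.
object RebuiltDateTimeSerializer {
  def write(value: RebuiltDateTime): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    try out.writeLong(value.millis) finally out.close()
    buffer.toByteArray
  }

  def read(bytes: Array[Byte]): RebuiltDateTime = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    try new RebuiltDateTime(in.readLong()) finally in.close()
  }
}
```

With this shape the transient field never travels over the wire, so it being 
transient stops mattering: read(write(x)).minusYears(10) behaves like 
x.minusYears(10). By contrast, registering a class with Kryo without naming a 
serializer presumably leaves it to Kryo's default field-based handling, which 
is where an unrestored transient field could bite.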

Thanks,
-Durgesh

> On Jan 14, 2016, at 11:49 AM, Spencer, Alex (Santander) 
> <alex.spen...@santander.co.uk.INVALID> wrote:
> 
> I appreciate this – thank you.
>  
> I’m not an admin on the box I’m using spark-shell on – so I’m not sure I can 
> add them to that namespace. I’m hoping if I declare the 
> JodaDateTimeSerializer class in my REPL that I can still get this to work. I 
> think the INTERVAL part below may be key, I haven’t tried that yet.
>  
> Kind Regards,
> Alex.
>  
> From: Todd Nist [mailto:tsind...@gmail.com] 
> Sent: 14 January 2016 16:28
> To: Spencer, Alex (Santander)
> Cc: Sean Owen; user@spark.apache.org
> Subject: Re: NPE when using Joda DateTime
>  
> I had a similar problem a while back and leveraged these Kryo serializers, 
> https://github.com/magro/kryo-serializers.  I had to fall back to version 
> 0.28, but that was a while back.  You can register these in a class that 
> extends org.apache.spark.serializer.KryoRegistrator
> and then set your registrator in the Spark config:
> 
> sparkConfig
>     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>     .set("spark.kryo.registrator", "com.yourpackage.YourKryoRegistrator")
>     ...
> 
> where YourKryoRegistrator is something like:
> 
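> import com.esotericsoftware.kryo.Kryo
> import de.javakaffee.kryoserializers.jodatime.{JodaDateTimeSerializer, JodaIntervalSerializer}
> import org.apache.spark.serializer.KryoRegistrator
> 
> class YourKryoRegistrator extends KryoRegistrator {
>   // Register each Joda type together with its dedicated serializer.
>   override def registerClasses(kryo: Kryo): Unit = {
>     kryo.register(classOf[org.joda.time.DateTime], new JodaDateTimeSerializer)
>     kryo.register(classOf[org.joda.time.Interval], new JodaIntervalSerializer)
>   }
> }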
> HTH.
> -Todd
>  
> On Thu, Jan 14, 2016 at 9:28 AM, Spencer, Alex (Santander) 
> <alex.spen...@santander.co.uk.invalid> wrote:
> Hi,
> 
> I tried take(1500) and test.collect and these both work on the "single" map 
> statement.
> 
> I'm very new to Kryo serialisation, I managed to find some code and I copied 
> and pasted and that's what originally made the single map statement work:
> 
> class MyRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     kryo.register(classOf[org.joda.time.DateTime])
>   }
> }
> 
> Is it because the groupBy sees a different class type? Maybe Array[DateTime]? 
> I don’t want to find the answer by trial and error though.
> 
> Alex
> 
> -----Original Message-----
> From: Sean Owen [mailto:so...@cloudera.com]
> Sent: 14 January 2016 14:07
> To: Spencer, Alex (Santander)
> Cc: user@spark.apache.org
> Subject: Re: NPE when using Joda DateTime
> 
> It does look like the state of the DateTime object isn't being recreated 
> properly on deserialization, given where the NPE occurs (look at the Joda 
> source code). However, the object is java.io.Serializable. 
> Are you sure the Kryo serialization is correct?
> 
> It doesn't quite explain why the map operation works by itself. It could be 
> the difference between executing locally (take(1) will look at 1 partition in 
> 1 task which prefers to be local) and executing remotely (groupBy is going to 
> need a shuffle).
> 
> On Thu, Jan 14, 2016 at 1:01 PM, Spencer, Alex (Santander) 
> <alex.spen...@santander.co.uk.invalid> wrote:
> > Hello,
> >
> >
> >
> > I was wondering if somebody is able to help me get to the bottom of a
> > null pointer exception I’m seeing in my code. I’ve managed to narrow
> > down a problem in a larger class to my use of Joda’s DateTime
> > functions. I’ve successfully run my code in scala, but I’ve hit a few
> > problems when adapting it to run in spark.
> >
> >
> >
> > Spark version: 1.3.0
> >
> > Scala version: 2.10.4
> >
> > Java HotSpot 1.7
> >
> >
> >
> > I have a small case class called Transaction, which looks something
> > like
> > this:
> >
> >
> >
> > case class Transaction(date : org.joda.time.DateTime = new
> > org.joda.time.DateTime())
> >
> >
> >
> > I have an RDD[Transaction] called trans:
> >
> > org.apache.spark.rdd.RDD[Transaction] = MapPartitionsRDD[4] at map at
> > <console>:44
> >
> >
> >
> > I am able to run this successfully:
> >
> >
> >
> > val test = trans.map(_.date.minusYears(10))
> > test.take(1)
> >
> >
> >
> > However if I do:
> >
> >
> >
> > val groupedTrans = trans.groupBy(_.account)
> >
> > // For each group, process transactions in turn:
> > val test = groupedTrans.flatMap { case (_, transList) =>
> >   transList.map { transaction =>
> >     transaction.date.minusYears(10)
> >   }
> > }
> > test.take(1)
> >
> >
> >
> > I get:
> >
> >
> >
> > java.lang.NullPointerException
> >         at org.joda.time.DateTime.minusYears(DateTime.java:1268)
> >
> >
> >
> > Should the second operation not be equivalent to the first .map one?
> > (It’s a long way round of producing my error – but it’s extremely
> > similar to what’s happening in my class).
> >
> >
> >
> > I’ve got a custom registration class for Kryo which I think is working
> > - before I added this the original .map did not work – but shouldn’t
> > it be able to serialize all instances of Joda DateTime?
> >
> >
> >
> > Thank you for any help / pointers you can give me.
> >
> >
> >
> > Kind Regards,
> >
> > Alex.
> >
> >
> >
> > Alex Spencer
> >
> 
