Re: Java SPI jar reload in Spark
I'd suggest scripts such as JavaScript or Groovy instead. To my understanding, the service-loader mechanism is not a good fit for runtime reloading.

On Wed, Jun 7, 2017 at 4:55 PM, Jonnas Li (Contractor) <zhongshuang...@envisioncn.com> wrote:
> [quoted thread trimmed]
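As a sketch of the script-based alternative suggested above: the JDK's javax.script API (JSR-223) evaluates JavaScript or Groovy source at runtime, so reloadable logic can simply be re-read from source, with no class-loader identity problems. Engine availability varies by JDK (Nashorn was bundled only up to JDK 14), so the lookup is guarded; the class and snippet below are illustrative, not from the thread.

```java
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

public class ScriptReloadDemo {

    /**
     * Evaluates a snippet with whatever JavaScript engine the runtime bundles
     * (Nashorn up to JDK 14); returns null when no engine is available.
     */
    public static Object eval(String snippet) {
        ScriptEngine js = new ScriptEngineManager().getEngineByName("javascript");
        if (js == null) {
            return null; // e.g. JDK 15+ without an engine on the classpath
        }
        try {
            // Re-evaluating freshly fetched source IS the "reload" here.
            return js.eval(snippet);
        } catch (javax.script.ScriptException e) {
            throw new IllegalArgumentException("bad snippet: " + snippet, e);
        }
    }

    public static void main(String[] args) {
        System.out.println("1 + 1 -> " + eval("1 + 1"));
    }
}
```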
Re: Java SPI jar reload in Spark
To be more explicit, I used mapWithState() in my application, like this:

    stream = KafkaUtils.createStream(..)
    mappedStream = stream.mapPartitionsToPair(..)
    stateStream = mappedStream.mapWithState(MyUpdateFunc(..))
    stateStream.foreachRDD(..)

I call the jar inside MyUpdateFunc(), and the jar reload is triggered by some other event.

I'm not sure whether this approach is feasible. To my understanding, Spark checkpoints the state, so the application can't be updated at runtime; that is why I got the exception.

Any suggestion is welcome. If there is any other way to do something like this, I just want to provide a mechanism that lets users customize their business logic.

Regards
李忠双 / Jonnas
Re: Java SPI jar reload in Spark
I used java.util.ServiceLoader <https://docs.oracle.com/javase/7/docs/api/java/util/ServiceLoader.html>, as the javadoc says it supports reloading. Please point it out if I'm misunderstanding.

Regards
Jonnas Li
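For reference, the reload pattern the ServiceLoader javadoc describes looks like the sketch below. It uses a JDK-provided SPI (java.nio.file.spi.FileSystemProvider) only so there is something on the classpath to enumerate; the class name is made up for illustration. Note the caveat relevant to this thread: reload() merely clears the provider cache, so the next iteration re-instantiates providers through the same class loader. It does not pick up classes from a replaced jar unless the underlying loader changes.

```java
import java.nio.file.spi.FileSystemProvider;
import java.util.ServiceLoader;

public class ReloadPatternDemo {

    /** Counts providers, clears the cache with reload(), and counts again. */
    public static boolean reloadIsConsistent() {
        ServiceLoader<FileSystemProvider> loader =
                ServiceLoader.load(FileSystemProvider.class);
        int before = count(loader);
        loader.reload();            // drops the cached provider instances
        int after = count(loader);  // the next iteration re-instantiates them
        return before == after;     // same classpath, so the same provider set
    }

    private static int count(ServiceLoader<?> loader) {
        int n = 0;
        for (Object provider : loader) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        System.out.println("consistent after reload: " + reloadIsConsistent());
    }
}
```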
Re: Java SPI jar reload in Spark
Hi, a quick search on Google:

https://github.com/spark-jobserver/spark-jobserver/issues/130

Alonso Isidoro Roman
about.me/alonso.isidoro.roman
Re: Java SPI jar reload in Spark
Thanks for your quick response.
These jars define some customized business logic; they can be treated as plug-ins in our business scenario. The jars are developed and maintained by third-party partners, which means there will be version updates.
My expectation is to update the business code without restarting the Spark Streaming job. Any suggestion would be greatly appreciated.

Regards
Jonnas Li
Re: Java SPI jar reload in Spark
Why do you need jar reloading? What functionality is executed during the reload? Maybe there is another way to achieve the same thing without reloading jars. In fact, it might be dangerous from a functional point of view: if the functionality in the jar changes mid-run, all your computation may be wrong.
Java SPI jar reload in Spark
I have a Spark Streaming application which dynamically loads a jar (Java SPI); the jar is called inside a mapWithState() function, and it worked fine for a long time.
Recently I got a requirement to reload the jar at runtime.
But when the reload completes, the Spark Streaming job fails with the exception below; it seems Spark fails to deserialize the checkpoint.
My question is: will the logic in the jar be serialized into the checkpoint, and is it possible to reload a jar at runtime in Spark Streaming?

[2017-06-06 17:13:12,185] WARN Lost task 1.0 in stage 5355.0 (TID 4817, ip-10-21-14-205.envisioncn.com): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.streaming.rdd.MapWithStateRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
    at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
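The ClassCastException above is characteristic of class-loader identity: the same class loaded by two different loaders yields two distinct Class objects, so state recovered from a checkpoint is not assignable to the "reloaded" type. The following self-contained illustration (the class and file names are hypothetical, and it compiles a throwaway class at runtime, so it needs a JDK rather than a bare JRE) shows the mechanism:

```java
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;

public class LoaderIdentityDemo {

    /** Loads the same compiled class through two loaders and compares identity. */
    public static boolean distinctAcrossLoaders() throws Exception {
        // Compile a throwaway class into a temp directory at runtime.
        Path dir = Files.createTempDirectory("spi-demo");
        Path src = dir.resolve("Plugin.java");
        Files.writeString(src, "public class Plugin {}");
        JavaCompiler javac = ToolProvider.getSystemJavaCompiler();
        if (javac.run(null, null, null, "-d", dir.toString(), src.toString()) != 0) {
            throw new IllegalStateException("compilation failed");
        }

        URL[] urls = { dir.toUri().toURL() };
        // Two loaders over the same location: the "old" jar vs the "reloaded" jar.
        try (URLClassLoader oldLoader = new URLClassLoader(urls, null);
             URLClassLoader newLoader = new URLClassLoader(urls, null)) {
            Class<?> oldClass = oldLoader.loadClass("Plugin");
            Class<?> newClass = newLoader.loadClass("Plugin");
            Object oldInstance = oldClass.getDeclaredConstructor().newInstance();
            // Same name, distinct Class objects; an instance from the old
            // loader is not assignable to the reloaded type, which is exactly
            // what checkpoint deserialization trips over.
            return oldClass != newClass && !newClass.isInstance(oldInstance);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("distinct across loaders: " + distinctAcrossLoaders());
    }
}
```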