Yes, every (non-static) inner class contains a hidden field referencing its enclosing instance. In your case, the anonymous DoubleFlatMapFunction subclass has a this$0 field referencing the enclosing AnalyticsEngine instance, which is why Spark tries to serialize AnalyticsEngine.
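The hidden field can be observed with reflection and plain Java serialization, without Spark. The following is a minimal sketch, not Spark code: Fn, Engine, Scale and TransientEngine are hypothetical stand-ins invented for illustration (Engine plays the role of AnalyticsEngine, and the transient Object plays the role of the JavaSparkContext field). It compares an anonymous class with a static nested class, and with an anonymous class whose enclosing class is Serializable and keeps its non-serializable member transient.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;

public class OuterRefDemo {

    /** Hypothetical stand-in for a Spark function type: serializable, one method. */
    interface Fn extends Serializable {
        double call(double x);
    }

    /** Hypothetical stand-in for AnalyticsEngine; deliberately NOT Serializable. */
    static class Engine {
        double scale = 2.0;

        // Anonymous inner class. Reading `scale` makes it reference the
        // enclosing Engine, which the compiler stores in a synthetic this$N field.
        Fn anonymous() {
            return new Fn() {
                @Override
                public double call(double x) {
                    return x * scale;
                }
            };
        }

        // Static nested class: the value is copied in, so no outer reference exists.
        Fn staticNested() {
            return new Scale(scale);
        }

        static class Scale implements Fn {
            private final double scale;

            Scale(double scale) {
                this.scale = scale;
            }

            @Override
            public double call(double x) {
                return x * scale;
            }
        }
    }

    /** Serializable outer class whose non-serializable member is transient. */
    static class TransientEngine implements Serializable {
        transient Object context = new Object(); // stands in for JavaSparkContext
        double scale = 2.0;

        Fn anonymous() {
            return new Fn() {
                @Override
                public double call(double x) {
                    return x * scale;
                }
            };
        }
    }

    /** True if the object's class declares a synthetic outer-instance field. */
    static boolean hasOuterField(Object o) {
        for (Field f : o.getClass().getDeclaredFields()) {
            if (f.getName().startsWith("this$")) {
                return true;
            }
        }
        return false;
    }

    /** True if default Java serialization of the object succeeds. */
    static boolean isJavaSerializable(Object o) {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Engine engine = new Engine();
        System.out.println(hasOuterField(engine.anonymous()));           // true
        System.out.println(hasOuterField(engine.staticNested()));        // false
        System.out.println(isJavaSerializable(engine.anonymous()));      // false
        System.out.println(isJavaSerializable(engine.staticNested()));   // true
        System.out.println(isJavaSerializable(new TransientEngine().anonymous())); // true
    }
}
```

The anonymous function fails to serialize only because serializing it drags the enclosing Engine along; the function body itself is identical in all three cases.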
In the Scala API, a closure (which is really just implemented as an anonymous class) has a field called "$outer", and Spark uses a "closure cleaner" that goes into the anonymous class and removes the $outer field if the closure itself does not use it. In Java, the compiler names the field "this$0" instead, so the closure cleaner doesn't find it and can't clean it. I will work on a fix so that the closure cleaner handles this case as well. In the meantime, you can work around this either by defining your function as a static nested class instead of an anonymous class, or by marking the JavaSparkContext field as transient. The second option relies on AnalyticsEngine remaining Serializable.

On Sun, Nov 3, 2013 at 7:32 PM, Patrick Wendell <pwend...@gmail.com> wrote:
> Hm, I think you are triggering a bug in the Java API where closures
> may not be properly cleaned. I think @rxin has reproduced this,
> deferring to him.
>
> - Patrick
>
> On Sun, Nov 3, 2013 at 5:25 PM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
> > code is below. in the code rdd.count() works, but rdd2.count() fails.
> >
> > public class AnalyticsEngine implements Serializable {
> >
> >     private static AnalyticsEngine engine=null;
> >     private JavaSparkContext sc;
> >
> >     final Logger logger = LoggerFactory.getLogger(AnalyticsEngine.class);
> >     private Properties prop;
> >
> >     String db_host;
> >
> >     private AnalyticsEngine()
> >     {
> >         System.setProperty("spark.serializer",
> >             "org.apache.spark.serializer.KryoSerializer");
> >         System.setProperty("spark.kryo.registrator",
> >             "edu.mit.bsense.MyRegistrator");
> >         sc = new JavaSparkContext("local[4]","TestSpark");
> >         Properties prop = new Properties();
> >         try {
> >             prop.load(new FileInputStream("config.properties"));
> >
> >             db_host = prop.getProperty("database_host1");
> >             logger.info("Database host: {}", db_host);
> >         } catch (FileNotFoundException ex)
> >         {
> >             logger.info("Could not read config.properties: " + ex.toString());
> >         } catch (IOException ex)
> >         {
> >             logger.info("Could not read config.properties: " + ex.toString());
> >         }
> >     }
> >
> >     public void getData()
> >     {
> >         Configuration conf = new Configuration();
> >
> >         String conf_url = "mongodb://" + db_host + "/test.data1"; //this is the data partition
> >         conf.set("mongo.input.uri", conf_url);
> >
> >         conf.set("mongo.input.query",
> >             "{\"streamId\":\""+"13"+"\"},{\"data\":1}");
> >         conf.set("mongo.input.split_size","64");
> >
> >         JavaPairRDD<Object,BSONObject> rdd = sc.newAPIHadoopRDD(conf,
> >             MongoInputFormat.class, Object.class, BSONObject.class);
> >
> >         rdd.cache();
> >
> >         logger.info("Count of rdd: {}", rdd.count());
> >
> >         logger.info("==========================================================================");
> >
> >         JavaDoubleRDD rdd2 = rdd.flatMap( new DoubleFlatMapFunction<Tuple2<Object, BSONObject>>() {
> >
> >             @Override
> >             public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
> >                 BSONObject doc = e._2();
> >                 BasicDBList vals = (BasicDBList)doc.get("data");
> >
> >                 List<Double> results = new ArrayList<Double>();
> >                 for (int i=0; i< vals.size();i++ )
> >                     results.add((Double)((BasicDBList)vals.get(i)).get(0));
> >
> >                 return results;
> >
> >             }
> >         });
> >
> >         logger.info("Take: {}", rdd2.take(100));
> >         logger.info("Count: {}", rdd2.count());
> >
> >     }
> >
> > }
> >
> >
> > On 11/3/13 8:19 PM, Patrick Wendell wrote:
> >>
> >> Thanks that would help. This would be consistent with there being a
> >> reference to the SparkContext itself inside of the closure. Just want
> >> to make sure that's not the case.
> >>
> >> On Sun, Nov 3, 2013 at 5:13 PM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
> >>>
> >>> I'm running in local[4] mode - so there are no slave machines. Full stack trace:
> >>>
> >>> (run-main) org.apache.spark.SparkException: Job failed:
> >>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
> >>> org.apache.spark.SparkException: Job failed:
> >>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
> >>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
> >>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
> >>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> >>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
> >>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
> >>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
> >>>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
> >>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
> >>>     at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
> >>> [debug] Thread run-main exited.
> >>> [debug] Interrupting remaining threads (should be all daemons).
> >>> [debug] Sandboxed run complete..
> >>> java.lang.RuntimeException: Nonzero exit code: 1
> >>>     at scala.sys.package$.error(package.scala:27)
> >>>     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
> >>>     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
> >>>     at scala.Option.foreach(Option.scala:236)
> >>>     at sbt.BuildCommon$class.toError(Defaults.scala:1628)
> >>>     at sbt.Defaults$.toError(Defaults.scala:34)
> >>>     at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
> >>>     at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
> >>>     at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
> >>>     at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
> >>>     at sbt.std.Transform$$anon$4.work(System.scala:64)
> >>>     at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
> >>>     at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
> >>>     at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
> >>>     at sbt.Execute.work(Execute.scala:244)
> >>>     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
> >>>     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
> >>>     at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
> >>>     at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
> >>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >>>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
> >>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >>>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> >>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> >>>     at java.lang.Thread.run(Thread.java:695)
> >>>
> >>> When I add implements Serializable to my class, I get the following stack trace:
> >>>
> >>> [error] (run-main) org.apache.spark.SparkException: Job failed:
> >>> java.io.NotSerializableException:
> >>> org.apache.spark.api.java.JavaSparkContext
> >>> org.apache.spark.SparkException: Job failed:
> >>> java.io.NotSerializableException:
> >>> org.apache.spark.api.java.JavaSparkContext
> >>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
> >>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
> >>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> >>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
> >>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
> >>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
> >>>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
> >>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
> >>>     at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
> >>> [debug] Thread run-main exited.
> >>> [debug] Interrupting remaining threads (should be all daemons).
> >>> [debug] Sandboxed run complete..
> >>> java.lang.RuntimeException: Nonzero exit code: 1
> >>>     at scala.sys.package$.error(package.scala:27)
> >>>     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
> >>>     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
> >>>     at scala.Option.foreach(Option.scala:236)
> >>>     at sbt.BuildCommon$class.toError(Defaults.scala:1628)
> >>>     at sbt.Defaults$.toError(Defaults.scala:34)
> >>>     at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
> >>>     at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
> >>>     at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
> >>>     at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
> >>>     at sbt.std.Transform$$anon$4.work(System.scala:64)
> >>>     at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
> >>>     at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
> >>>     at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
> >>>     at sbt.Execute.work(Execute.scala:244)
> >>>     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
> >>>     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
> >>>     at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
> >>>     at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
> >>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >>>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
> >>>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >>>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> >>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> >>>     at java.lang.Thread.run(Thread.java:695)
> >>>
> >>> I can post my code if that helps
> >>>
> >>>
> >>> On 11/3/13 8:05 PM, Patrick Wendell wrote:
> >>>>
> >>>> If you look in the UI, are there failures on any of the slaves that
> >>>> you can give a stack trace for? That would narrow down where the
> >>>> serialization error is happening.
> >>>>
> >>>> Unfortunately this code path doesn't print a full stack trace which
> >>>> makes it harder to debug where the serialization error comes from.
> >>>>
> >>>> Could you post all of your code?
> >>>>
> >>>> Also, just wondering, what happens if you just go ahead and add
> >>>> "extends Serializable" to AnalyticsEngine class? It's possible this is
> >>>> happening during closure serialization, which will use the closure
> >>>> serializer (which is by default Java).
> >>>>
> >>>> - Patrick
> >>>>
> >>>> On Sun, Nov 3, 2013 at 4:49 PM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
> >>>>>
> >>>>> yes, I tried that as well (it is currently registered with Kryo) -
> >>>>> although it doesn't make sense to me (and doesn't solve the problem).
> >>>>> I also made sure my registration was running:
> >>>>> DEBUG org.apache.spark.serializer.KryoSerializer - Running user registrator: edu.mit.bsense.MyRegistrator
> >>>>> 7841 [spark-akka.actor.default-dispatcher-3] DEBUG org.apache.spark.serializer.KryoSerializer - Running user registrator: edu.mit.bsense.MyRegistrator
> >>>>>
> >>>>> edu.mit.bsense.AnalyticsEngine is the class containing the SC which
> >>>>> instantiates the RDDs and runs the map() and count().
> >>>>> Can you explain why it needs to be serialized?
> >>>>>
> >>>>> Also, when running count() on my original RDD (pre map) I get the right
> >>>>> answer - this means the classes of data in the RDD are serializable.
> >>>>> It's only when I run map, and then count() on a new RDD, that I get this
> >>>>> exception. My map does not introduce any new classes - it just iterates
> >>>>> over the existing data.
> >>>>>
> >>>>> Any ideas?
> >>>>>
> >>>>>
> >>>>> On 11/3/13 7:43 PM, Patrick Wendell wrote:
> >>>>>>
> >>>>>> edu.mit.bsense.AnalyticsEngine
> >>>>>>
> >>>>>> Look at the exception. Basically, you'll need to register every class
> >>>>>> type that is recursively used by BSONObject.
> >>>>>>
> >>>>>> On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
> >>>>>>>
> >>>>>>> Hi Patrick,
> >>>>>>>
> >>>>>>> I am in fact using Kryo and I'm registering BSONObject.class (which
> >>>>>>> is the class holding the data) in my KryoRegistrator.
> >>>>>>> I'm not sure what other classes I should be registering.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>>
> >>>>>>> Yadid
> >>>>>>>
> >>>>>>>
> >>>>>>> On 11/3/13 7:23 PM, Patrick Wendell wrote:
> >>>>>>>>
> >>>>>>>> The problem is you are referencing a class that does not "extend
> >>>>>>>> serializable" in the data that you shuffle. Spark needs to send all
> >>>>>>>> shuffle data over the network, so it needs to know how to serialize
> >>>>>>>> them.
> >>>>>>>>
> >>>>>>>> One option is to use Kryo for network serialization as described
> >>>>>>>> here - you'll have to register all the classes that get serialized
> >>>>>>>> though.
> >>>>>>>>
> >>>>>>>> http://spark.incubator.apache.org/docs/latest/tuning.html
> >>>>>>>>
> >>>>>>>> Another option is to write a wrapper class that "extends
> >>>>>>>> externalizable" and write the serialization yourself.
> >>>>>>>>
> >>>>>>>> - Patrick
> >>>>>>>>
> >>>>>>>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi All,
> >>>>>>>>>
> >>>>>>>>> My original RDD contains arrays of doubles. When applying a count()
> >>>>>>>>> operator to the original RDD I get the result as expected.
> >>>>>>>>> However when I run a map on the original RDD in order to generate a
> >>>>>>>>> new RDD with only the first element of each array, and try to apply
> >>>>>>>>> count() to the newly generated RDD, I get the following exception:
> >>>>>>>>>
> >>>>>>>>> 19829 [run-main] INFO org.apache.spark.scheduler.DAGScheduler - Failed to run count at AnalyticsEngine.java:133
> >>>>>>>>> [error] (run-main) org.apache.spark.SparkException: Job failed:
> >>>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
> >>>>>>>>> org.apache.spark.SparkException: Job failed:
> >>>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
> >>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
> >>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
> >>>>>>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> >>>>>>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
> >>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
> >>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
> >>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
> >>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
> >>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> If I run a take() operation on the new RDD I receive the results as
> >>>>>>>>> expected. Here is my code:
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> JavaRDD<Double> rdd2 = rdd.flatMap( new FlatMapFunction<Tuple2<Object, BSONObject>, Double>() {
> >>>>>>>>>     @Override
> >>>>>>>>>     public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
> >>>>>>>>>         BSONObject doc = e._2();
> >>>>>>>>>         List<List<Double>> vals = (List<List<Double>>)doc.get("data");
> >>>>>>>>>         List<Double> results = new ArrayList<Double>();
> >>>>>>>>>         for (int i=0; i< vals.size();i++ )
> >>>>>>>>>             results.add((Double)vals.get(i).get(0));
> >>>>>>>>>         return results;
> >>>>>>>>>
> >>>>>>>>>     }
> >>>>>>>>> });
> >>>>>>>>>
> >>>>>>>>> logger.info("Take: {}", rdd2.take(100));
> >>>>>>>>> logger.info("Count: {}", rdd2.count());
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Any ideas on what I am doing wrong?
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>>
> >>>>>>>>> Yadid
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Yadid Ayzenberg
> >>>>>>>>> Graduate Student and Research Assistant
> >>>>>>>>> Affective Computing
> >>>>>>>>> Phone: 617-866-7226
> >>>>>>>>> Room: E14-274G
> >>>>>>>>> MIT Media Lab
> >>>>>>>>> 75 Amherst st, Cambridge, MA, 02139