[SQL] Why does a small two-source JDBC query take ~150-200ms with all optimizations (AQE, CBO, pushdown, Kryo, unsafe) enabled? (v3.4.0-SNAPSHOT)
I did some basic testing of multi-source queries with the most recent Spark: https://github.com/GavinRay97/spark-playground/blob/44a756acaee676a9b0c128466e4ab231a7df8d46/src/main/scala/Application.scala#L46-L115

The output of "spark.time()" surprised me:

SELECT p.id, p.name, t.id, t.title
FROM db1.public.person p
JOIN db2.public.todos t ON p.id = t.person_id
WHERE p.id = 1

+---+----+---+------+
| id|name| id| title|
+---+----+---+------+
|  1| Bob|  1|Todo 1|
|  1| Bob|  2|Todo 2|
+---+----+---+------+

Time taken: 168 ms

SELECT p.id, p.name, t.id, t.title
FROM db1.public.person p
JOIN db2.public.todos t ON p.id = t.person_id
WHERE p.id = 2
LIMIT 1

+---+-----+---+------+
| id| name| id| title|
+---+-----+---+------+
|  2|Alice|  3|Todo 3|
+---+-----+---+------+

Time taken: 228 ms

Calcite and Teiid manage to do this on the order of 5-50 ms for basic queries, so I'm curious about the technical specifics of why Spark appears to be so much slower here.
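(Not an answer, just a measurement sketch.) One way to see where the ~150-200 ms goes is to time parsing/analysis, planning, and execution separately; the catalog setup is assumed to match the linked example, and the breakdown below is only a rough way to attribute the cost:

```scala
// Sketch: split the wall-clock time of one of the queries above into
// parse/analyse, plan, and execute, then repeat to see warm-cache numbers.
val sql =
  """SELECT p.id, p.name, t.id, t.title
    |FROM db1.public.person p
    |JOIN db2.public.todos t ON p.id = t.person_id
    |WHERE p.id = 1""".stripMargin

val df = spark.time(spark.sql(sql))            // parsing + analysis
spark.time(df.queryExecution.executedPlan)     // optimization + physical planning
spark.time(df.collect())                       // codegen + JDBC fetches + execution
spark.time(spark.sql(sql).collect())           // second run, with warm codegen/metadata caches
```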
Supporting Kryo registration in DSv2
Hello all,

Is there a way to register classes with the Kryo serializer from within a DataSourceV2 implementation? I've attempted the following in both the constructor and the static block of my top-level class:

    SparkContext context = SparkContext.getOrCreate();
    SparkConf conf = context.getConf();
    Class[] classesRegistered = new Class[] {
        edu.vanderbilt.accre.laurelin.spark_ttree.Reader.class,
        edu.vanderbilt.accre.laurelin.spark_ttree.Partition.class,
        edu.vanderbilt.accre.laurelin.spark_ttree.SlimTBranch.class
    };
    conf.registerKryoClasses(classesRegistered);

But (if I'm reading correctly) this is too late: the config has already been parsed while initializing the SparkContext, so adding classes to the SparkConf has no effect. From what I can tell, the Kryo instance behind it is private, so I can't add the registration manually either.

Any thoughts?
Thanks
Andrew
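Not a DSv2-specific answer, but for reference, a sketch of the usual route around this: name a KryoRegistrator in spark.kryo.registrator before the SparkContext exists, so the registration runs whenever a Kryo instance is built. The catch is that this has to happen in the application that creates the context, not inside the data source itself, which may be exactly the limitation being hit here (class and object names below are illustrative):

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.sql.SparkSession

// Hypothetical registrator for the Laurelin classes mentioned above.
class LaurelinKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(Class.forName("edu.vanderbilt.accre.laurelin.spark_ttree.Reader"))
    kryo.register(Class.forName("edu.vanderbilt.accre.laurelin.spark_ttree.Partition"))
    kryo.register(Class.forName("edu.vanderbilt.accre.laurelin.spark_ttree.SlimTBranch"))
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    // The registrator is named in the conf *before* the context exists,
    // so it is picked up when each executor builds its Kryo instances.
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", classOf[LaurelinKryoRegistrator].getName)
    val spark = SparkSession.builder.config(conf).getOrCreate()
  }
}
```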
Re: intermittent Kryo serialization failures in Spark
Hi Julien, Thanks for the suggestion. If we don't do a broadcast, that would presumably affect the performance of the job, as the model that is failing to be broadcast is something that we need to be shared across the cluster. But it may be worth it if the trade-off is not having things run properly. Vadim's suggestions did not make a difference for me (still hitting this error several times a day) but I'll try with disabling broadcast and see if that does anything. thanks, Jerry On Fri, Sep 20, 2019 at 10:00 AM Julien Laurenceau < julien.laurenc...@pepitedata.com> wrote: > Hi, > Did you try without the broadcast ? > Regards > JL > > Le jeu. 19 sept. 2019 à 06:41, Vadim Semenov > a écrit : > >> Pre-register your classes: >> >> ``` >> import com.esotericsoftware.kryo.Kryo >> import org.apache.spark.serializer.KryoRegistrator >> >> class MyKryoRegistrator extends KryoRegistrator { >> override def registerClasses(kryo: Kryo): Unit = { >> kryo.register(Class.forName("[[B")) // byte[][] >> kryo.register(classOf[java.lang.Class[_]]) >> } >> } >> ``` >> >> then run with >> >> 'spark.kryo.referenceTracking': 'false', >> 'spark.kryo.registrationRequired': 'false', >> 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator', >> 'spark.kryo.unsafe': 'false', >> 'spark.kryoserializer.buffer.max': '256m', >> >> On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov >> wrote: >> >>> Hi folks, >>> >>> Posted this some time ago but the problem continues to bedevil us. I'm >>> including a (slightly edited) stack trace that results from this error. If >>> anyone can shed any light on what exactly is happening here and what we can >>> do to avoid it, that would be much appreciated. >>> >>> org.apache.spark.SparkException: Failed to register classes with Kryo >>>>at >>>> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140) >>>>at >>>> org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324) >>>>at >>>> org.apache.spark.serializer.KryoSerializerInstance.(KryoSerializer.scala:309) >>>>at >>>> org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218) >>>>at >>>> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288) >>>>at >>>> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127) >>>>at >>>> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:88) >>>>at >>>> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) >>>>at >>>> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) >>>>at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489) >>>>at >>>> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103) >>>>at >>>> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129) >>>>at >>>> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165) >>>>at >>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309) >>>>at >>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305) >>>>at >>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327) >>>>at >>>> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121) >>>>at >>>> 
org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41) >>>>at >>>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627) >>>>at >>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) >>>>at >>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) >>>>at >>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156) >>>>at >>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperati
Re: intermittent Kryo serialization failures in Spark
Hi, Did you try without the broadcast ? Regards JL Le jeu. 19 sept. 2019 à 06:41, Vadim Semenov a écrit : > Pre-register your classes: > > ``` > import com.esotericsoftware.kryo.Kryo > import org.apache.spark.serializer.KryoRegistrator > > class MyKryoRegistrator extends KryoRegistrator { > override def registerClasses(kryo: Kryo): Unit = { > kryo.register(Class.forName("[[B")) // byte[][] > kryo.register(classOf[java.lang.Class[_]]) > } > } > ``` > > then run with > > 'spark.kryo.referenceTracking': 'false', > 'spark.kryo.registrationRequired': 'false', > 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator', > 'spark.kryo.unsafe': 'false', > 'spark.kryoserializer.buffer.max': '256m', > > On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov > wrote: > >> Hi folks, >> >> Posted this some time ago but the problem continues to bedevil us. I'm >> including a (slightly edited) stack trace that results from this error. If >> anyone can shed any light on what exactly is happening here and what we can >> do to avoid it, that would be much appreciated. >> >> org.apache.spark.SparkException: Failed to register classes with Kryo >>> at >>> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140) >>> at >>> org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324) >>> at >>> org.apache.spark.serializer.KryoSerializerInstance.(KryoSerializer.scala:309) >>> at >>> org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218) >>> at >>> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288) >>> at >>> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127) >>> at >>> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:88) >>> at >>> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) >>> at >>> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) >>> at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489) >>> at >>> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103) >>> at >>> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129) >>> at >>> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165) >>> at >>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309) >>> at >>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305) >>> at >>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327) >>> at >>> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121) >>> at >>> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41) >>> at >>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627) >>> at >>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) >>> at >>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) >>> at >>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156) >>> at >>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) >>> at >>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) >>> at 
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) >>> at >>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92) >>> at >>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128) >>> at >>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119) >>> at >>> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) >>> at >>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:
Re: intermittent Kryo serialization failures in Spark
I remember it not working for us when we were setting it from the inside and needed to actually pass it On Wed, Sep 18, 2019 at 10:38 AM Jerry Vinokurov wrote: > Hi Vadim, > > Thanks for your suggestion. We do preregister the classes, like so: > > object KryoRegistrar { >> >> val classesToRegister: Array[Class[_]] = Array( >> classOf[MyModel], >>[etc] >> ) } >> > > And then we do: > > val sparkConf = new SparkConf() >> .registerKryoClasses(KryoRegistrar.classesToRegister) >> > > I notice that this is a bit different from your code and I'm wondering > whether there's any functional difference or if these are two ways to get > to the same end. Our code is directly adapted from the Spark documentation > on how to use the Kryo serializer but maybe there's some subtlety here that > I'm missing. > > With regard to the settings, it looks like we currently have the default > settings, which is to say that referenceTracking is true, > registrationRequired is false, unsafe is false, and buffer.max is 64m (none > of our objects are anywhere near that size but... who knows). I will try it > with your suggestions and see if it solves the problem. > > thanks, > Jerry > > On Tue, Sep 17, 2019 at 4:34 PM Vadim Semenov wrote: > >> Pre-register your classes: >> >> ``` >> import com.esotericsoftware.kryo.Kryo >> import org.apache.spark.serializer.KryoRegistrator >> >> class MyKryoRegistrator extends KryoRegistrator { >> override def registerClasses(kryo: Kryo): Unit = { >> kryo.register(Class.forName("[[B")) // byte[][] >> kryo.register(classOf[java.lang.Class[_]]) >> } >> } >> ``` >> >> then run with >> >> 'spark.kryo.referenceTracking': 'false', >> 'spark.kryo.registrationRequired': 'false', >> 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator', >> 'spark.kryo.unsafe': 'false', >> 'spark.kryoserializer.buffer.max': '256m', >> >> On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov >> wrote: >> >>> Hi folks, >>> >>> Posted this some time ago but the problem continues to bedevil us. I'm >>> including a (slightly edited) stack trace that results from this error. If >>> anyone can shed any light on what exactly is happening here and what we can >>> do to avoid it, that would be much appreciated. 
>>> >>> org.apache.spark.SparkException: Failed to register classes with Kryo >>>>at >>>> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140) >>>>at >>>> org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324) >>>>at >>>> org.apache.spark.serializer.KryoSerializerInstance.(KryoSerializer.scala:309) >>>>at >>>> org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218) >>>>at >>>> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288) >>>>at >>>> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127) >>>>at >>>> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:88) >>>>at >>>> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) >>>>at >>>> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) >>>>at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489) >>>>at >>>> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103) >>>>at >>>> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129) >>>>at >>>> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165) >>>>at >>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309) >>>>at >>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305) >>>>at >>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327) >>>>at >>>> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121) >>>>at >>>> or
Re: intermittent Kryo serialization failures in Spark
Hi Vadim, Thanks for your suggestion. We do preregister the classes, like so: object KryoRegistrar { > > val classesToRegister: Array[Class[_]] = Array( > classOf[MyModel], >[etc] > ) } > And then we do: val sparkConf = new SparkConf() > .registerKryoClasses(KryoRegistrar.classesToRegister) > I notice that this is a bit different from your code and I'm wondering whether there's any functional difference or if these are two ways to get to the same end. Our code is directly adapted from the Spark documentation on how to use the Kryo serializer but maybe there's some subtlety here that I'm missing. With regard to the settings, it looks like we currently have the default settings, which is to say that referenceTracking is true, registrationRequired is false, unsafe is false, and buffer.max is 64m (none of our objects are anywhere near that size but... who knows). I will try it with your suggestions and see if it solves the problem. thanks, Jerry On Tue, Sep 17, 2019 at 4:34 PM Vadim Semenov wrote: > Pre-register your classes: > > ``` > import com.esotericsoftware.kryo.Kryo > import org.apache.spark.serializer.KryoRegistrator > > class MyKryoRegistrator extends KryoRegistrator { > override def registerClasses(kryo: Kryo): Unit = { > kryo.register(Class.forName("[[B")) // byte[][] > kryo.register(classOf[java.lang.Class[_]]) > } > } > ``` > > then run with > > 'spark.kryo.referenceTracking': 'false', > 'spark.kryo.registrationRequired': 'false', > 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator', > 'spark.kryo.unsafe': 'false', > 'spark.kryoserializer.buffer.max': '256m', > > On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov > wrote: > >> Hi folks, >> >> Posted this some time ago but the problem continues to bedevil us. I'm >> including a (slightly edited) stack trace that results from this error. If >> anyone can shed any light on what exactly is happening here and what we can >> do to avoid it, that would be much appreciated. 
>> >> org.apache.spark.SparkException: Failed to register classes with Kryo >>> at >>> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140) >>> at >>> org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324) >>> at >>> org.apache.spark.serializer.KryoSerializerInstance.(KryoSerializer.scala:309) >>> at >>> org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218) >>> at >>> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288) >>> at >>> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127) >>> at >>> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:88) >>> at >>> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) >>> at >>> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) >>> at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489) >>> at >>> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103) >>> at >>> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129) >>> at >>> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165) >>> at >>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309) >>> at >>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305) >>> at >>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327) >>> at >>> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121) >>> at >>> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41) >>> at >>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627) >>> at >>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) >>> at >>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) >>> at >>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156) >>> at >>> org.apache.spark.rdd.RDDOpera
Re: intermittent Kryo serialization failures in Spark
Pre-register your classes: ``` import com.esotericsoftware.kryo.Kryo import org.apache.spark.serializer.KryoRegistrator class MyKryoRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo): Unit = { kryo.register(Class.forName("[[B")) // byte[][] kryo.register(classOf[java.lang.Class[_]]) } } ``` then run with 'spark.kryo.referenceTracking': 'false', 'spark.kryo.registrationRequired': 'false', 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator', 'spark.kryo.unsafe': 'false', 'spark.kryoserializer.buffer.max': '256m', On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov wrote: > Hi folks, > > Posted this some time ago but the problem continues to bedevil us. I'm > including a (slightly edited) stack trace that results from this error. If > anyone can shed any light on what exactly is happening here and what we can > do to avoid it, that would be much appreciated. > > org.apache.spark.SparkException: Failed to register classes with Kryo >> at >> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140) >> at >> org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324) >> at >> org.apache.spark.serializer.KryoSerializerInstance.(KryoSerializer.scala:309) >> at >> org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218) >> at >> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288) >> at >> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127) >> at >> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:88) >> at >> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) >> at >> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) >> at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489) >> at >> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103) >> at >> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129) >> at >> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165) >> at >> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309) >> at >> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305) >> at >> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327) >> at >> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121) >> at >> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41) >> at >> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627) >> at >> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) >> at >> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) >> at >> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156) >> at >> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) >> at >> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) >> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) >> at >> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92) >> at >> 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128) >> at >> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119) >> at >> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) >> at >> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119) >> at >> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) >> at >> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) >> at >> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156) >> at >> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) &
Re: intermittent Kryo serialization failures in Spark
Hi folks, Posted this some time ago but the problem continues to bedevil us. I'm including a (slightly edited) stack trace that results from this error. If anyone can shed any light on what exactly is happening here and what we can do to avoid it, that would be much appreciated. org.apache.spark.SparkException: Failed to register classes with Kryo > at > org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140) > at > org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324) > at > org.apache.spark.serializer.KryoSerializerInstance.(KryoSerializer.scala:309) > at > org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218) > at > org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288) > at > org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127) > at > org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:88) > at > org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) > at > org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) > at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489) > at > org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103) > at > org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129) > at > org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165) > at > org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309) > at > org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305) > at > org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327) > at > org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121) > at > org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41) > at > org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92) > at > org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128) > at > org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119) > at > org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) > at > org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156) > at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391) > at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121) > at > org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at
intermittent Kryo serialization failures in Spark
Hi all, I am experiencing a strange intermittent failure of my Spark job that results from serialization issues in Kryo. Here is the stack trace:

Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
> at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
> ... 204 more
>
(I've edited the company and model name since this is proprietary code)

This error does not surface every time the job is run; I would say it probably shows up once in every 10 runs or so, and there isn't anything about the input data that triggers this, as I've been able to (nondeterministically) reproduce the error by simply rerunning the job with the same inputs over and over again. The model itself is just a plain Scala case class whose fields are strings and integers, so there's no custom serialization logic or anything like that. As I understand it, this seems related to an issue previously documented here <https://issues.apache.org/jira/browse/SPARK-21928> but allegedly this was fixed long ago. I'm running this job on an AWS EMR cluster and have confirmed that the version of Spark running there is 2.4.0, with the patch that is linked in the above issue being part of the code. A suggested solution has been to set the extraClasspath config settings on the driver and executor, but that has not fixed the problem. I'm out of ideas for how to tackle this and would love to hear if anyone has any suggestions or strategies for fixing this.

thanks,
Jerry
--
http://www.google.com/profiles/grapesmoker
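One experiment that might be worth trying (purely a speculative sketch, not a known fix for SPARK-21928-style failures): do the registration yourself in a KryoRegistrator that resolves the class through the current thread's context classloader, so the lookup does not depend on whichever loader KryoSerializer happens to be using at that moment. The class name below is the placeholder from the trace above.

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class ContextClassLoaderRegistrator extends KryoRegistrator {
  // Illustrative; substitute the real model classes.
  private val classNames = Seq("com.mycompany.models.MyModel")

  override def registerClasses(kryo: Kryo): Unit = {
    val loader = Option(Thread.currentThread.getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    classNames.foreach(name => kryo.register(Class.forName(name, true, loader)))
  }
}
```

It would be enabled via spark.kryo.registrator; if the intermittent failure persists even then, that at least narrows the problem to something other than the string-based class lookup.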
Question regarding kryo and java encoders in datasets
Hi All, Good day!

I am using spark 2.4 and referring to https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

Bean class:

    public class EmployeeBean implements Serializable {
        private Long id;
        private String name;
        private Long salary;
        private Integer age;
        // getters and setters
    }

Spark example:

    SparkSession spark = SparkSession.builder().master("local[4]").appName("play-with-spark").getOrCreate();
    List<EmployeeBean> employees1 = populateEmployees(1, 1_000_000);

    Dataset<EmployeeBean> ds1 = spark.createDataset(employees1, Encoders.kryo(EmployeeBean.class));
    Dataset<EmployeeBean> ds2 = spark.createDataset(employees1, Encoders.bean(EmployeeBean.class));

    ds1.persist(StorageLevel.MEMORY_ONLY());
    long ds1Count = ds1.count();

    ds2.persist(StorageLevel.MEMORY_ONLY());
    long ds2Count = ds2.count();

I looked at the Storage tab in the Spark web UI. The useful part:

ID   RDD Name                                          Size in Memory
2    LocalTableScan [value#0]                          56.5 MB
13   LocalTableScan [age#6, id#7L, name#8, salary#9L]  23.3 MB

A few questions:
* Shouldn't the size of the Kryo-serialized RDD be less than the Java-serialized RDD, instead of more than double the size?
* I also tried MEMORY_ONLY_SER() mode and the RDD size is the same. RDDs stored as serialized Java objects should be stored as one byte array per partition. Shouldn't the size of the persisted RDDs be less than the deserialized ones?
* What exactly are the kryo and bean encoders doing while creating the Dataset?
* Can I rename persisted RDDs for better readability?

Regards,
Devender
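Part of the size question can be seen directly from the schemas: a kryo-encoded Dataset keeps each object as a single opaque binary column, while a bean/product encoder maps each field to its own column, which the columnar cache can store and compress far more compactly. A small Scala sketch of that comparison (names, sizes, and the product encoder standing in for Encoders.bean are assumptions):

```scala
import org.apache.spark.sql.{Encoders, SparkSession}

val spark = SparkSession.builder.master("local[4]").appName("encoder-schemas").getOrCreate()
import spark.implicits._

case class Employee(id: Long, name: String, salary: Long, age: Int)
val employees = (1 to 100000).map(i => Employee(i, s"name-$i", i * 10L, 30))

// Same data, two encoders: one forces Kryo, the other is the built-in product encoder
// (the Scala analogue of Encoders.bean for the Java bean above).
val kryoDs = spark.createDataset(employees)(Encoders.kryo[Employee])
val beanDs = employees.toDS()

kryoDs.printSchema()  // single column: value: binary  (one opaque blob per object)
beanDs.printSchema()  // one column per field: id, name, salary, age
```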
Internal Spark class is not registered by Kryo
Hi all: I have set spark.kryo.registrationRequired=true, but an exception occurred when I run the program:

java.lang.IllegalArgumentException: Class is not registered: org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage

I tried to register it manually via kryo.register() and SparkConf.registerKryoClasses, but it still does not work. The related JIRA is SPARK-21569, and SPARK-10251 and SPARK-6497 may be the same problem, I think. I would be grateful if someone could help me.

Best Regards,
Lijun Cao
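For what it's worth, the workaround usually suggested while those JIRAs are open is to register the internal class by its runtime name from a KryoRegistrator, since it isn't publicly accessible. A sketch (the registrator name is made up, and I can't promise it covers every code path that registrationRequired will complain about):

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class InternalClassesRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // The class is private to Spark, so register it via its runtime name.
    kryo.register(Class.forName(
      "org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"))
  }
}
```

The registrator is then named via spark.kryo.registrator in the application configuration.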
How Kryo serializer allocates buffer in Spark
I am getting the following error in a Spark task. The default max value is 64mb! The documentation says it should be large enough to store the largest object in my application. I don't think I have any object that is bigger than 64mb. So what do these values (spark.kryoserializer.buffer, spark.kryoserializer.buffer.max) mean? Is that a buffer per executor, or a buffer per executor per core? I have 6 cores per executor, so do all 6 write to this common buffer? In that case I have 16mb of buffer per core. Please explain. Thanks!

Job aborted due to stage failure: Task 3 in stage 4.0 failed 10 times, most recent failure: Lost task 3.9 in stage 4.0 (TID 16, iadprd01mpr005.mgmt-a.xactly.iad.dc.local): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 19. To avoid this, increase spark.kryoserializer.buffer.max value.
at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:300)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Re: Kryo serialization failed: Buffer overflow : Broadcast Join
I am using spark 2.1.0

On Fri, Feb 2, 2018 at 5:08 PM, Pralabh Kumar wrote: > Hi > > I am performing broadcast join where my small table is 1 gb . I am > getting following error . > > I am using > > > org.apache.spark.SparkException: > . Available: 0, required: 28869232. To avoid this, increase > spark.kryoserializer.buffer.max value > > > > I increase the value to > > spark.conf.set("spark.kryoserializer.buffer.max","2g") > > > But I am still getting the error . > > Please help > > Thx >
Kryo serialization failed: Buffer overflow : Broadcast Join
Hi

I am performing a broadcast join where my small table is 1 gb. I am getting the following error. I am using

org.apache.spark.SparkException: . Available: 0, required: 28869232. To avoid this, increase spark.kryoserializer.buffer.max value

I increased the value to

spark.conf.set("spark.kryoserializer.buffer.max","2g")

But I am still getting the error.

Please help
Thx
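For reference, a sketch of where the setting normally has to go: spark.kryoserializer.buffer.max is read when the serializer is created, so setting it at runtime with spark.conf.set is assumed to have no effect on serializers that already exist. It needs to be in the SparkConf (or spark-submit --conf) before the session is built, and it is capped below 2g:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch: set the ceiling before the session exists. Setting it later through
// spark.conf.set(...) is assumed not to reach serializers that already exist.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max", "1g")  // illustrative; the setting is capped below 2g

val spark = SparkSession.builder.config(conf).getOrCreate()
```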
Re: Kryo not registered class
Try: Class.forName("[Lorg.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$SerializableBlockLocation;") On Sun, Nov 19, 2017 at 3:24 PM, Angel Francisco Orta < angel.francisco.o...@gmail.com> wrote: > Hello, I'm with spark 2.1.0 with scala and I'm registering all classes > with kryo, and I have a problem registering this class, > > org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$ > SerializableFileStatus$SerializableBlockLocation[] > > I can't register with classOf[Array[Class.forName("org.apache.spark.sql. > execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$ > SerializableBlockLocation").type]] > > > I have tried as well creating a java class like register and registering > the class as org.apache.spark.sql.execution.datasources. > PartitioningAwareFileIndex$SerializableFileStatus$ > SerializableBlockLocation[].class; > > Any clue is appreciatted, > > Thanks. > >
Kryo not registered class
Hello, I'm on Spark 2.1.0 with Scala and I'm registering all classes with Kryo, and I have a problem registering this class:

org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$SerializableBlockLocation[]

I can't register it with classOf[Array[Class.forName("org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$SerializableBlockLocation").type]]

I have also tried creating a Java class to do the registration and registering the class as org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$SerializableBlockLocation[].class;

Any clue is appreciated,

Thanks.
spark 2.1.1 ml.LogisticRegression with large feature set cause Kryo serialization failed: Buffer overflow
I am trying to train a big model. I have 40 million instances and a 50-million feature set, and it is sparse. I am using 40 executors with 20 GB each, plus a driver with 40 GB. The number of data partitions is 5000, the treeAggregate depth is 4, spark.kryoserializer.buffer.max is 2016m, and spark.driver.maxResultSize is 40G.

The execution fails with the following messages:

WARN TaskSetManager: Lost task 2.1 in stage 25.0 (TID 1415, Blackstone064183, executor 15): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 3, required: 8
Serialization trace: currMin (org.apache.spark.mllib.stat.MultivariateOnlineSummarizer). To avoid this, increase spark.kryoserializer.buffer.max value.
at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:315)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:364)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

I know that spark.kryoserializer.buffer.max is limited to 2g and cannot be increased further. I have already tried increasing the partition num to 1 and the treeAggregate depth to 200; it still failed with the same error message. And I tried the Java serializer instead of Kryo; it failed with an OOM:

WARN TaskSetManager: Lost task 5.0 in stage 32.0 (TID 15701, Blackstone065188, executor 4): java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:364)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

Any advice?

--
fun coding
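Not a way around the 2g ceiling, but for completeness, a sketch of the treeAggregate-depth knob mentioned above as it is exposed on the estimator (values and the toy training set are illustrative):

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("agg-depth-sketch").getOrCreate()

// Tiny stand-in for the real 40M x 50M sparse training set.
val training = spark.createDataFrame(Seq(
  (0.0, Vectors.sparse(5, Array(1, 3), Array(1.0, 2.0))),
  (1.0, Vectors.sparse(5, Array(0, 4), Array(3.0, 1.5)))
)).toDF("label", "features")

val lr = new LogisticRegression()
  .setMaxIter(50)
  .setRegParam(0.01)
  .setAggregationDepth(8)   // default is 2; this is the "treeAggregate depth" knob referred to above

val model = lr.fit(training)
```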
Saving RDD as Kryo (broken in 2.1)
Hi, all!

I have code that serializes an RDD with Kryo and saves it as a sequence file. It works fine in 1.5.1, but after switching to 2.1.1 it does not work. I am trying to serialize an RDD of Tuple2<> (obtained from a PairRDD).

1. The RDD consists of different heterogeneous objects (aggregates, like HLL, QTree, Min, Max, etc.)
2. Save is performed within streaming
3. Read is performed out of streaming (another app)
4. I supposed that such an error could be due to custom serializers - turned them off, but the errors still exist
5. Tried disabling references in Kryo (since I saw an error while resolving references) - got a StackOverflow and significant performance degradation
6. Implementing Serializable/Externalizable is not a solution, unfortunately.

Expected behavior: saveAsObjectFile/loadObjectFile are symmetric, and it's possible to load a previously saved RDD.

Code of save/load:

object KryoFile {

  val THREAD_LOCAL_CACHE = new ThreadLocal[Kryo]

  /*
   * Used to write as Object file using kryo serialization
   */
  def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String) {
    val kryoSerializer = new KryoSerializer(rdd.context.getConf)

    rdd.context.setJobDescription("Saving to path " + path)
    rdd.mapPartitions(iter => iter.grouped(10)
      .map(_.toArray))
      .map(splitArray => {
        //initializes kyro and calls your registrator class
        var kryo = THREAD_LOCAL_CACHE.get()
        if (null == kryo) {
          kryo = kryoSerializer.newKryo()
          THREAD_LOCAL_CACHE.set(kryo)
        }

        //convert data to bytes
        val bao = new ByteArrayOutputStream()
        val output = kryoSerializer.newKryoOutput()
        output.setOutputStream(bao)
        kryo.writeClassAndObject(output, splitArray)
        output.close()
        kryo.reset()

        // We are ignoring key field of sequence file
        val byteWritable = new BytesWritable(bao.toByteArray)
        (NullWritable.get(), byteWritable)
      }).saveAsSequenceFile(path)
  }

  /*
   * Method to read from object file which is saved kryo format.
   */
  def loadObjectFile[T](sc: SparkContext, path: String, minPartitions: Int = 1)(implicit ct: ClassTag[T]) = {
    val kryoSerializer = new KryoSerializer(sc.getConf)

    sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
      .flatMap(x => {
        var kryo = THREAD_LOCAL_CACHE.get()
        if (null == kryo) {
          kryo = kryoSerializer.newKryo()
          THREAD_LOCAL_CACHE.set(kryo)
        }

        val input = new Input()
        input.setBuffer(x._2.getBytes)
        val data = kryo.readClassAndObject(input)
        kryo.reset()
        val dataObject = data.asInstanceOf[Array[T]]
        dataObject
      })
  }
}

When trying to deserialize, I get errors like these:

17/06/21 08:19:18 ERROR Executor: Exception in task 14.0 in stage 0.0 (TID 14)
java.lang.ArrayIndexOutOfBoundsException: -2
at java.util.ArrayList.elementData(ArrayList.java:418)
at java.util.ArrayList.get(ArrayList.java:431)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:834)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:706)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:396)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:307)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$1.apply(KryoFile.scala:75)
at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$1.apply(KryoFile.scala:62)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

17/06/21 08:19:18 ERROR Executor: Exception in task 12.0 in stage 0.0 (TID 12)
java.lang.ArrayStoreException: java.util.Collections$EmptyMap
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$O
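As an aside, an untested sketch of an alternative that sidesteps managing Kryo instances, buffers and reset() by hand: let Spark's own SerializerInstance do the per-record byte conversion, which keeps the write and read paths symmetric by construction (the object and method names below are mine, not from the original code):

```scala
import scala.reflect.ClassTag
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer

object KryoObjectFile {
  def save[T: ClassTag](rdd: RDD[T], path: String): Unit = {
    val serializer = new KryoSerializer(rdd.context.getConf)
    rdd.mapPartitions { iter =>
      val instance = serializer.newInstance()   // one serializer instance per partition
      iter.grouped(10).map { group =>
        val bytes = instance.serialize(group.toArray).array()
        (NullWritable.get(), new BytesWritable(bytes))
      }
    }.saveAsSequenceFile(path)
  }

  def load[T: ClassTag](sc: SparkContext, path: String, minPartitions: Int = 1): RDD[T] = {
    val serializer = new KryoSerializer(sc.getConf)
    sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
      .flatMap { case (_, bw) =>
        // For simplicity one instance per record; mapPartitions could reuse one instead.
        val instance = serializer.newInstance()
        instance.deserialize[Array[T]](java.nio.ByteBuffer.wrap(bw.copyBytes()))
      }
  }
}
```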
Re: dataset aggregators with kryo encoder very slow
sorry i meant to say SPARK-18980 On Sat, Jan 21, 2017 at 1:48 AM, Koert Kuipers <ko...@tresata.com> wrote: > found it :) SPARK-1890 > thanks cloud-fan > > On Sat, Jan 21, 2017 at 1:46 AM, Koert Kuipers <ko...@tresata.com> wrote: > >> trying to replicate this in spark itself i can for v2.1.0 but not for >> master. i guess it has been fixed >> >> On Fri, Jan 20, 2017 at 4:57 PM, Koert Kuipers <ko...@tresata.com> wrote: >> >>> i started printing out when kryo serializes my buffer data structure for >>> my aggregator. >>> >>> i would expect every buffer object to ideally get serialized only once: >>> at the end of the map-side before the shuffle (so after all the values for >>> the given key within the partition have been reduced into it). i realize >>> that in reality due to the order of the elements coming in this can not >>> always be achieved. but what i see instead is that the buffer is getting >>> serialized after every call to reduce a value into it, always. could this >>> be the reason it is so slow? >>> >>> On Thu, Jan 19, 2017 at 4:17 PM, Koert Kuipers <ko...@tresata.com> >>> wrote: >>> >>>> we just converted a job from RDD to Dataset. the job does a single >>>> map-red phase using aggregators. we are seeing very bad performance for the >>>> Dataset version, about 10x slower. >>>> >>>> in the Dataset version we use kryo encoders for some of the >>>> aggregators. based on some basic profiling of spark in local mode i believe >>>> the bad performance is due to the kryo encoders. about 70% of time is spend >>>> in kryo related classes. >>>> >>>> since we also use kryo for serialization with the RDD i am surprised >>>> how big the performance difference is. >>>> >>>> has anyone seen the same thing? any suggestions for how to improve this? >>>> >>>> >>> >> >
Re: dataset aggregators with kryo encoder very slow
found it :) SPARK-1890 thanks cloud-fan On Sat, Jan 21, 2017 at 1:46 AM, Koert Kuipers <ko...@tresata.com> wrote: > trying to replicate this in spark itself i can for v2.1.0 but not for > master. i guess it has been fixed > > On Fri, Jan 20, 2017 at 4:57 PM, Koert Kuipers <ko...@tresata.com> wrote: > >> i started printing out when kryo serializes my buffer data structure for >> my aggregator. >> >> i would expect every buffer object to ideally get serialized only once: >> at the end of the map-side before the shuffle (so after all the values for >> the given key within the partition have been reduced into it). i realize >> that in reality due to the order of the elements coming in this can not >> always be achieved. but what i see instead is that the buffer is getting >> serialized after every call to reduce a value into it, always. could this >> be the reason it is so slow? >> >> On Thu, Jan 19, 2017 at 4:17 PM, Koert Kuipers <ko...@tresata.com> wrote: >> >>> we just converted a job from RDD to Dataset. the job does a single >>> map-red phase using aggregators. we are seeing very bad performance for the >>> Dataset version, about 10x slower. >>> >>> in the Dataset version we use kryo encoders for some of the aggregators. >>> based on some basic profiling of spark in local mode i believe the bad >>> performance is due to the kryo encoders. about 70% of time is spend in kryo >>> related classes. >>> >>> since we also use kryo for serialization with the RDD i am surprised how >>> big the performance difference is. >>> >>> has anyone seen the same thing? any suggestions for how to improve this? >>> >>> >> >
Re: dataset aggregators with kryo encoder very slow
trying to replicate this in spark itself i can for v2.1.0 but not for master. i guess it has been fixed On Fri, Jan 20, 2017 at 4:57 PM, Koert Kuipers <ko...@tresata.com> wrote: > i started printing out when kryo serializes my buffer data structure for > my aggregator. > > i would expect every buffer object to ideally get serialized only once: at > the end of the map-side before the shuffle (so after all the values for the > given key within the partition have been reduced into it). i realize that > in reality due to the order of the elements coming in this can not always > be achieved. but what i see instead is that the buffer is getting > serialized after every call to reduce a value into it, always. could this > be the reason it is so slow? > > On Thu, Jan 19, 2017 at 4:17 PM, Koert Kuipers <ko...@tresata.com> wrote: > >> we just converted a job from RDD to Dataset. the job does a single >> map-red phase using aggregators. we are seeing very bad performance for the >> Dataset version, about 10x slower. >> >> in the Dataset version we use kryo encoders for some of the aggregators. >> based on some basic profiling of spark in local mode i believe the bad >> performance is due to the kryo encoders. about 70% of time is spend in kryo >> related classes. >> >> since we also use kryo for serialization with the RDD i am surprised how >> big the performance difference is. >> >> has anyone seen the same thing? any suggestions for how to improve this? >> >> >
Kryo (with Spark 1.6.3) class registration slows down processing
Hello, Here is something I am unable to explain and goes against Kryo's documentation, numerous suggestions on the web and on this list as well as pure intuition. Our Spark application runs in a single JVM (perhaps this is relevant, hence mentioning it). We have been using Kryo serialization with Spark (setting the spark.serializer property to org.apache.spark.serializer.KryoSerializer) without explicitly registering classes and everything seems to work well enough. Recently, I have been looking into making some performance improvements and decided to register classes. I turned on the "spark.kryo.registrationRequired" property and started to register all classes as they were reported by the resulting Exceptions. Eventually I managed to register them all. BTW, there is a fairly large number of internal Spark and Scala classes that also I had to register but that's besides the point here. I was hoping to gain some performance improvement as per the suggestions of registering classes. However, what I saw was the exact opposite and surprising. Performance (throughput) actually deteriorated by at least a factor of 50%. I turned off the registrationRequired property but kept the explicit registrations in place with the same result. Then I reduced the number of registrations and performance started to get better again. Eventually I got rid of all the explicit registrations (back to where I started basically) and performance improved back to where it was. I am unable to explain why I am observing this behavior as this is counter-intuitive. Explicit registration is supposed to write smaller amount of data (class names as Strings vs just class Ids as integers) and hence help improve performance. Is the fact that Spark is running in local mode (single JVM) a factor here? Any insights will be appreciated. Thanks NB
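For reference, a minimal sketch of the registration setup described above, assuming hypothetical application classes (MyKey, MyRecord) in place of the real ones; the internal Scala/Spark classes are registered by name exactly as the registrationRequired exceptions report them:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical stand-ins for the application's own classes.
case class MyKey(id: Long)
case class MyRecord(key: MyKey, values: Array[Double])

val conf = new SparkConf()
  .setMaster("local[*]") // single-JVM local mode, as in the report above
  .setAppName("kryo-registration-test")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true") // fail fast on any unregistered class
  .registerKryoClasses(Array(
    classOf[MyKey],
    classOf[MyRecord],
    // internal classes surface one by one via "Class is not registered" exceptions
    Class.forName("scala.collection.mutable.WrappedArray$ofRef")
  ))

val sc = new SparkContext(conf)
```

Registration only changes what Kryo writes (small integer class IDs instead of full class names), so any speed-up or slow-down from it should show up in the serialization path itself.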
Re: dataset aggregators with kryo encoder very slow
i started printing out when kryo serializes my buffer data structure for my aggregator. i would expect every buffer object to ideally get serialized only once: at the end of the map-side before the shuffle (so after all the values for the given key within the partition have been reduced into it). i realize that in reality due to the order of the elements coming in this can not always be achieved. but what i see instead is that the buffer is getting serialized after every call to reduce a value into it, always. could this be the reason it is so slow? On Thu, Jan 19, 2017 at 4:17 PM, Koert Kuipers <ko...@tresata.com> wrote: > we just converted a job from RDD to Dataset. the job does a single map-red > phase using aggregators. we are seeing very bad performance for the Dataset > version, about 10x slower. > > in the Dataset version we use kryo encoders for some of the aggregators. > based on some basic profiling of spark in local mode i believe the bad > performance is due to the kryo encoders. about 70% of time is spend in kryo > related classes. > > since we also use kryo for serialization with the RDD i am surprised how > big the performance difference is. > > has anyone seen the same thing? any suggestions for how to improve this? > >
dataset aggregators with kryo encoder very slow
we just converted a job from RDD to Dataset. the job does a single map-red phase using aggregators. we are seeing very bad performance for the Dataset version, about 10x slower. in the Dataset version we use kryo encoders for some of the aggregators. based on some basic profiling of spark in local mode i believe the bad performance is due to the kryo encoders. about 70% of time is spend in kryo related classes. since we also use kryo for serialization with the RDD i am surprised how big the performance difference is. has anyone seen the same thing? any suggestions for how to improve this?
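For anyone reproducing this, a stripped-down sketch of the pattern being described: an Aggregator whose buffer goes through a Kryo encoder. The Buffer class here is a hypothetical stand-in for the real data structure.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical buffer; the real job uses a more complex structure.
case class Buffer(counts: Map[String, Long])

object CountByValue extends Aggregator[String, Buffer, Map[String, Long]] {
  def zero: Buffer = Buffer(Map.empty)
  def reduce(b: Buffer, a: String): Buffer =
    Buffer(b.counts.updated(a, b.counts.getOrElse(a, 0L) + 1L))
  def merge(b1: Buffer, b2: Buffer): Buffer =
    Buffer(b2.counts.foldLeft(b1.counts) { case (m, (k, v)) =>
      m.updated(k, m.getOrElse(k, 0L) + v)
    })
  def finish(b: Buffer): Map[String, Long] = b.counts
  // The buffer (and output) are encoded with Kryo rather than a struct encoder.
  def bufferEncoder: Encoder[Buffer] = Encoders.kryo[Buffer]
  def outputEncoder: Encoder[Map[String, Long]] = Encoders.kryo[Map[String, Long]]
}

// usage:
// val spark = SparkSession.builder().master("local[*]").getOrCreate()
// spark.createDataset(Seq("a", "b", "a"))(Encoders.STRING)
//   .select(CountByValue.toColumn).show()
```

If the buffer really is re-serialized on every reduce call, as observed further up this thread, the Kryo cost scales with the number of input rows rather than with the number of distinct keys, which would match the ~70% of time seen in Kryo-related classes.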
Re: Kryo On Spark 1.6.0
For Scala, you can fix it with: conf.registerKryoClasses(Array(Class.forName("scala.collection.mutable.WrappedArray$ofRef"))) By the way, if the class is an array of a Java primitive type, say byte[], use Class.forName("[B"); if it is an array of some other class, say scala.collection.mutable.WrappedArray$ofRef, use the JVM binary array name with a trailing semicolon: Class.forName("[Lscala.collection.mutable.WrappedArray$ofRef;") ref: https://docs.oracle.com/javase/8/docs/api/java/lang/Class.html#getName-- On Tue, Jan 10, 2017 at 11:11 PM, Yang Cao <cybea...@gmail.com> wrote: > If you don’t mind, could please share me with the scala solution? I tried > to use kryo but seamed not work at all. I hope to get some practical > example. THX > > On 2017年1月10日, at 19:10, Enrico DUrso <enrico.du...@everis.com> wrote: > > Hi, > > I am trying to use Kryo on Spark 1.6.0. > I am able to register my own classes and it works, but when I set > “spark.kryo.registrationRequired “ to true, I get an error about a scala > class: > “Class is not registered: scala.collection.mutable.WrappedArray$ofRef”. > > Any of you has already solved this issue in Java? I found the code to > solve it in Scala, but unable to register this class in Java. > > Cheers, > > enrico > > -- > > CONFIDENTIALITY WARNING. > This message and the information contained in or attached to it are > private and confidential and intended exclusively for the addressee. everis > informs to whom it may receive it in error that it contains privileged > information and its use, copy, reproduction or distribution is prohibited. > If you are not an intended recipient of this E-mail, please notify the > sender, delete it and do not read, act upon, print, disclose, copy, retain > or redistribute any portion of this E-mail. > > >
Re: Sporadic ClassNotFoundException with Kryo
I faced a similar issue and had to do two things; 1. Submit Kryo jar with the spark-submit 2. Set spark.executor.userClassPathFirst true in Spark conf On Fri, Nov 18, 2016 at 7:39 PM, chrism <christopher.martens...@cics.se> wrote: > Regardless of the different ways we have tried deploying a jar together > with > Spark, when running a Spark Streaming job with Kryo as serializer on top of > Mesos, we sporadically get the following error (I have truncated a bit): > > /16/11/18 08:39:10 ERROR OneForOneBlockFetcher: Failed while starting block > fetches > java.lang.RuntimeException: org.apache.spark.SparkException: Failed to > register classes with Kryo > at > org.apache.spark.serializer.KryoSerializer.newKryo(KryoSeria > lizer.scala:129) > at > org.apache.spark.serializer.KryoSerializerInstance.borrowKry > o(KryoSerializer.scala:274) > ... > at > org.apache.spark.serializer.SerializerManager.dataSerializeS > tream(SerializerManager.scala:125) > at > org.apache.spark.storage.BlockManager$$anonfun$dropFromMemor > y$3.apply(BlockManager.scala:1265) > at > org.apache.spark.storage.BlockManager$$anonfun$dropFromMemor > y$3.apply(BlockManager.scala:1261) > ... > Caused by: java.lang.ClassNotFoundException: cics.udr.compound_ran_udr > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424)/ > > where "cics.udr.compound_ran_udr" is a class provided by us in a jar. > > We know that the jar containing "cics.udr.compound_ran_udr" is being > deployed and works because it is listed in the "Environment" tab in the > GUI, > and calculations using this class succeed. > > We have tried the following methods of deploying the jar containing the > class > * Through --jars in spark-submit > * Through SparkConf.setJar > * Through spark.driver.extraClassPath and spark.executor.extraClassPath > * By having it as the main jar used by spark-submit > with no luck. The logs (see attached) recognize that the jar is being added > to the classloader. > > We have tried registering the class using > * SparkConf.registerKryoClasses. > * spark.kryo.classesToRegister > with no luck. > > We are running on Mesos and the jar has been deployed on every machine on > the local file system in the same location. > > I would be very grateful for any help or ideas :) > > > > -- > View this message in context: http://apache-spark-user-list. > 1001560.n3.nabble.com/Sporadic-ClassNotFoundException-with-K > ryo-tp28104.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- Thanks & regards, Nirmal Associate Technical Lead - Data Technologies Team, WSO2 Inc. Mobile: +94715779733 <+94%2071%20577%209733> Blog: http://nirmalfdo.blogspot.com/
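A minimal sketch of those two changes expressed on the SparkConf (paths here are placeholders; the same settings can equally be passed as --jars and --conf to spark-submit):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("streaming-with-kryo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // ship the jar that contains the registered classes (placeholder paths)
  .setJars(Seq("/opt/jars/udr-classes.jar", "/opt/jars/kryo.jar"))
  // make executors prefer the user-supplied jars over Spark's own classpath
  .set("spark.executor.userClassPathFirst", "true")
  .set("spark.kryo.classesToRegister", "cics.udr.compound_ran_udr")
```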
Kryo and Spark 1.6.0 - Does it require a default empty constructor?
Hi, I have a doubt about Kryo and Spark 1.6.0. I read that for using Kryo, the class that you want to serialize must have a default constructor. I created a simple class avoiding to insert such a constructor and If I try to serialize manually, it does not work. But If I use that class in Spark and then I collect for forcing serialization it works. Any idea? Cheers CONFIDENTIALITY WARNING. This message and the information contained in or attached to it are private and confidential and intended exclusively for the addressee. everis informs to whom it may receive it in error that it contains privileged information and its use, copy, reproduction or distribution is prohibited. If you are not an intended recipient of this E-mail, please notify the sender, delete it and do not read, act upon, print, disclose, copy, retain or redistribute any portion of this E-mail.
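One possible explanation, sketched below under the assumption that Spark builds its Kryo instances through Twitter chill: chill configures an Objenesis-based instantiator strategy, so Kryo can create objects without calling any constructor, whereas a bare new Kryo() relies on a no-arg constructor by default. Point is a hypothetical class used only for illustration.

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import org.objenesis.strategy.StdInstantiatorStrategy

class Point(val x: Int, val y: Int) // no default constructor

val kryo = new Kryo()
// Without this line, deserializing Point fails because Kryo cannot find a
// no-arg constructor; with it, instances are created without any constructor call.
kryo.setInstantiatorStrategy(new StdInstantiatorStrategy())

val out = new Output(1024, -1)
kryo.writeObject(out, new Point(1, 2))
out.close()

val p = kryo.readObject(new Input(out.toBytes), classOf[Point])
println(p.x + "," + p.y) // 1,2
```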
RE: Kryo On Spark 1.6.0 [Solution in this email]
Yes sure, you can find it here: http://stackoverflow.com/questions/34736587/kryo-serializer-causing-exception-on-underlying-scala-class-wrappedarray hope it works, I did not try, I am using Java. To be precise I found the solution for my problem: To sum up, I had problems in registering the following class in Java: “scala.collection.mutable.WrappedArray$ofRef” The tip is: Class a = Class.forName(“scala.collection.mutable.WrappedArray$ofRef”) and then put a in the array of classes you are passing to the method registerKryoClasses() From: Yang Cao [mailto:cybea...@gmail.com] Sent: 10 January 2017 15:12 To: Enrico DUrso Cc: user@spark.apache.org Subject: Re: Kryo On Spark 1.6.0 If you don’t mind, could please share me with the scala solution? I tried to use kryo but seamed not work at all. I hope to get some practical example. THX On 2017年1月10日, at 19:10, Enrico DUrso <enrico.du...@everis.com<mailto:enrico.du...@everis.com>> wrote: Hi, I am trying to use Kryo on Spark 1.6.0. I am able to register my own classes and it works, but when I set “spark.kryo.registrationRequired “ to true, I get an error about a scala class: “Class is not registered: scala.collection.mutable.WrappedArray$ofRef”. Any of you has already solved this issue in Java? I found the code to solve it in Scala, but unable to register this class in Java. Cheers, enrico CONFIDENTIALITY WARNING. This message and the information contained in or attached to it are private and confidential and intended exclusively for the addressee. everis informs to whom it may receive it in error that it contains privileged information and its use, copy, reproduction or distribution is prohibited. If you are not an intended recipient of this E-mail, please notify the sender, delete it and do not read, act upon, print, disclose, copy, retain or redistribute any portion of this E-mail. CONFIDENTIALITY WARNING. This message and the information contained in or attached to it are private and confidential and intended exclusively for the addressee. everis informs to whom it may receive it in error that it contains privileged information and its use, copy, reproduction or distribution is prohibited. If you are not an intended recipient of this E-mail, please notify the sender, delete it and do not read, act upon, print, disclose, copy, retain or redistribute any portion of this E-mail.
Re: Kryo On Spark 1.6.0
If you don’t mind, could please share me with the scala solution? I tried to use kryo but seamed not work at all. I hope to get some practical example. THX > On 2017年1月10日, at 19:10, Enrico DUrso <enrico.du...@everis.com> wrote: > > Hi, > > I am trying to use Kryo on Spark 1.6.0. > I am able to register my own classes and it works, but when I set > “spark.kryo.registrationRequired “ to true, I get an error about a scala > class: > “Class is not registered: scala.collection.mutable.WrappedArray$ofRef”. > > Any of you has already solved this issue in Java? I found the code to solve > it in Scala, but unable to register this class in Java. > > Cheers, > > enrico > > > CONFIDENTIALITY WARNING. > This message and the information contained in or attached to it are private > and confidential and intended exclusively for the addressee. everis informs > to whom it may receive it in error that it contains privileged information > and its use, copy, reproduction or distribution is prohibited. If you are not > an intended recipient of this E-mail, please notify the sender, delete it and > do not read, act upon, print, disclose, copy, retain or redistribute any > portion of this E-mail.
RE: Kryo On Spark 1.6.0
Hi, I agree with you Richard. The point is that, looks like some classes which are used internally by Spark are not registered (for instance, the one I mentioned in the previous email is something I am not directly using). For those classes the serialization performance will be poor in according to how Spark works. How can I register all those classes? cheers, From: Richard Startin [mailto:richardstar...@outlook.com] Sent: 10 January 2017 11:18 To: Enrico DUrso; user@spark.apache.org Subject: Re: Kryo On Spark 1.6.0 Hi Enrico, Only set spark.kryo.registrationRequired if you want to forbid any classes you have not explicitly registered - see http://spark.apache.org/docs/latest/configuration.html. Configuration - Spark 2.0.2 Documentation<http://spark.apache.org/docs/latest/configuration.html> spark.apache.org Spark Configuration. Spark Properties. Dynamically Loading Spark Properties; Viewing Spark Properties; Available Properties. Application Properties; Runtime Environment To enable kryo, you just need spark.serializer=org.apache.spark.serializer.KryoSerializer. There is some info here - http://spark.apache.org/docs/latest/tuning.html Cheers, Richard https://richardstartin.com/ From: Enrico DUrso <enrico.du...@everis.com<mailto:enrico.du...@everis.com>> Sent: 10 January 2017 11:10 To: user@spark.apache.org<mailto:user@spark.apache.org> Subject: Kryo On Spark 1.6.0 Hi, I am trying to use Kryo on Spark 1.6.0. I am able to register my own classes and it works, but when I set "spark.kryo.registrationRequired " to true, I get an error about a scala class: "Class is not registered: scala.collection.mutable.WrappedArray$ofRef". Any of you has already solved this issue in Java? I found the code to solve it in Scala, but unable to register this class in Java. Cheers, enrico CONFIDENTIALITY WARNING. This message and the information contained in or attached to it are private and confidential and intended exclusively for the addressee. everis informs to whom it may receive it in error that it contains privileged information and its use, copy, reproduction or distribution is prohibited. If you are not an intended recipient of this E-mail, please notify the sender, delete it and do not read, act upon, print, disclose, copy, retain or redistribute any portion of this E-mail. CONFIDENTIALITY WARNING. This message and the information contained in or attached to it are private and confidential and intended exclusively for the addressee. everis informs to whom it may receive it in error that it contains privileged information and its use, copy, reproduction or distribution is prohibited. If you are not an intended recipient of this E-mail, please notify the sender, delete it and do not read, act upon, print, disclose, copy, retain or redistribute any portion of this E-mail.
Re: Kryo On Spark 1.6.0
Hi Enrico, Only set spark.kryo.registrationRequired if you want to forbid any classes you have not explicitly registered - see http://spark.apache.org/docs/latest/configuration.html. Configuration - Spark 2.0.2 Documentation<http://spark.apache.org/docs/latest/configuration.html> spark.apache.org Spark Configuration. Spark Properties. Dynamically Loading Spark Properties; Viewing Spark Properties; Available Properties. Application Properties; Runtime Environment To enable kryo, you just need spark.serializer=org.apache.spark.serializer.KryoSerializer. There is some info here - http://spark.apache.org/docs/latest/tuning.html <http://spark.apache.org/docs/latest/tuning.html>Cheers, Richard https://richardstartin.com/ From: Enrico DUrso <enrico.du...@everis.com> Sent: 10 January 2017 11:10 To: user@spark.apache.org Subject: Kryo On Spark 1.6.0 Hi, I am trying to use Kryo on Spark 1.6.0. I am able to register my own classes and it works, but when I set "spark.kryo.registrationRequired " to true, I get an error about a scala class: "Class is not registered: scala.collection.mutable.WrappedArray$ofRef". Any of you has already solved this issue in Java? I found the code to solve it in Scala, but unable to register this class in Java. Cheers, enrico CONFIDENTIALITY WARNING. This message and the information contained in or attached to it are private and confidential and intended exclusively for the addressee. everis informs to whom it may receive it in error that it contains privileged information and its use, copy, reproduction or distribution is prohibited. If you are not an intended recipient of this E-mail, please notify the sender, delete it and do not read, act upon, print, disclose, copy, retain or redistribute any portion of this E-mail.
Kryo On Spark 1.6.0
Hi, I am trying to use Kryo on Spark 1.6.0. I am able to register my own classes and it works, but when I set "spark.kryo.registrationRequired " to true, I get an error about a scala class: "Class is not registered: scala.collection.mutable.WrappedArray$ofRef". Any of you has already solved this issue in Java? I found the code to solve it in Scala, but unable to register this class in Java. Cheers, enrico CONFIDENTIALITY WARNING. This message and the information contained in or attached to it are private and confidential and intended exclusively for the addressee. everis informs to whom it may receive it in error that it contains privileged information and its use, copy, reproduction or distribution is prohibited. If you are not an intended recipient of this E-mail, please notify the sender, delete it and do not read, act upon, print, disclose, copy, retain or redistribute any portion of this E-mail.
Re: Spark kryo serialization register Datatype[]
I already set .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") to enable Kryo and .set("spark.kryo.registrationRequired", "true") to force registration. Strangely, I still see the complaint about the unregistered DataType[]. Registering regular classes like Date via .registerKryoClasses(Array(classOf[Date])) works just fine, but registering the Spark-internal DataType[] is not working, and as far as I read the docs it should be handled by Spark. Vadim Semenov <vadim.seme...@datadoghq.com> wrote on Wed, 21 Dec 2016 at 17:12: > to enable kryo serializer you just need to pass > `spark.serializer=org.apache.spark.serializer.KryoSerializer` > > the `spark.kryo.registrationRequired` controls the following behavior: > > Whether to require registration with Kryo. If set to 'true', Kryo will > throw an exception if an unregistered class is serialized. If set to false > (the default), Kryo will write unregistered class names along with each > object. Writing class names can cause significant performance overhead, so > enabling this option can enforce strictly that a user has not omitted > classes from registration. > > > as described here http://spark.apache.org/docs/latest/configuration.html > > if it's set to `true` you need to manually register classes as described > here: http://spark.apache.org/docs/latest/tuning.html#data-serialization > > > On Wed, Dec 21, 2016 at 8:49 AM, geoHeil <georg.kf.hei...@gmail.com> > wrote: > > To force spark to use kryo serialization I set > spark.kryo.registrationRequired to true. > > Now spark complains that: Class is not registered: > org.apache.spark.sql.types.DataType[] is not registered. > How can I fix this? So far I could not successfully register this class. > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-kryo-serialization-register-Datatype-tp28243.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > >
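One thing worth trying (a sketch, not verified against that setup): with registrationRequired on, array classes have to be registered explicitly too, and the class the error names is the array type DataType[], which can be registered via classOf[Array[DataType]] or via its JVM binary name:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.DataType

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(
    // equivalent to Class.forName("[Lorg.apache.spark.sql.types.DataType;")
    classOf[Array[DataType]]
  ))
```

As the 1.6.3 thread above found, this may simply surface the next unregistered internal class, so it can take a few rounds of adding whatever the exception reports.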
Re: Spark kryo serialization register Datatype[]
to enable kryo serializer you just need to pass `spark.serializer=org.apache.spark.serializer.KryoSerializer` the `spark.kryo.registrationRequired` controls the following behavior: Whether to require registration with Kryo. If set to 'true', Kryo will > throw an exception if an unregistered class is serialized. If set to false > (the default), Kryo will write unregistered class names along with each > object. Writing class names can cause significant performance overhead, so > enabling this option can enforce strictly that a user has not omitted > classes from registration. as described here http://spark.apache.org/docs/latest/configuration.html if it's set to `true` you need to manually register classes as described here: http://spark.apache.org/docs/latest/tuning.html#data-serialization On Wed, Dec 21, 2016 at 8:49 AM, geoHeil <georg.kf.hei...@gmail.com> wrote: > To force spark to use kryo serialization I set > spark.kryo.registrationRequired to true. > > Now spark complains that: Class is not registered: > org.apache.spark.sql.types.DataType[] is not registered. > How can I fix this? So far I could not successfully register this class. > > > > -- > View this message in context: http://apache-spark-user-list. > 1001560.n3.nabble.com/Spark-kryo-serialization-register- > Datatype-tp28243.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > >
Spark kryo serialization register Datatype[]
To force spark to use kryo serialization I set spark.kryo.registrationRequired to true. Now spark complains that: Class is not registered: org.apache.spark.sql.types.DataType[] is not registered. How can I fix this? So far I could not successfully register this class. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-kryo-serialization-register-Datatype-tp28243.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Kryo Exception: NegativeArraySizeException
Hi, I'm trying to broadcast a map of 2.6GB but I'm getting a weird Kryo exception. I tried to set -XX:hashCode=0 in executor and driver, following this copmment: https://github.com/broadinstitute/gatk/issues/1524#issuecomment-189368808 But it didn't change anything. Are you aware of this problem? Is there a workaround? Thank for yuor comments, Pedro. Map info: INFO 2016-11-24 15:29:34,230 [main] (Logging.scala:54) - Block broadcast_3 stored as values in memory (estimated size 2.6 GB, free 5.7 GB) Error Trace: ERROR ApplicationMaster: User class threw exception: com.esotericsoftware.kryo.KryoException: java.lang.NegativeArraySizeException Serialization trace: ... com.esotericsoftware.kryo.KryoException: java.lang.NegativeArraySizeException Serialization trace: ... at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:113) at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:195) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:236) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:236) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307) $blockifyObject$2.apply(TorrentBroadcast.scala:236) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:236) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307) at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:237) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:107) at org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:86) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1387) at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:646) at com.personal.sparkJob.main(sparkJob..java:81) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627) Caused by: java.lang.NegativeArraySizeException at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:447) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:245) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:239) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:135) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:246) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:239) at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:135) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:41) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:658) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:623) at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552) at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ... 22 more
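For what it's worth, the failing frames are inside Kryo's write-side reference table (MapReferenceResolver/IdentityObjectIntMap), which grows with every object written out of the 2.6 GB map. A hedged sketch of settings sometimes tried in this situation — not verified against this particular job, and disabling reference tracking is only safe if the broadcast value has no cyclic or shared references:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // skip the identity map that is resizing in the trace above
  .set("spark.kryo.referenceTracking", "false")
  // allow larger serialization buffers for the big broadcast value
  .set("spark.kryoserializer.buffer.max", "1g")
```

Splitting the map into several smaller broadcast variables is another way to keep each individual Kryo write small.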
Sporadic ClassNotFoundException with Kryo
Regardless of the different ways we have tried deploying a jar together with Spark, when running a Spark Streaming job with Kryo as serializer on top of Mesos, we sporadically get the following error (I have truncated a bit): /16/11/18 08:39:10 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.lang.RuntimeException: org.apache.spark.SparkException: Failed to register classes with Kryo at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:129) at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:274) ... at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:125) at org.apache.spark.storage.BlockManager$$anonfun$dropFromMemory$3.apply(BlockManager.scala:1265) at org.apache.spark.storage.BlockManager$$anonfun$dropFromMemory$3.apply(BlockManager.scala:1261) ... Caused by: java.lang.ClassNotFoundException: cics.udr.compound_ran_udr at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424)/ where "cics.udr.compound_ran_udr" is a class provided by us in a jar. We know that the jar containing "cics.udr.compound_ran_udr" is being deployed and works because it is listed in the "Environment" tab in the GUI, and calculations using this class succeed. We have tried the following methods of deploying the jar containing the class * Through --jars in spark-submit * Through SparkConf.setJar * Through spark.driver.extraClassPath and spark.executor.extraClassPath * By having it as the main jar used by spark-submit with no luck. The logs (see attached) recognize that the jar is being added to the classloader. We have tried registering the class using * SparkConf.registerKryoClasses. * spark.kryo.classesToRegister with no luck. We are running on Mesos and the jar has been deployed on every machine on the local file system in the same location. I would be very grateful for any help or ideas :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Sporadic-ClassNotFoundException-with-Kryo-tp28104.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
BiMap BroadCast Variable - Kryo Serialization Issue
Hi, I am getting Nullpointer exception due to Kryo Serialization issue, while trying to read a BiMap broadcast variable. Attached is the code snippets. Pointers shared here didn't help - link1 <http://stackoverflow.com/questions/33156095/spark-serialization-issue-with-hashmap>, link2 <http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist>. Spark version used is 1.6.x, but this was working with 1.3.x version. Any help in this regard is much appreciated. Exception: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException App > Serialization trace: App > value (com.demo.BiMapWrapper) App > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1238) App > at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165) App > at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) App > at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) App > at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88) App > at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) App > at com.manthan.aaas.algo.associationmining.impl.Test.lambda$execute$6abf5fd0$1(Test.java:39) App > at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015) App > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) App > at scala.collection.Iterator$class.foreach(Iterator.scala:727) App > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) App > at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) App > at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) App > at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) App > at scala.collection.TraversableOnce$class.to (TraversableOnce.scala:273) App > at scala.collection.AbstractIterator.to(Iterator.scala:1157) App > at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) App > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) App > at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) App > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) App > at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) App > at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) App > at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1978) App > at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1978) App > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) App > at org.apache.spark.scheduler.Task.run(Task.scala:89) App > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) App > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) App > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) App > at java.lang.Thread.run(Thread.java:745) App > Caused by: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException App > Serialization trace: App > value (com.demo.BiMapWrapper) App > at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) App > at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) App > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) App > at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228) App > at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217) App > at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178) App > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1231) App > ... 29 more App > Caused by: java.lang.NullPointerException App > at com.google.common.collect.HashBiMap.seekByKey(HashBiMap.java:180) App > at com.google.common.collect.HashBiMap.put(HashBiMap.java:230) App > at com.google.common.collect.HashBiMap.put(HashBiMap.java:218) App > at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135) App > at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17) App > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) App > at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) App > ... 35 more App > App > 16/11/02 18:39:01 dispatcher-event-loop-2 INFO TaskSetManager: Starting task 17.1 in stage 1.0 (TID 19, ip-10-0-1-237.ec2.internal, partition 17,PROCESS_LOCAL, 2076 bytes) App > 16/11/02 18:39:01 task-result-getter-3 INFO TaskSetManager: Lost task 17.1 in stage
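In case it helps, the NullPointerException comes out of HashBiMap.put while Kryo's generic MapSerializer is rebuilding the map, which suggests the instance being filled was created without HashBiMap's own initialisation having run. A hedged sketch of one workaround — registering an explicit serializer for HashBiMap (plain Java serialization here, since Guava's HashBiMap is Serializable); the registrator package/class name is a placeholder:

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import com.google.common.collect.HashBiMap
import org.apache.spark.serializer.KryoRegistrator

class BiMapRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // bypass Kryo's MapSerializer for HashBiMap and fall back to Java serialization
    kryo.register(classOf[HashBiMap[_, _]], new JavaSerializer())
  }
}

// then, on the SparkConf:
// .set("spark.kryo.registrator", "com.example.BiMapRegistrator")
```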
Kryo on Zeppelin
Hi All, I am running some spark scala code on zeppelin on CDH 5.5.1 (Spark version 1.5.0). I customized the Spark interpreter to use org.apache.spark. serializer.KryoSerializer as spark.serializer. And in the dependency I added Kyro-3.0.3 as following: com.esotericsoftware:kryo:3.0.3 When I wrote the scala notebook and run the program, I got the following errors. But If I compiled these code as jars, and use spark-submit to run it on the cluster, it worked well without errors. WARN [2016-10-10 23:43:40,801] ({task-result-getter-1} Logging.scala[logWarning]:71) - Lost task 0.0 in stage 3.0 (TID 9, svr-A3-A-U20): java.io.EOFException at org.apache.spark.serializer.KryoDeserializationStream. readObject(KryoSerializer.scala:196) at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject( TorrentBroadcast.scala:217) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$ readBroadcastBlock$1.apply(TorrentBroadcast.scala:178) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1175) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock( TorrentBroadcast.scala:165) at org.apache.spark.broadcast.TorrentBroadcast._value$ lzycompute(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast._value( TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast.getValue( TorrentBroadcast.scala:88) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask. scala:62) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run( Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker( ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run( ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) There were also some errors when I run the Zeppelin Tutorial: Caused by: java.io.IOException: java.lang.NullPointerException at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163) at org.apache.spark.rdd.ParallelCollectionPartition.readObject( ParallelCollectionRDD.scala:70) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke( NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke( DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at java.io.ObjectStreamClass.invokeReadObject( ObjectStreamClass.java:1058) at java.io.ObjectInputStream.readSerialData( ObjectInputStream.java:1900) at java.io.ObjectInputStream.readOrdinaryObject( ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream. java:1351) at java.io.ObjectInputStream.defaultReadFields( ObjectInputStream.java:2000) at java.io.ObjectInputStream.readSerialData( ObjectInputStream.java:1924) at java.io.ObjectInputStream.readOrdinaryObject( ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream. java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream. readObject(JavaSerializer.scala:72) at org.apache.spark.serializer.JavaSerializerInstance. deserialize(JavaSerializer.scala:98) at org.apache.spark.executor.Executor$TaskRunner.run( Executor.scala:194) ... 
3 more Caused by: java.lang.NullPointerException at com.twitter.chill.WrappedArraySerializer.read( WrappedArraySerializer.scala:38) at com.twitter.chill.WrappedArraySerializer.read( WrappedArraySerializer.scala:23) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at org.apache.spark.serializer.KryoDeserializationStream. readObject(KryoSerializer.scala:192) at org.apache.spark.rdd.ParallelCollectionPartition$$ anonfun$readObject$1$$anonfun$apply$mcV$sp$2.apply( ParallelCollectionRDD.scala:80) at org.apache.spark.rdd.ParallelCollectionPartition$$ anonfun$readObject$1$$anonfun$apply$mcV$sp$2.apply( ParallelCollectionRDD.scala:80) at org.apache.spark.util.Utils$.deserializeViaNestedStream( Utils.scala:142) at org.apache.spark.rdd.ParallelCollectionPartition$$ anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:80) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160) Is there anyone knowing why it happended? Thanks in advance, Fei
Re: mutable.LinkedHashMap kryo serialization issues
Hi, I apologize, I spoke too soon. Those transient member variables may not be the issue. To clarify my test case I am creating a LinkedHashMap with two elements in a map expression on an RDD. Note that the LinkedHashMaps are being created on the worker JVMs (not the driver JVM) and THEN collected to the driver JVM. I am NOT creating LinkedHashMaps on the driver and then parallelizing them (sending them to worker JVMs). As Renato said spark requires us to register classes that aren't yet in Chill. As far as I know there are three ways to register and it's through api calls on sparkConf. 1. sparkConf().registerKryoClasses(Array(classOf[...], clasOf[...])) * This is the method of registering classes as described in the Tuning page: http://spark.apache.org/docs/latest/tuning.html#data-serialization 2. sparkConf().set("spark.kryo.classesToRegister", "cName1, cName2") 3. sparkConf().set("spark.kryo.registrator", "registrator1, registrator2") In the first two methods, which set the classes to register in Kryo, what I get are empty mutable.LinkedHashMaps after calling collect on the RDD. To my best understanding this should not happen (none of the other collection classes I have used have this problem). For the third method I created a registrator for mutable.LinkedHashMap which can be found here : https://gist.github.com/rahulpalamuttam/9f3bfa39a160efa80844d3a7a7bd87cd I set the registrator like so : sparkConf().set("spark.kryo.registrator", "org.dia.core.MutableLinkedHashMapRegistrator"). Now, when I do the same test, I get an Array of LinkedHashMaps. Each LinkedHashMap contains the entries I populated it with in the map task. Why do the first two methods result in improper serialization of mutable.LinkedHashMap? Should I file a JIRA for it? Much credit should be given to Martin Grotzke from EsotericSoftware/kryo who helped me tremendously. Best, Rahul Palamuttam On Fri, Aug 26, 2016 at 10:16 AM, Rahul Palamuttam <rahulpala...@gmail.com> wrote: > Thanks Renato. > > I forgot to reply all last time. I apologize for the rather confusing > example. > All that the snipet code did was > 1. Make an RDD of LinkedHashMaps with size 2 > 2. On the worker side get the sizes of the HashMaps (via a map(hash => > hash.size)) > 3. On the driver call collect on the RDD[Ints] which is the RDD of hashmap > sizes giving you an Array[Ints] > 4. On the driver call collect on the RDD[LinkedHashMap] giving you an > Array[LinkedHashMap] > 5. Check the size of a hashmap in Array[LinkedHashMap] with any size value > in Array[Ints] (they're all going to be the same size). > 6. The sizes differ because the elements of the LinkedHashMap were never > copied over > > Anyway I think I've tracked down the issue and it doesn't seem to be a > spark or kryo issue. > > For those it concerns LinkedHashMap has this serialization issue because > it has transient members for firstEntry and lastEntry. > Take a look here : https://github.com/scala/scala/blob/v2.11.8/src/ > library/scala/collection/mutable/LinkedHashMap.scala#L62 > > Those attributes are not going to be serialized. > Furthermore, the iterator on LinkedHashMap depends on the firstEntry > variable > Since that member is not serialized it is null. > The iterator requires the firstEntry variable to walk the LinkedHashMap > https://github.com/scala/scala/blob/v2.11.8/src/library/scala/collection/ > mutable/LinkedHashMap.scala#L94-L100 > > I wonder why these two variables were made transient. 
> > Best, > Rahul Palamuttam > > > On Thu, Aug 25, 2016 at 11:13 PM, Renato Marroquín Mogrovejo < > renatoj.marroq...@gmail.com> wrote: > >> Hi Rahul, >> >> You have probably already figured this one out, but anyway... >> You need to register the classes that you'll be using with Kryo because >> it does not support all Serializable types and requires you to register the >> classes you’ll use in the program in advance. So when you don't register >> the class, Kryo doesn't know how to serialize/deserialize it. >> >> >> Best, >> >> Renato M. >> >> 2016-08-22 17:12 GMT+02:00 Rahul Palamuttam <rahulpala...@gmail.com>: >> >>> Hi, >>> >>> Just sending this again to see if others have had this issue. >>> >>> I recently switched to using kryo serialization and I've been running >>> into errors >>> with the mutable.LinkedHashMap class. >>> >>> If I don't register the mutable.LinkedHashMap class then I get an >>> ArrayStoreException seen below. >>> If I do register the class, then when the LinkedHashMap is collected on >>> the driver, it does not contain any elements. >>> >>> H
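The gist linked above isn't reproduced here, but a serializer/registrator along the following lines (a hedged sketch, not the actual gist) shows the general shape: write the entries in iteration order and rebuild the map with put() on read, so the transient firstEntry/lastEntry links are reconstructed rather than expected to survive serialization.

```scala
import scala.collection.mutable

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.serializer.KryoRegistrator

class LinkedHashMapSerializer extends Serializer[mutable.LinkedHashMap[Any, Any]] {
  override def write(kryo: Kryo, out: Output, map: mutable.LinkedHashMap[Any, Any]): Unit = {
    out.writeInt(map.size, true)
    map.foreach { case (k, v) =>
      kryo.writeClassAndObject(out, k)
      kryo.writeClassAndObject(out, v)
    }
  }

  override def read(kryo: Kryo, in: Input,
                    t: Class[mutable.LinkedHashMap[Any, Any]]): mutable.LinkedHashMap[Any, Any] = {
    val result = new mutable.LinkedHashMap[Any, Any]()
    val size = in.readInt(true)
    var i = 0
    while (i < size) {
      val k = kryo.readClassAndObject(in)
      val v = kryo.readClassAndObject(in)
      result.put(k, v) // put() rebuilds the transient entry links in insertion order
      i += 1
    }
    result
  }
}

class LinkedHashMapRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit =
    kryo.register(classOf[mutable.LinkedHashMap[Any, Any]], new LinkedHashMapSerializer())
}

// set via: .set("spark.kryo.registrator", "com.example.LinkedHashMapRegistrator")
```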
Re: mutable.LinkedHashMap kryo serialization issues
Thanks Renato. I forgot to reply all last time. I apologize for the rather confusing example. All that the snipet code did was 1. Make an RDD of LinkedHashMaps with size 2 2. On the worker side get the sizes of the HashMaps (via a map(hash => hash.size)) 3. On the driver call collect on the RDD[Ints] which is the RDD of hashmap sizes giving you an Array[Ints] 4. On the driver call collect on the RDD[LinkedHashMap] giving you an Array[LinkedHashMap] 5. Check the size of a hashmap in Array[LinkedHashMap] with any size value in Array[Ints] (they're all going to be the same size). 6. The sizes differ because the elements of the LinkedHashMap were never copied over Anyway I think I've tracked down the issue and it doesn't seem to be a spark or kryo issue. For those it concerns LinkedHashMap has this serialization issue because it has transient members for firstEntry and lastEntry. Take a look here : https://github.com/scala/scala/blob/v2.11.8/src/library/scala/collection/mutable/LinkedHashMap.scala#L62 Those attributes are not going to be serialized. Furthermore, the iterator on LinkedHashMap depends on the firstEntry variable Since that member is not serialized it is null. The iterator requires the firstEntry variable to walk the LinkedHashMap https://github.com/scala/scala/blob/v2.11.8/src/library/scala/collection/mutable/LinkedHashMap.scala#L94-L100 I wonder why these two variables were made transient. Best, Rahul Palamuttam On Thu, Aug 25, 2016 at 11:13 PM, Renato Marroquín Mogrovejo < renatoj.marroq...@gmail.com> wrote: > Hi Rahul, > > You have probably already figured this one out, but anyway... > You need to register the classes that you'll be using with Kryo because it > does not support all Serializable types and requires you to register the > classes you’ll use in the program in advance. So when you don't register > the class, Kryo doesn't know how to serialize/deserialize it. > > > Best, > > Renato M. > > 2016-08-22 17:12 GMT+02:00 Rahul Palamuttam <rahulpala...@gmail.com>: > >> Hi, >> >> Just sending this again to see if others have had this issue. >> >> I recently switched to using kryo serialization and I've been running >> into errors >> with the mutable.LinkedHashMap class. >> >> If I don't register the mutable.LinkedHashMap class then I get an >> ArrayStoreException seen below. >> If I do register the class, then when the LinkedHashMap is collected on >> the driver, it does not contain any elements. 
>> >> Here is the snippet of code I used : >> >> val sc = new SparkContext(new SparkConf() >> .setMaster("local[*]") >> .setAppName("Sample") >> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") >> .registerKryoClasses(Array(classOf[mutable.LinkedHashMap[String, >> String]]))) >> >> val collect = sc.parallelize(0 to 10) >> .map(p => new mutable.LinkedHashMap[String, String]() ++= Array(("hello", >> "bonjour"), ("good", "bueno"))) >> >> val mapSideSizes = collect.map(p => p.size).collect()(0) >> val driverSideSizes = collect.collect()(0).size >> >> println("The sizes before collect : " + mapSideSizes) >> println("The sizes after collect : " + driverSideSizes) >> >> >> ** The following only occurs if I did not register the >> mutable.LinkedHashMap class ** >> 16/08/20 18:10:38 ERROR TaskResultGetter: Exception while getting task >> result >> java.lang.ArrayStoreException: scala.collection.mutable.HashMap >> at com.esotericsoftware.kryo.serializers.DefaultArraySerializer >> s$ObjectArraySerializer.read(DefaultArraySerializers.java:338) >> at com.esotericsoftware.kryo.serializers.DefaultArraySerializer >> s$ObjectArraySerializer.read(DefaultArraySerializers.java:293) >> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) >> at org.apache.spark.serializer.KryoSerializerInstance.deseriali >> ze(KryoSerializer.scala:311) >> at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97) >> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun >> $run$1.apply$mcV$sp(TaskResultGetter.scala:60) >> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun >> $run$1.apply(TaskResultGetter.scala:51) >> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun >> $run$1.apply(TaskResultGetter.scala:51) >> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741) >> at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(Task >> ResultGetter.scala:50) >> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool >> Executor.java:1142) >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo >> lExecutor.java:617) >> at java.lang.Thread.run(Thread.java:745) >> >> I hope this is a known issue and/or I'm missing something important in my >> setup. >> Appreciate any help or advice! >> >> Best, >> >> Rahul Palamuttam >> > >
Re: mutable.LinkedHashMap kryo serialization issues
Hi Rahul, You have probably already figured this one out, but anyway... You need to register the classes that you'll be using with Kryo because it does not support all Serializable types and requires you to register the classes you’ll use in the program in advance. So when you don't register the class, Kryo doesn't know how to serialize/deserialize it. Best, Renato M. 2016-08-22 17:12 GMT+02:00 Rahul Palamuttam <rahulpala...@gmail.com>: > Hi, > > Just sending this again to see if others have had this issue. > > I recently switched to using kryo serialization and I've been running into > errors > with the mutable.LinkedHashMap class. > > If I don't register the mutable.LinkedHashMap class then I get an > ArrayStoreException seen below. > If I do register the class, then when the LinkedHashMap is collected on > the driver, it does not contain any elements. > > Here is the snippet of code I used : > > val sc = new SparkContext(new SparkConf() > .setMaster("local[*]") > .setAppName("Sample") > .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") > .registerKryoClasses(Array(classOf[mutable.LinkedHashMap[String, String]]))) > > val collect = sc.parallelize(0 to 10) > .map(p => new mutable.LinkedHashMap[String, String]() ++= Array(("hello", > "bonjour"), ("good", "bueno"))) > > val mapSideSizes = collect.map(p => p.size).collect()(0) > val driverSideSizes = collect.collect()(0).size > > println("The sizes before collect : " + mapSideSizes) > println("The sizes after collect : " + driverSideSizes) > > > ** The following only occurs if I did not register the > mutable.LinkedHashMap class ** > 16/08/20 18:10:38 ERROR TaskResultGetter: Exception while getting task > result > java.lang.ArrayStoreException: scala.collection.mutable.HashMap > at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ > ObjectArraySerializer.read(DefaultArraySerializers.java:338) > at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ > ObjectArraySerializer.read(DefaultArraySerializers.java:293) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) > at org.apache.spark.serializer.KryoSerializerInstance. > deserialize(KryoSerializer.scala:311) > at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97) > at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$ > anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60) > at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$ > anonfun$run$1.apply(TaskResultGetter.scala:51) > at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$ > anonfun$run$1.apply(TaskResultGetter.scala:51) > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741) > at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run( > TaskResultGetter.scala:50) > at java.util.concurrent.ThreadPoolExecutor.runWorker( > ThreadPoolExecutor.java:1142) > at java.util.concurrent.ThreadPoolExecutor$Worker.run( > ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > I hope this is a known issue and/or I'm missing something important in my > setup. > Appreciate any help or advice! > > Best, > > Rahul Palamuttam >
Re:Do we still need to use Kryo serializer in Spark 1.6.2 ?
Using the Kryo serializer from PySpark is similar to Scala, as below; the only difference is that there is no convenience method like conf.registerKryoClasses, but it should be easy to write one yourself: conf = SparkConf() conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.set("spark.kryo.classesToRegister", "com.example.YourClassA,com.example.YourClassB") At 2016-08-23 02:00:41, "Eric Ho" <e...@analyticsmd.com> wrote: I heard that Kryo will get phased out at some point but not sure which Spark release. I'm using PySpark, does anyone has any docs on how to call / use Kryo Serializer in PySpark ? Thanks. -- -eric ho
Re: Do we still need to use Kryo serializer in Spark 1.6.2 ?
Hi, I've not heard this. And moreover I see Kryo supported in Encoders (SerDes) in Spark 2.0. https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala#L151 Pozdrawiam, Jacek Laskowski https://medium.com/@jaceklaskowski/ Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark Follow me at https://twitter.com/jaceklaskowski On Mon, Aug 22, 2016 at 8:00 PM, Eric Ho <e...@analyticsmd.com> wrote: > I heard that Kryo will get phased out at some point but not sure which Spark > release. > I'm using PySpark, does anyone has any docs on how to call / use Kryo > Serializer in PySpark ? > > Thanks. > > -- > > -eric ho > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
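For completeness, a tiny sketch of that Encoders.kryo API; Payload is a made-up example of a type with no built-in encoder:

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// A made-up type: not a case class, so no built-in encoder exists for it.
class Payload(val id: Int, val bytes: Array[Byte])

val spark = SparkSession.builder().master("local[*]").appName("kryo-encoder").getOrCreate()
import spark.implicits._

implicit val payloadEncoder: Encoder[Payload] = Encoders.kryo[Payload]

val ds = spark.createDataset(Seq(new Payload(1, Array[Byte](1, 2, 3))))
println(ds.map(_.id).collect().mkString(",")) // 1
```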
Do we still need to use Kryo serializer in Spark 1.6.2 ?
I heard that Kryo will get phased out at some point but not sure which Spark release. I'm using PySpark, does anyone has any docs on how to call / use Kryo Serializer in PySpark ? Thanks. -- -eric ho
mutable.LinkedHashMap kryo serialization issues
Hi, Just sending this again to see if others have had this issue. I recently switched to using kryo serialization and I've been running into errors with the mutable.LinkedHashMap class. If I don't register the mutable.LinkedHashMap class then I get an ArrayStoreException seen below. If I do register the class, then when the LinkedHashMap is collected on the driver, it does not contain any elements. Here is the snippet of code I used : val sc = new SparkContext(new SparkConf() .setMaster("local[*]") .setAppName("Sample") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[mutable.LinkedHashMap[String, String]]))) val collect = sc.parallelize(0 to 10) .map(p => new mutable.LinkedHashMap[String, String]() ++= Array(("hello", "bonjour"), ("good", "bueno"))) val mapSideSizes = collect.map(p => p.size).collect()(0) val driverSideSizes = collect.collect()(0).size println("The sizes before collect : " + mapSideSizes) println("The sizes after collect : " + driverSideSizes) ** The following only occurs if I did not register the mutable.LinkedHashMap class ** 16/08/20 18:10:38 ERROR TaskResultGetter: Exception while getting task result java.lang.ArrayStoreException: scala.collection.mutable.HashMap at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311) at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741) at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) I hope this is a known issue and/or I'm missing something important in my setup. Appreciate any help or advice! Best, Rahul Palamuttam
Spark, Kryo Serialization Issue with ProtoBuf field
Hi, I am seeing an error when running my spark job relating to Serialization of a protobuf field when transforming an RDD. com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException Serialization trace: otherAuthors_ (com.thomsonreuters.kraken.medusa.dbor.proto.Book$DBBooks) The error seems to be created at this point: val booksPerTier: Iterable[(TimeTier, RDD[DBBooks])] = allTiers.map { tier => (tier, books.filter(b => isInTier(endOfInterval, tier, b) && !isBookPublished(o)).mapPartitions( it => it.map{ord => (ord.getAuthor, ord.getPublisherName, getGenre(ord.getSourceCountry))})) } val averagesPerAuthor = booksPerTier.flatMap { case (tier, opt) => opt.map(o => (tier, o._1, PublisherCompanyComparison, o._3)).countByValue() } val averagesPerPublisher = booksPerTier.flatMap { case (tier, opt) => opt.map(o => (tier, o._1, PublisherComparison(o._2), o._3)).countByValue() } The field is a list specified in the protobuf as the below: otherAuthors_ = java.util.Collections.emptyList() As you can see the code is not actually utilising that field from the Book Protobuf, although it still is being transmitted over the network. Has anyone got any advice on this? Thanks, K
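For reference, the way protobuf-generated classes are usually handled with Kryo is to skip Kryo's reflective field serializer entirely and let the message encode itself with its own wire format, for example via the ProtobufSerializer shipped in Twitter's chill-protobuf module. A rough sketch, assuming chill-protobuf is on the classpath and reusing the class name from the serialization trace above:

```scala
import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.spark.serializer.KryoRegistrator

class ProtoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Serialize the generated message via toByteArray/parseFrom instead of
    // reflecting over fields such as otherAuthors_.
    kryo.register(
      Class.forName("com.thomsonreuters.kraken.medusa.dbor.proto.Book$DBBooks"),
      new ProtobufSerializer())
  }
}
// Enabled with spark.serializer=org.apache.spark.serializer.KryoSerializer
// and spark.kryo.registrator set to the fully qualified name of ProtoRegistrator.
```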
Re: Kryo ClassCastException during Serialization/deserialization in Spark Streaming
sampleMap is populated from inside a method that is getting called from updateStateByKey On Thu, Jun 23, 2016 at 1:13 PM, Ted Yu <yuzhih...@gmail.com> wrote: > Can you illustrate how sampleMap is populated ? > > Thanks > > On Thu, Jun 23, 2016 at 12:34 PM, SRK <swethakasire...@gmail.com> wrote: > >> Hi, >> >> I keep getting the following error in my Spark Streaming every now and >> then >> after the job runs for say around 10 hours. I have those 2 classes >> registered in kryo as shown below. sampleMap is a field in SampleSession >> as shown below. Any suggestion as to how to avoid this would be of great >> help!! >> >> public class SampleSession implements Serializable, Cloneable{ >>private Map<String, Sample> sampleMap; >> } >> >> sparkConf.registerKryoClasses(Array( classOf[SampleSession], >> classOf[Sample])) >> >> >> >> com.esotericsoftware.kryo.KryoException: java.lang.ClassCastException: >> com.test.Sample cannot be cast to java.lang.String >> Serialization trace: >> sampleMap (com.test.SampleSession) >> at >> >> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585) >> at >> >> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) >> at >> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) >> at >> com.twitter.chill.Tuple6Serializer.write(TupleSerializers.scala:96) >> at >> com.twitter.chill.Tuple6Serializer.write(TupleSerializers.scala:93) >> at >> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) >> at >> com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) >> at >> com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) >> at >> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) >> at >> >> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158) >> at >> >> org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153) >> at >> >> org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190) >> at >> >> org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199) >> at >> org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132) >> at >> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793) >> at >> org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669) >> at >> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175) >> at >> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262) >> at >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) >> at >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) >> at >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) >> at >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) >> at >> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) >> at >> 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) >> at org.apache.spark.scheduler.Task.run(Task.scala:88) >> at >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >> at >> >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >> at >> >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >> at java.lang.Thread.run(Thread.java:745) >> >> Caused by: java.lang.ClassCastException: com.test.Sample cannot be cast to >> java.lang.String >> at
Re: Kryo ClassCastException during Serialization/deserialization in Spark Streaming
Can you illustrate how sampleMap is populated ? Thanks On Thu, Jun 23, 2016 at 12:34 PM, SRK <swethakasire...@gmail.com> wrote: > Hi, > > I keep getting the following error in my Spark Streaming every now and then > after the job runs for say around 10 hours. I have those 2 classes > registered in kryo as shown below. sampleMap is a field in SampleSession > as shown below. Any suggestion as to how to avoid this would be of great > help!! > > public class SampleSession implements Serializable, Cloneable{ >private Map<String, Sample> sampleMap; > } > > sparkConf.registerKryoClasses(Array( classOf[SampleSession], > classOf[Sample])) > > > > com.esotericsoftware.kryo.KryoException: java.lang.ClassCastException: > com.test.Sample cannot be cast to java.lang.String > Serialization trace: > sampleMap (com.test.SampleSession) > at > > com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585) > at > > com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) > at > com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) > at > com.twitter.chill.Tuple6Serializer.write(TupleSerializers.scala:96) > at > com.twitter.chill.Tuple6Serializer.write(TupleSerializers.scala:93) > at > com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) > at > com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) > at > com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) > at > com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) > at > > org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158) > at > > org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153) > at > > org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190) > at > > org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199) > at > org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132) > at > org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793) > at > org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669) > at > org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175) > at > org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:262) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:88) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at 
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > Caused by: java.lang.ClassCastException: com.test.Sample cannot be cast to > java.lang.String > at > > com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146) > at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549) > at > > com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:82) > at > > com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17) > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) > at > > com.esotericsoftware.kryo.serializers.
Kryo ClassCastException during Serialization/deserialization in Spark Streaming
Hi, I keep getting the following error in my Spark Streaming every now and then after the job runs for say around 10 hours. I have those 2 classes registered in kryo as shown below. sampleMap is a field in SampleSession as shown below. Any suggestion as to how to avoid this would be of great help!! public class SampleSession implements Serializable, Cloneable{ private Map<String, Sample> sampleMap; } sparkConf.registerKryoClasses(Array( classOf[SampleSession], classOf[Sample])) com.esotericsoftware.kryo.KryoException: java.lang.ClassCastException: com.test.Sample cannot be cast to java.lang.String Serialization trace: sampleMap (com.test.SampleSession) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple6Serializer.write(TupleSerializers.scala:96) at com.twitter.chill.Tuple6Serializer.write(TupleSerializers.scala:93) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190) at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199) at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793) at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) at org.apache.spark.rdd.RDD.iterator(RDD.scala:262) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassCastException: com.test.Sample cannot be cast to java.lang.String at 
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146) at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549) at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:82) at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) ... 37 more
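When an error like this only shows up hours into a streaming job, one debugging aid is to make Kryo fail fast and loudly on any class that was never registered, instead of silently falling back to reflective defaults at runtime. A sketch using the classes from the post above (SampleSession and Sample are the poster's own com.test classes; the extra HashMap registration is illustrative, not a known fix for the String cast):

```scala
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Throw immediately if anything unregistered is serialized, rather than
  // discovering gaps many hours into the run
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array[Class[_]](
    classOf[com.test.SampleSession],
    classOf[com.test.Sample],
    classOf[java.util.HashMap[_, _]] // concrete Map implementation held in sampleMap
  ))
```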
Re: Dataset kryo encoder fails on Collections$UnmodifiableCollection
See SPARK-15489 <https://issues.apache.org/jira/browse/SPARK-15489> I'll try to figure this one out as well, any leads ? "immediate suspects" ? Thanks, Amit On Mon, May 23, 2016 at 10:27 PM Michael Armbrust <mich...@databricks.com> wrote: > Can you open a JIRA? > > On Sun, May 22, 2016 at 2:50 PM, Amit Sela <amitsel...@gmail.com> wrote: > >> I've been using Encoders with Kryo to support encoding of generically >> typed Java classes, mostly with success, in the following manner: >> >> public static Encoder encoder() { >> return Encoders.kryo((Class) Object.class); >> } >> >> But at some point I got a decoding exception "Caused by: >> java.lang.UnsupportedOperationException >> at java.util.Collections$UnmodifiableCollection.add..." >> >> This seems to be because of Guava's `ImmutableList`. >> >> I tried registering `UnmodifiableCollectionsSerializer` and ` >> ImmutableListSerializer` from: https://github.com/magro/kryo-serializers >> but it didn't help. >> >> Ideas ? >> >> Thanks, >> Amit >> > >
Re: Dataset kryo encoder fails on Collections$UnmodifiableCollection
Can you open a JIRA? On Sun, May 22, 2016 at 2:50 PM, Amit Sela <amitsel...@gmail.com> wrote: > I've been using Encoders with Kryo to support encoding of generically > typed Java classes, mostly with success, in the following manner: > > public static Encoder encoder() { > return Encoders.kryo((Class) Object.class); > } > > But at some point I got a decoding exception "Caused by: > java.lang.UnsupportedOperationException > at java.util.Collections$UnmodifiableCollection.add..." > > This seems to be because of Guava's `ImmutableList`. > > I tried registering `UnmodifiableCollectionsSerializer` and ` > ImmutableListSerializer` from: https://github.com/magro/kryo-serializers > but it didn't help. > > Ideas ? > > Thanks, > Amit >
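For anyone who lands here with the same UnsupportedOperationException: the registrations mentioned above would normally be wired in through a KryoRegistrator along the lines of the sketch below, assuming the kryo-serializers artifact from the linked repo is on the classpath. Note, per SPARK-15489, the issue may be that the Kryo instance created for Encoders.kryo does not pick up the registrator at all, rather than the registration itself being wrong:

```scala
import com.esotericsoftware.kryo.Kryo
import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer
import de.javakaffee.kryoserializers.guava.ImmutableListSerializer
import org.apache.spark.serializer.KryoRegistrator

class CollectionsRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // java.util.Collections$Unmodifiable* wrappers
    UnmodifiableCollectionsSerializer.registerSerializers(kryo)
    // Guava's ImmutableList and its package-private subclasses
    ImmutableListSerializer.registerSerializers(kryo)
  }
}
// Enabled with spark.kryo.registrator set to the fully qualified name of CollectionsRegistrator.
```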
Re: kryo
This should be related: https://github.com/JodaOrg/joda-time/issues/307 Do you have more of the stack trace ? Cheers On Thu, May 12, 2016 at 12:39 PM, Younes Naguib < younes.nag...@tritondigital.com> wrote: > Thanks, > > I used that. > > Now I seem to have the following problem: > > java.lang.NullPointerException > > at > org.joda.time.tz.CachedDateTimeZone.getInfo(CachedDateTimeZone.java:143) > > at > org.joda.time.tz.CachedDateTimeZone.getOffset(CachedDateTimeZone.java:103) > > at > org.joda.time.DateTimeZone.convertUTCToLocal(DateTimeZone.java:925) > > > > > > Any ideas? > > > > Thanks > > > > > > *From:* Ted Yu [mailto:yuzhih...@gmail.com] > *Sent:* May-11-16 5:32 PM > *To:* Younes Naguib > *Cc:* user@spark.apache.org > *Subject:* Re: kryo > > > > Have you seen this thread ? > > > > > http://search-hadoop.com/m/q3RTtpO0qI3cp06/JodaDateTimeSerializer+spark=Re+NPE+when+using+Joda+DateTime > > > > On Wed, May 11, 2016 at 2:18 PM, Younes Naguib < > younes.nag...@tritondigital.com> wrote: > > Hi all, > > I'm trying to get to use spark.serializer. > I set it in the spark-default.conf, but I statred getting issues with > datetimes. > > As I understand, I need to disable it. > Anyways to keep using kryo? > > It's seems I can use JodaDateTimeSerializer for datetimes, just not sure > how to set it, and register it in the spark-default conf. > > Thanks, > > *Younes Naguib* <younes.nag...@streamtheworld.com> > > >
RE: kryo
Thanks, I used that. Now I seem to have the following problem: java.lang.NullPointerException at org.joda.time.tz.CachedDateTimeZone.getInfo(CachedDateTimeZone.java:143) at org.joda.time.tz.CachedDateTimeZone.getOffset(CachedDateTimeZone.java:103) at org.joda.time.DateTimeZone.convertUTCToLocal(DateTimeZone.java:925) Any ideas? Thanks From: Ted Yu [mailto:yuzhih...@gmail.com] Sent: May-11-16 5:32 PM To: Younes Naguib Cc: user@spark.apache.org Subject: Re: kryo Have you seen this thread ? http://search-hadoop.com/m/q3RTtpO0qI3cp06/JodaDateTimeSerializer+spark=Re+NPE+when+using+Joda+DateTime On Wed, May 11, 2016 at 2:18 PM, Younes Naguib <younes.nag...@tritondigital.com<mailto:younes.nag...@tritondigital.com>> wrote: Hi all, I'm trying to get to use spark.serializer. I set it in the spark-default.conf, but I statred getting issues with datetimes. As I understand, I need to disable it. Anyways to keep using kryo? It's seems I can use JodaDateTimeSerializer for datetimes, just not sure how to set it, and register it in the spark-default conf. Thanks, Younes Naguib <mailto:younes.nag...@streamtheworld.com>
Re: kryo
Have you seen this thread ? http://search-hadoop.com/m/q3RTtpO0qI3cp06/JodaDateTimeSerializer+spark=Re+NPE+when+using+Joda+DateTime On Wed, May 11, 2016 at 2:18 PM, Younes Naguib < younes.nag...@tritondigital.com> wrote: > Hi all, > > I'm trying to get to use spark.serializer. > I set it in the spark-default.conf, but I statred getting issues with > datetimes. > > As I understand, I need to disable it. > Anyways to keep using kryo? > > It's seems I can use JodaDateTimeSerializer for datetimes, just not sure > how to set it, and register it in the spark-default conf. > > Thanks, > > *Younes Naguib* <younes.nag...@streamtheworld.com> >
kryo
Hi all, I'm trying to start using spark.serializer. I set it in spark-defaults.conf, but I started getting issues with datetimes. As I understand it, I need to disable it. Is there any way to keep using Kryo? It seems I can use JodaDateTimeSerializer for datetimes; I'm just not sure how to set it up and register it in the spark-defaults conf. Thanks, Younes Naguib <mailto:younes.nag...@streamtheworld.com>
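In case it helps someone searching the archives later: the NullPointerException above is consistent with Kryo building DateTimeZone instances reflectively instead of through Joda's factory methods, which is exactly what JodaDateTimeSerializer avoids. A sketch of wiring it up (assumes the kryo-serializers artifact that provides de.javakaffee.kryoserializers.jodatime is on the classpath; the registrator class name is made up):

```scala
import com.esotericsoftware.kryo.Kryo
import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer
import org.apache.spark.serializer.KryoRegistrator
import org.joda.time.DateTime

class JodaRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Serialize DateTime via its millis plus chronology/zone id rather than its internal fields
    kryo.register(classOf[DateTime], new JodaDateTimeSerializer())
  }
}

// And in spark-defaults.conf (space-separated key/value pairs):
//   spark.serializer       org.apache.spark.serializer.KryoSerializer
//   spark.kryo.registrator com.example.JodaRegistrator   (use the real fully qualified name)
```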
Re: How to verify if spark is using kryo serializer for shuffle
Yes my mistake. I am using Spark 1.5.2 not 2.x. I looked at running spark driver jvm process on linux. Looks like my settings are not being applied to driver. We use oozie spark action to launch spark. I will have to investigate more on that. hopefully spark is or have replaced memory killer Java serializer to better streaming serializer. Thanks On Sun, May 8, 2016 at 9:33 AM, Ted Yu <yuzhih...@gmail.com> wrote: > See the following: > [SPARK-7997][CORE] Remove Akka from Spark Core and Streaming > > I guess you meant you are using Spark 1.5.1 > > For the time being, consider increasing spark.driver.memory > > Cheers > > On Sun, May 8, 2016 at 9:14 AM, Nirav Patel <npa...@xactlycorp.com> wrote: > >> Yes, I am using yarn client mode hence I specified am settings too. >> What you mean akka is moved out of picture? I am using spark 2.5.1 >> >> Sent from my iPhone >> >> On May 8, 2016, at 6:39 AM, Ted Yu <yuzhih...@gmail.com> wrote: >> >> Are you using YARN client mode ? >> >> See >> https://spark.apache.org/docs/latest/running-on-yarn.html >> >> In cluster mode, spark.yarn.am.memory is not effective. >> >> For Spark 2.0, akka is moved out of the picture. >> FYI >> >> On Sat, May 7, 2016 at 8:24 PM, Nirav Patel <npa...@xactlycorp.com> >> wrote: >> >>> I have 20 executors, 6 cores each. Total 5 stages. It fails on 5th one. >>> All of them have 6474 tasks. 5th task is a count operations and it also >>> performs aggregateByKey as a part of it lazy evaluation. >>> I am setting: >>> spark.driver.memory=10G, spark.yarn.am.memory=2G and >>> spark.driver.maxResultSize=9G >>> >>> >>> On a side note, could it be something to do with java serialization >>> library, ByteArrayOutputStream using byte array? Can it be replaced by >>> some better serializing library? >>> >>> https://bugs.openjdk.java.net/browse/JDK-8055949 >>> https://bugs.openjdk.java.net/browse/JDK-8136527 >>> >>> Thanks >>> >>> On Sat, May 7, 2016 at 4:51 PM, Ashish Dubey <ashish@gmail.com> >>> wrote: >>> >>>> Driver maintains the complete metadata of application ( scheduling of >>>> executor and maintaining the messaging to control the execution ) >>>> This code seems to be failing in that code path only. With that said >>>> there is Jvm overhead based on num of executors , stages and tasks in your >>>> app. Do you know your driver heap size and application structure ( num of >>>> stages and tasks ) >>>> >>>> Ashish >>>> >>>> On Saturday, May 7, 2016, Nirav Patel <npa...@xactlycorp.com> wrote: >>>> >>>>> Right but this logs from spark driver and spark driver seems to use >>>>> Akka. >>>>> >>>>> ERROR [sparkDriver-akka.actor.default-dispatcher-17] >>>>> akka.actor.ActorSystemImpl: Uncaught fatal error from thread >>>>> [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down >>>>> ActorSystem [sparkDriver] >>>>> >>>>> I saw following logs before above happened. >>>>> >>>>> 2016-05-06 09:49:17,813 INFO >>>>> [sparkDriver-akka.actor.default-dispatcher-17] >>>>> org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output >>>>> locations for shuffle 1 to hdn6.xactlycorporation.local:44503 >>>>> >>>>> >>>>> As far as I know driver is just driving shuffle operation but not >>>>> actually doing anything within its own system that will cause memory >>>>> issue. >>>>> Can you explain in what circumstances I could see this error in driver >>>>> logs? I don't do any collect or any other driver operation that would >>>>> cause >>>>> this. 
It fails when doing aggregateByKey operation but that should happen >>>>> in executor JVM NOT in driver JVM. >>>>> >>>>> >>>>> Thanks >>>>> >>>>> On Sat, May 7, 2016 at 11:58 AM, Ted Yu <yuzhih...@gmail.com> wrote: >>>>> >>>>>> bq. at akka.serialization.JavaSerializer.toBinary( >>>>>> Serializer.scala:129) >>>>>> >>>>>> It was Akka which uses JavaSerializer >>>>>> >>>>>> Cheers >>>>>> >>>>>> On Sat, May 7
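On the "my settings are not being applied to driver" point: besides the Environment tab, the resolved configuration can be checked directly from the driver at runtime, which helps when a launcher such as the Oozie Spark action may be rewriting or dropping properties. A small sketch to run in spark-shell or at the top of the job, where sc is the active SparkContext:

```scala
// Print what the driver actually resolved for the serializer-related settings
val resolved = sc.getConf
Seq("spark.serializer",
    "spark.kryo.registrator",
    "spark.driver.memory",
    "spark.driver.maxResultSize").foreach { key =>
  println(s"$key = ${resolved.get(key, "<not set>")}")
}
```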
Re: How to verify if spark is using kryo serializer for shuffle
See the following: [SPARK-7997][CORE] Remove Akka from Spark Core and Streaming I guess you meant you are using Spark 1.5.1 For the time being, consider increasing spark.driver.memory Cheers On Sun, May 8, 2016 at 9:14 AM, Nirav Patel <npa...@xactlycorp.com> wrote: > Yes, I am using yarn client mode hence I specified am settings too. > What you mean akka is moved out of picture? I am using spark 2.5.1 > > Sent from my iPhone > > On May 8, 2016, at 6:39 AM, Ted Yu <yuzhih...@gmail.com> wrote: > > Are you using YARN client mode ? > > See > https://spark.apache.org/docs/latest/running-on-yarn.html > > In cluster mode, spark.yarn.am.memory is not effective. > > For Spark 2.0, akka is moved out of the picture. > FYI > > On Sat, May 7, 2016 at 8:24 PM, Nirav Patel <npa...@xactlycorp.com> wrote: > >> I have 20 executors, 6 cores each. Total 5 stages. It fails on 5th one. >> All of them have 6474 tasks. 5th task is a count operations and it also >> performs aggregateByKey as a part of it lazy evaluation. >> I am setting: >> spark.driver.memory=10G, spark.yarn.am.memory=2G and >> spark.driver.maxResultSize=9G >> >> >> On a side note, could it be something to do with java serialization >> library, ByteArrayOutputStream using byte array? Can it be replaced by >> some better serializing library? >> >> https://bugs.openjdk.java.net/browse/JDK-8055949 >> https://bugs.openjdk.java.net/browse/JDK-8136527 >> >> Thanks >> >> On Sat, May 7, 2016 at 4:51 PM, Ashish Dubey <ashish@gmail.com> >> wrote: >> >>> Driver maintains the complete metadata of application ( scheduling of >>> executor and maintaining the messaging to control the execution ) >>> This code seems to be failing in that code path only. With that said >>> there is Jvm overhead based on num of executors , stages and tasks in your >>> app. Do you know your driver heap size and application structure ( num of >>> stages and tasks ) >>> >>> Ashish >>> >>> On Saturday, May 7, 2016, Nirav Patel <npa...@xactlycorp.com> wrote: >>> >>>> Right but this logs from spark driver and spark driver seems to use >>>> Akka. >>>> >>>> ERROR [sparkDriver-akka.actor.default-dispatcher-17] >>>> akka.actor.ActorSystemImpl: Uncaught fatal error from thread >>>> [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down >>>> ActorSystem [sparkDriver] >>>> >>>> I saw following logs before above happened. >>>> >>>> 2016-05-06 09:49:17,813 INFO >>>> [sparkDriver-akka.actor.default-dispatcher-17] >>>> org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output >>>> locations for shuffle 1 to hdn6.xactlycorporation.local:44503 >>>> >>>> >>>> As far as I know driver is just driving shuffle operation but not >>>> actually doing anything within its own system that will cause memory issue. >>>> Can you explain in what circumstances I could see this error in driver >>>> logs? I don't do any collect or any other driver operation that would cause >>>> this. It fails when doing aggregateByKey operation but that should happen >>>> in executor JVM NOT in driver JVM. >>>> >>>> >>>> Thanks >>>> >>>> On Sat, May 7, 2016 at 11:58 AM, Ted Yu <yuzhih...@gmail.com> wrote: >>>> >>>>> bq. at akka.serialization.JavaSerializer.toBinary( >>>>> Serializer.scala:129) >>>>> >>>>> It was Akka which uses JavaSerializer >>>>> >>>>> Cheers >>>>> >>>>> On Sat, May 7, 2016 at 11:13 AM, Nirav Patel <npa...@xactlycorp.com> >>>>> wrote: >>>>> >>>>>> Hi, >>>>>> >>>>>> I thought I was using kryo serializer for shuffle. 
I could verify it >>>>>> from spark UI - Environment tab that >>>>>> spark.serializer org.apache.spark.serializer.KryoSerializer >>>>>> spark.kryo.registrator >>>>>> com.myapp.spark.jobs.conf.SparkSerializerRegistrator >>>>>> >>>>>> >>>>>> But when I see following error in Driver logs it looks like spark is >>>>>> using JavaSerializer >>>>>> >>>>>> 2016-05-06 09:49:26,490 ERROR >>>>>> [sparkDriver-akka.actor.default-dispatcher-17] >
Re: How to verify if spark is using kryo serializer for shuffle
Yes, I am using yarn client mode hence I specified am settings too. What you mean akka is moved out of picture? I am using spark 2.5.1 Sent from my iPhone > On May 8, 2016, at 6:39 AM, Ted Yu <yuzhih...@gmail.com> wrote: > > Are you using YARN client mode ? > > See > https://spark.apache.org/docs/latest/running-on-yarn.html > > In cluster mode, spark.yarn.am.memory is not effective. > > For Spark 2.0, akka is moved out of the picture. > FYI > >> On Sat, May 7, 2016 at 8:24 PM, Nirav Patel <npa...@xactlycorp.com> wrote: >> I have 20 executors, 6 cores each. Total 5 stages. It fails on 5th one. All >> of them have 6474 tasks. 5th task is a count operations and it also performs >> aggregateByKey as a part of it lazy evaluation. >> I am setting: >> spark.driver.memory=10G, spark.yarn.am.memory=2G and >> spark.driver.maxResultSize=9G >> >> >> On a side note, could it be something to do with java serialization library, >> ByteArrayOutputStream using byte array? Can it be replaced by some better >> serializing library? >> >> https://bugs.openjdk.java.net/browse/JDK-8055949 >> https://bugs.openjdk.java.net/browse/JDK-8136527 >> >> Thanks >> >>> On Sat, May 7, 2016 at 4:51 PM, Ashish Dubey <ashish@gmail.com> wrote: >>> Driver maintains the complete metadata of application ( scheduling of >>> executor and maintaining the messaging to control the execution ) >>> This code seems to be failing in that code path only. With that said there >>> is Jvm overhead based on num of executors , stages and tasks in your app. >>> Do you know your driver heap size and application structure ( num of stages >>> and tasks ) >>> >>> Ashish >>> >>>> On Saturday, May 7, 2016, Nirav Patel <npa...@xactlycorp.com> wrote: >>>> Right but this logs from spark driver and spark driver seems to use Akka. >>>> >>>> ERROR [sparkDriver-akka.actor.default-dispatcher-17] >>>> akka.actor.ActorSystemImpl: Uncaught fatal error from thread >>>> [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down >>>> ActorSystem [sparkDriver] >>>> >>>> I saw following logs before above happened. >>>> >>>> 2016-05-06 09:49:17,813 INFO >>>> [sparkDriver-akka.actor.default-dispatcher-17] >>>> org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output >>>> locations for shuffle 1 to hdn6.xactlycorporation.local:44503 >>>> >>>> >>>> >>>> As far as I know driver is just driving shuffle operation but not actually >>>> doing anything within its own system that will cause memory issue. Can you >>>> explain in what circumstances I could see this error in driver logs? I >>>> don't do any collect or any other driver operation that would cause this. >>>> It fails when doing aggregateByKey operation but that should happen in >>>> executor JVM NOT in driver JVM. >>>> >>>> >>>> >>>> Thanks >>>> >>>> >>>>> On Sat, May 7, 2016 at 11:58 AM, Ted Yu <yuzhih...@gmail.com> wrote: >>>>> bq. at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) >>>>> >>>>> It was Akka which uses JavaSerializer >>>>> >>>>> Cheers >>>>> >>>>>> On Sat, May 7, 2016 at 11:13 AM, Nirav Patel <npa...@xactlycorp.com> >>>>>> wrote: >>>>>> Hi, >>>>>> >>>>>> I thought I was using kryo serializer for shuffle. 
I could verify it >>>>>> from spark UI - Environment tab that >>>>>> spark.serializer org.apache.spark.serializer.KryoSerializer >>>>>> spark.kryo.registrator >>>>>> com.myapp.spark.jobs.conf.SparkSerializerRegistrator >>>>>> >>>>>> >>>>>> But when I see following error in Driver logs it looks like spark is >>>>>> using JavaSerializer >>>>>> >>>>>> 2016-05-06 09:49:26,490 ERROR >>>>>> [sparkDriver-akka.actor.default-dispatcher-17] >>>>>> akka.actor.ActorSystemImpl: Uncaught fatal error from thread >>>>>> [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down >>>>>> ActorSystem [sparkDriver] >>>>>> >>>>>
Re: How to verify if spark is using kryo serializer for shuffle
Are you using YARN client mode ? See https://spark.apache.org/docs/latest/running-on-yarn.html In cluster mode, spark.yarn.am.memory is not effective. For Spark 2.0, akka is moved out of the picture. FYI On Sat, May 7, 2016 at 8:24 PM, Nirav Patel <npa...@xactlycorp.com> wrote: > I have 20 executors, 6 cores each. Total 5 stages. It fails on 5th one. > All of them have 6474 tasks. 5th task is a count operations and it also > performs aggregateByKey as a part of it lazy evaluation. > I am setting: > spark.driver.memory=10G, spark.yarn.am.memory=2G and > spark.driver.maxResultSize=9G > > > On a side note, could it be something to do with java serialization > library, ByteArrayOutputStream using byte array? Can it be replaced by > some better serializing library? > > https://bugs.openjdk.java.net/browse/JDK-8055949 > https://bugs.openjdk.java.net/browse/JDK-8136527 > > Thanks > > On Sat, May 7, 2016 at 4:51 PM, Ashish Dubey <ashish@gmail.com> wrote: > >> Driver maintains the complete metadata of application ( scheduling of >> executor and maintaining the messaging to control the execution ) >> This code seems to be failing in that code path only. With that said >> there is Jvm overhead based on num of executors , stages and tasks in your >> app. Do you know your driver heap size and application structure ( num of >> stages and tasks ) >> >> Ashish >> >> On Saturday, May 7, 2016, Nirav Patel <npa...@xactlycorp.com> wrote: >> >>> Right but this logs from spark driver and spark driver seems to use Akka. >>> >>> ERROR [sparkDriver-akka.actor.default-dispatcher-17] >>> akka.actor.ActorSystemImpl: Uncaught fatal error from thread >>> [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down >>> ActorSystem [sparkDriver] >>> >>> I saw following logs before above happened. >>> >>> 2016-05-06 09:49:17,813 INFO >>> [sparkDriver-akka.actor.default-dispatcher-17] >>> org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output >>> locations for shuffle 1 to hdn6.xactlycorporation.local:44503 >>> >>> >>> As far as I know driver is just driving shuffle operation but not >>> actually doing anything within its own system that will cause memory issue. >>> Can you explain in what circumstances I could see this error in driver >>> logs? I don't do any collect or any other driver operation that would cause >>> this. It fails when doing aggregateByKey operation but that should happen >>> in executor JVM NOT in driver JVM. >>> >>> >>> Thanks >>> >>> On Sat, May 7, 2016 at 11:58 AM, Ted Yu <yuzhih...@gmail.com> wrote: >>> >>>> bq. at akka.serialization.JavaSerializer.toBinary( >>>> Serializer.scala:129) >>>> >>>> It was Akka which uses JavaSerializer >>>> >>>> Cheers >>>> >>>> On Sat, May 7, 2016 at 11:13 AM, Nirav Patel <npa...@xactlycorp.com> >>>> wrote: >>>> >>>>> Hi, >>>>> >>>>> I thought I was using kryo serializer for shuffle. 
I could verify it >>>>> from spark UI - Environment tab that >>>>> spark.serializer org.apache.spark.serializer.KryoSerializer >>>>> spark.kryo.registrator >>>>> com.myapp.spark.jobs.conf.SparkSerializerRegistrator >>>>> >>>>> >>>>> But when I see following error in Driver logs it looks like spark is >>>>> using JavaSerializer >>>>> >>>>> 2016-05-06 09:49:26,490 ERROR >>>>> [sparkDriver-akka.actor.default-dispatcher-17] akka.actor.ActorSystemImpl: >>>>> Uncaught fatal error from thread >>>>> [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down >>>>> ActorSystem [sparkDriver] >>>>> >>>>> java.lang.OutOfMemoryError: Java heap space >>>>> >>>>> at java.util.Arrays.copyOf(Arrays.java:2271) >>>>> >>>>> at >>>>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) >>>>> >>>>> at >>>>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) >>>>> >>>>> at >>>>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) >>>>> >>>>> at >>>>> java.io.ObjectOut
Re: How to verify if spark is using kryo serializer for shuffle
I have 20 executors, 6 cores each. Total 5 stages. It fails on 5th one. All of them have 6474 tasks. 5th task is a count operations and it also performs aggregateByKey as a part of it lazy evaluation. I am setting: spark.driver.memory=10G, spark.yarn.am.memory=2G and spark.driver.maxResultSize=9G On a side note, could it be something to do with java serialization library, ByteArrayOutputStream using byte array? Can it be replaced by some better serializing library? https://bugs.openjdk.java.net/browse/JDK-8055949 https://bugs.openjdk.java.net/browse/JDK-8136527 Thanks On Sat, May 7, 2016 at 4:51 PM, Ashish Dubey <ashish@gmail.com> wrote: > Driver maintains the complete metadata of application ( scheduling of > executor and maintaining the messaging to control the execution ) > This code seems to be failing in that code path only. With that said there > is Jvm overhead based on num of executors , stages and tasks in your app. > Do you know your driver heap size and application structure ( num of stages > and tasks ) > > Ashish > > On Saturday, May 7, 2016, Nirav Patel <npa...@xactlycorp.com> wrote: > >> Right but this logs from spark driver and spark driver seems to use Akka. >> >> ERROR [sparkDriver-akka.actor.default-dispatcher-17] >> akka.actor.ActorSystemImpl: Uncaught fatal error from thread >> [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down >> ActorSystem [sparkDriver] >> >> I saw following logs before above happened. >> >> 2016-05-06 09:49:17,813 INFO >> [sparkDriver-akka.actor.default-dispatcher-17] >> org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output >> locations for shuffle 1 to hdn6.xactlycorporation.local:44503 >> >> >> As far as I know driver is just driving shuffle operation but not >> actually doing anything within its own system that will cause memory issue. >> Can you explain in what circumstances I could see this error in driver >> logs? I don't do any collect or any other driver operation that would cause >> this. It fails when doing aggregateByKey operation but that should happen >> in executor JVM NOT in driver JVM. >> >> >> Thanks >> >> On Sat, May 7, 2016 at 11:58 AM, Ted Yu <yuzhih...@gmail.com> wrote: >> >>> bq. at akka.serialization.JavaSerializer.toBinary( >>> Serializer.scala:129) >>> >>> It was Akka which uses JavaSerializer >>> >>> Cheers >>> >>> On Sat, May 7, 2016 at 11:13 AM, Nirav Patel <npa...@xactlycorp.com> >>> wrote: >>> >>>> Hi, >>>> >>>> I thought I was using kryo serializer for shuffle. 
I could verify it >>>> from spark UI - Environment tab that >>>> spark.serializer org.apache.spark.serializer.KryoSerializer >>>> spark.kryo.registrator >>>> com.myapp.spark.jobs.conf.SparkSerializerRegistrator >>>> >>>> >>>> But when I see following error in Driver logs it looks like spark is >>>> using JavaSerializer >>>> >>>> 2016-05-06 09:49:26,490 ERROR >>>> [sparkDriver-akka.actor.default-dispatcher-17] akka.actor.ActorSystemImpl: >>>> Uncaught fatal error from thread >>>> [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down >>>> ActorSystem [sparkDriver] >>>> >>>> java.lang.OutOfMemoryError: Java heap space >>>> >>>> at java.util.Arrays.copyOf(Arrays.java:2271) >>>> >>>> at >>>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) >>>> >>>> at >>>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) >>>> >>>> at >>>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) >>>> >>>> at >>>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) >>>> >>>> at >>>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) >>>> >>>> at >>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) >>>> >>>> at >>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) >>>> >>>> at >>>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) >>>> >>>> at >>>> akka.serialization.JavaSeriali
Re: How to verify if spark is using kryo serializer for shuffle
Driver maintains the complete metadata of application ( scheduling of executor and maintaining the messaging to control the execution ) This code seems to be failing in that code path only. With that said there is Jvm overhead based on num of executors , stages and tasks in your app. Do you know your driver heap size and application structure ( num of stages and tasks ) Ashish On Saturday, May 7, 2016, Nirav Patel <npa...@xactlycorp.com> wrote: > Right but this logs from spark driver and spark driver seems to use Akka. > > ERROR [sparkDriver-akka.actor.default-dispatcher-17] > akka.actor.ActorSystemImpl: Uncaught fatal error from thread > [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down > ActorSystem [sparkDriver] > > I saw following logs before above happened. > > 2016-05-06 09:49:17,813 INFO > [sparkDriver-akka.actor.default-dispatcher-17] > org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output > locations for shuffle 1 to hdn6.xactlycorporation.local:44503 > > > As far as I know driver is just driving shuffle operation but not actually > doing anything within its own system that will cause memory issue. Can you > explain in what circumstances I could see this error in driver logs? I > don't do any collect or any other driver operation that would cause this. > It fails when doing aggregateByKey operation but that should happen in > executor JVM NOT in driver JVM. > > > Thanks > > On Sat, May 7, 2016 at 11:58 AM, Ted Yu <yuzhih...@gmail.com > <javascript:_e(%7B%7D,'cvml','yuzhih...@gmail.com');>> wrote: > >> bq. at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) >> >> It was Akka which uses JavaSerializer >> >> Cheers >> >> On Sat, May 7, 2016 at 11:13 AM, Nirav Patel <npa...@xactlycorp.com >> <javascript:_e(%7B%7D,'cvml','npa...@xactlycorp.com');>> wrote: >> >>> Hi, >>> >>> I thought I was using kryo serializer for shuffle. 
I could verify it >>> from spark UI - Environment tab that >>> spark.serializer org.apache.spark.serializer.KryoSerializer >>> spark.kryo.registrator >>> com.myapp.spark.jobs.conf.SparkSerializerRegistrator >>> >>> >>> But when I see following error in Driver logs it looks like spark is >>> using JavaSerializer >>> >>> 2016-05-06 09:49:26,490 ERROR >>> [sparkDriver-akka.actor.default-dispatcher-17] akka.actor.ActorSystemImpl: >>> Uncaught fatal error from thread >>> [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down >>> ActorSystem [sparkDriver] >>> >>> java.lang.OutOfMemoryError: Java heap space >>> >>> at java.util.Arrays.copyOf(Arrays.java:2271) >>> >>> at >>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) >>> >>> at >>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) >>> >>> at >>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) >>> >>> at >>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) >>> >>> at >>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) >>> >>> at >>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) >>> >>> at >>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) >>> >>> at >>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) >>> >>> at >>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) >>> >>> at >>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) >>> >>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) >>> >>> at >>> akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) >>> >>> at >>> akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) >>> >>> at >>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843) >>> >>> at >>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843) >>> >>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) >>> >>> at
Re: How to verify if spark is using kryo serializer for shuffle
Right but this logs from spark driver and spark driver seems to use Akka. ERROR [sparkDriver-akka.actor.default-dispatcher-17] akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver] I saw following logs before above happened. 2016-05-06 09:49:17,813 INFO [sparkDriver-akka.actor.default-dispatcher-17] org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to hdn6.xactlycorporation.local:44503 As far as I know driver is just driving shuffle operation but not actually doing anything within its own system that will cause memory issue. Can you explain in what circumstances I could see this error in driver logs? I don't do any collect or any other driver operation that would cause this. It fails when doing aggregateByKey operation but that should happen in executor JVM NOT in driver JVM. Thanks On Sat, May 7, 2016 at 11:58 AM, Ted Yu <yuzhih...@gmail.com> wrote: > bq. at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) > > It was Akka which uses JavaSerializer > > Cheers > > On Sat, May 7, 2016 at 11:13 AM, Nirav Patel <npa...@xactlycorp.com> > wrote: > >> Hi, >> >> I thought I was using kryo serializer for shuffle. I could verify it >> from spark UI - Environment tab that >> spark.serializer org.apache.spark.serializer.KryoSerializer >> spark.kryo.registrator >> com.myapp.spark.jobs.conf.SparkSerializerRegistrator >> >> >> But when I see following error in Driver logs it looks like spark is >> using JavaSerializer >> >> 2016-05-06 09:49:26,490 ERROR >> [sparkDriver-akka.actor.default-dispatcher-17] akka.actor.ActorSystemImpl: >> Uncaught fatal error from thread >> [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down >> ActorSystem [sparkDriver] >> >> java.lang.OutOfMemoryError: Java heap space >> >> at java.util.Arrays.copyOf(Arrays.java:2271) >> >> at >> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) >> >> at >> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) >> >> at >> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) >> >> at >> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) >> >> at >> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) >> >> at >> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) >> >> at >> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) >> >> at >> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) >> >> at >> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) >> >> at >> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) >> >> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) >> >> at >> akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) >> >> at >> akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) >> >> at >> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843) >> >> at >> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843) >> >> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) >> >> at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842) >> >> at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743) >> >> at >> akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718) 
>> >> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >> >> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411) >> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >> >> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >> >> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >> >> at >> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >> >> at >> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >> >> at >>
Re: How to verify if spark is using kryo serializer for shuffle
bq. at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) It was Akka which uses JavaSerializer Cheers On Sat, May 7, 2016 at 11:13 AM, Nirav Patel <npa...@xactlycorp.com> wrote: > Hi, > > I thought I was using kryo serializer for shuffle. I could verify it from > spark UI - Environment tab that > spark.serializer org.apache.spark.serializer.KryoSerializer > spark.kryo.registrator > com.myapp.spark.jobs.conf.SparkSerializerRegistrator > > > But when I see following error in Driver logs it looks like spark is using > JavaSerializer > > 2016-05-06 09:49:26,490 ERROR > [sparkDriver-akka.actor.default-dispatcher-17] akka.actor.ActorSystemImpl: > Uncaught fatal error from thread > [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down > ActorSystem [sparkDriver] > > java.lang.OutOfMemoryError: Java heap space > > at java.util.Arrays.copyOf(Arrays.java:2271) > > at > java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) > > at > java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) > > at > java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) > > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) > > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) > > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) > > at > java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > > at > akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) > > at > akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) > > at > akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) > > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > > at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) > > at > akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) > > at > akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843) > > at > akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843) > > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > > at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842) > > at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743) > > at > akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718) > > at akka.actor.Actor$class.aroundReceive(Actor.scala:467) > > at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411) > > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) > > at akka.dispatch.Mailbox.run(Mailbox.scala:220) > > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > > > What I am missing here? 
> > Thanks
How to verify if spark is using kryo serializer for shuffle
Hi, I thought I was using kryo serializer for shuffle. I could verify it from spark UI - Environment tab that spark.serializer org.apache.spark.serializer.KryoSerializer spark.kryo.registrator com.myapp.spark.jobs.conf.SparkSerializerRegistrator But when I see following error in Driver logs it looks like spark is using JavaSerializer 2016-05-06 09:49:26,490 ERROR [sparkDriver-akka.actor.default-dispatcher-17] akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:2271) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843) at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842) at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743) at akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718) at akka.actor.Actor$class.aroundReceive(Actor.scala:467) at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) What I am missing here? Thanks -- [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/> <https://www.nyse.com/quote/XNYS:XTLY> [image: LinkedIn] <https://www.linkedin.com/company/xactly-corporation> [image: Twitter] <https://twitter.com/Xactly> [image: Facebook] <https://www.facebook.com/XactlyCorp> [image: YouTube] <http://www.youtube.com/xactlycorporation>
Re: Kryo serialization mismatch in spark sql windowing function
ar,/opt/hive/lib/hive-accumulo-handler-1.2.0.jar,/opt/hive/lib/ant-launcher-1.9.1.jar,/opt/hive/lib/hive-jdbc-1.2.0.jar,/opt/hive/lib/commons-compress-1.4.1.jar,/opt/hive/lib/commons-logging-1.1.3.jar,/opt/hive/lib/hive-serde-1.2.0.jar,/opt/hive/lib/zookeeper-3.4.6.jar,/opt/hive/lib/accumulo-start-1.6.0.jar,/opt/hive/lib/hive-contrib-1.2.0.jar,/opt/hive/lib/log4j-1.2.16.jar,/opt/hive/lib/commons-compiler-2.7.6.jar,/opt/hive/lib/ST4-4.0.4.jar,/opt/hive/lib/calcite-avatica-1.2.0-incubating.jar,/opt/hive/lib/httpclient-4.4.jar,/opt/hive/lib/commons-codec-1.4.jar,/opt/hive/lib/commons-io-2.4.jar,/opt/hive/lib/commons-digester-1.8.jar,/opt/hive/lib/regexp-1.3.jar,/opt/hive/lib/ivy-2.4.0.jar,/opt/hive/lib/eigenbase-properties-1.1.5.jar,/opt/hive/lib/paranamer-2.3.jar,/opt/hive/lib/mail-1.4.1.jar,/opt/hive/lib/asm-commons-3.1.jar,/opt/hive/lib/commons-lang-2.6.jar,/opt/hive/lib/hive-jdbc-1.2.0-standalone.jar,/opt/hive/lib/hive-shims-common-1.2.0.jar,/opt/hive/lib/hamcrest-core-1.1.jar,/opt/hive/lib/super-csv-2.2.0.jar, spark.history.ui.port -> 18080 spark.fileserver.port -> 45090 spark.history.retainedApplications -> 999 spark.ui.port -> 45100 spark.shuffle.consolidateFiles -> true spark.executor.extraJavaOptions -> -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -Djava.library.path=/opt/hadoop/lib/native/ spark.history.fs.logDirectory -> hdfs:///logs/spark-history spark.eventLog.dir -> hdfs:///logs/spark-history/alti_soam/ spark.executor.extraClassPath -> spark-hive_2.10-1.6.1.jar:spark-hive-thriftserver_2.10-1.6.1.jar spark.driver.port -> 45055 spark.port.maxRetries -> 999 spark.executor.port -> 45250 spark.driver.extraClassPath -> ... On Wed, Apr 6, 2016 at 6:59 PM, Josh Rosen <joshro...@databricks.com> wrote: > > Spark is compiled against a custom fork of Hive 1.2.1 which added shading > of Protobuf and removed shading of Kryo. What I think that what's happening > here is that stock Hive 1.2.1 is taking precedence so the Kryo instance > that it's returning is an instance of shaded/relocated Hive version rather > than the unshaded, stock Kryo that Spark is expecting here. > > I just so happen to have a patch which reintroduces the shading of Kryo > (motivated by other factors): https://github.com/apache/spark/pull/12215; > there's a chance that a backport of this patch might fix this problem. > > However, I'm a bit curious about how your classpath is set up and why > stock 1.2.1's shaded Kryo is being used here. > > /cc +Marcelo Vanzin <van...@cloudera.com> and +Steve Loughran > <ste...@hortonworks.com>, who may know more. > > On Wed, Apr 6, 2016 at 6:08 PM Soam Acharya <s...@altiscale.com> wrote: > >> Hi folks, >> >> I have a build of Spark 1.6.1 on which spark sql seems to be functional >> outside of windowing functions. 
For example, I can create a simple external >> table via Hive: >> >> CREATE EXTERNAL TABLE PSTable (pid int, tty string, time string, cmd >> string) >> ROW FORMAT DELIMITED >> FIELDS TERMINATED BY ',' >> LINES TERMINATED BY '\n' >> STORED AS TEXTFILE >> LOCATION '/user/test/ps'; >> >> Ensure that the table is pointing to some valid data, set up spark sql to >> point to the Hive metastore (we're running Hive 1.2.1) and run a basic test: >> >> spark-sql> select * from PSTable; >> 7239pts/0 00:24:31java >> 9993pts/9 00:00:00ps >> 9994pts/9 00:00:00tail >> 9995pts/9 00:00:00sed >> 9996pts/9 00:00:00sed >> >> But when I try to run a windowing function which I know runs onHive, I >> get: >> >> spark-sql> select a.pid ,a.time, a.cmd, min(a.time) over (partition by >> a.cmd order by a.time ) from PSTable a; >> org.apache.spark.SparkException: Task not serializable >> at >> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) >> at >> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) >> at >> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) >> at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) >> : >> : >> Caused by: java.lang.ClassCastException: >> org.apache.hive.com.esotericsoftware.kryo.Kryo cannot be cast to >> com.esotericsoftware.kryo.Kryo >> at >> org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.serializePlan(HiveShim.scala:178) >> at >> org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.writeExternal(HiveShim.scala:191) >> at >> java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458) >> at >> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429) >> at >> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) >> >> Any thoughts or ideas would be appreciated! >> >> Regards, >> >> Soam >> >
Re: Kryo serialization mismatch in spark sql windowing function
Spark is compiled against a custom fork of Hive 1.2.1 which added shading of Protobuf and removed shading of Kryo. What I think is happening here is that stock Hive 1.2.1 is taking precedence, so the Kryo instance it returns is an instance of the shaded/relocated Hive version rather than the unshaded, stock Kryo that Spark is expecting here. I just so happen to have a patch which reintroduces the shading of Kryo (motivated by other factors): https://github.com/apache/spark/pull/12215; there's a chance that a backport of this patch might fix this problem. However, I'm a bit curious about how your classpath is set up and why stock 1.2.1's shaded Kryo is being used here. /cc +Marcelo Vanzin <van...@cloudera.com> and +Steve Loughran <ste...@hortonworks.com>, who may know more. On Wed, Apr 6, 2016 at 6:08 PM Soam Acharya <s...@altiscale.com> wrote: > Hi folks, > > I have a build of Spark 1.6.1 on which spark sql seems to be functional > outside of windowing functions. For example, I can create a simple external > table via Hive: > > CREATE EXTERNAL TABLE PSTable (pid int, tty string, time string, cmd > string) > ROW FORMAT DELIMITED > FIELDS TERMINATED BY ',' > LINES TERMINATED BY '\n' > STORED AS TEXTFILE > LOCATION '/user/test/ps'; > > Ensure that the table is pointing to some valid data, set up spark sql to > point to the Hive metastore (we're running Hive 1.2.1) and run a basic test: > > spark-sql> select * from PSTable; > 7239pts/0 00:24:31java > 9993pts/9 00:00:00ps > 9994pts/9 00:00:00tail > 9995pts/9 00:00:00sed > 9996pts/9 00:00:00sed > > But when I try to run a windowing function which I know runs onHive, I get: > > spark-sql> select a.pid ,a.time, a.cmd, min(a.time) over (partition by > a.cmd order by a.time ) from PSTable a; > org.apache.spark.SparkException: Task not serializable > at > org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) > at > org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) > at > org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) > at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) > : > : > Caused by: java.lang.ClassCastException: > org.apache.hive.com.esotericsoftware.kryo.Kryo cannot be cast to > com.esotericsoftware.kryo.Kryo > at > org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.serializePlan(HiveShim.scala:178) > at > org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.writeExternal(HiveShim.scala:191) > at > java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > > Any thoughts or ideas would be appreciated! > > Regards, > > Soam >
Kryo serialization mismatch in spark sql windowing function
Hi folks, I have a build of Spark 1.6.1 on which spark sql seems to be functional outside of windowing functions. For example, I can create a simple external table via Hive: CREATE EXTERNAL TABLE PSTable (pid int, tty string, time string, cmd string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE LOCATION '/user/test/ps'; Ensure that the table is pointing to some valid data, set up spark sql to point to the Hive metastore (we're running Hive 1.2.1) and run a basic test: spark-sql> select * from PSTable; 7239pts/0 00:24:31java 9993pts/9 00:00:00ps 9994pts/9 00:00:00tail 9995pts/9 00:00:00sed 9996pts/9 00:00:00sed But when I try to run a windowing function which I know runs onHive, I get: spark-sql> select a.pid ,a.time, a.cmd, min(a.time) over (partition by a.cmd order by a.time ) from PSTable a; org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) : : Caused by: java.lang.ClassCastException: org.apache.hive.com.esotericsoftware.kryo.Kryo cannot be cast to com.esotericsoftware.kryo.Kryo at org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.serializePlan(HiveShim.scala:178) at org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.writeExternal(HiveShim.scala:191) at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) Any thoughts or ideas would be appreciated! Regards, Soam
DataFrames - Kryo registration issue
Hello All, If Kryo serialization is enabled, doesn't Spark take care of registration of built-in classes, i.e., are we not supposed to register just the custom classes? When using DataFrames, this does not seem to be the case. I had to register the following classes conf.registerKryoClasses(Array(classOf[org.apache.spark.sql.types.StructType], classOf[org.apache.spark.sql.types.StructField], classOf[Array[org.apache.spark.sql.types.StructField]], classOf[org.apache.spark.sql.types.LongType$], classOf[org.apache.spark.sql.types.Metadata], classOf[scala.collection.immutable.Map$EmptyMap$], classOf[org.apache.spark.sql.catalyst.InternalRow], classOf[Array[org.apache.spark.sql.catalyst.InternalRow]], classOf[org.apache.spark.sql.catalyst.expressions.UnsafeRow], classOf[Array[org.apache.spark.sql.catalyst.expressions.UnsafeRow]], Class.forName("org.apache.spark.sql.execution.joins.UnsafeHashedRelation"), Class.forName("java.util.HashMap"), classOf[scala.reflect.ClassTag$$anon$1], Class.forName("java.lang.Class"), Class.forName("org.apache.spark.sql.execution.columnar.CachedBatch"))) I got the following exception com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: byte[][] But byte is not a class. So I couldn't register it -- compiler complains that byte is not a class. How can I register byte[][] in Scala? Does this point to some other issue? In some other posts, I noticed use of kryo.register(). In this case, how do we pass the kryo object to SparkContext? Thanks in advance. Regards, Raghava.
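On the byte[][] part specifically: in Scala there is no need to name the primitive array class at all; classOf[Array[Array[Byte]]] resolves to exactly the byte[][] runtime class (JVM name "[[B"). And kryo.register() calls are normally placed in a KryoRegistrator that Spark instantiates on the driver and on every executor, so there is no Kryo object to hand to the SparkContext yourself; you just point spark.kryo.registrator at the class. A minimal sketch (the registrator name is made up; the class list would be the one from the post above):

```
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical registrator; Spark constructs it wherever it builds a Kryo instance.
class MyAppRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Array[Array[Byte]]]) // byte[][], i.e. the "[[B" from the error
    kryo.register(classOf[org.apache.spark.sql.types.StructType])
    // ...the remaining Spark SQL and application classes from the list above...
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .set("spark.kryo.registrator", classOf[MyAppRegistrator].getName)
```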
Re: Spark Streaming 1.6 mapWithState not working well with Kryo Serialization
See https://issues.apache.org/jira/browse/SPARK-12591 After applying the patch, it should work. However, if you want to enable "registrationRequired", you still need to register "org.apache.spark.streaming.util.OpenHashMapBasedStateMap", "org.apache.spark.streaming.util.EmptyStateMap" and "org.apache.spark.streaming.rdd.MapWithStateRDDRecord" by yourself because these classes are defined in the Streaming project and we don't want to use them in Spark core. On Wed, Mar 2, 2016 at 1:41 PM, Aris <arisofala...@gmail.com> wrote: > Hello Spark folks and especially TD, > > I am using the Spark Streaming 1.6 mapWithState API, and I am trying to > enforce Kryo Serialization with > > SparkConf.set("spark.kryo.registrationRequired", "true") > > However, this appears to be impossible! I registered all the classes that > are my own, but I problem with a > class org.apache.spark.streaming.rdd.MapWithStateRDDRecord, which is set as > private[streaming] . > > > The error: > > java.lang.IllegalArgumentException: Class is not registered: > org.apache.spark.streaming.rdd.MapWithStateRDDRecord > Note: To register this class use: > kryo.register(org.apache.spark.streaming.rdd.MapWithStateRDDRecord.class); > at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442) > at > com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79) > at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472) > > Since this class is private with spark streaming itself, I cannot actually > register it with Kryo, and I cannot do registrationRequired in order to > make sure *everything* has been serialized with Kryo. > > Is this a bug? Can I somehow solve this? > > Aris >
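Since the three classes named above are private[streaming], they cannot be referenced with classOf from application code, but a registrator can still look them up by name. A minimal sketch along the lines of the suggestion above (wiring and the registrator name are illustrative):

```
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class StateKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Private to Spark Streaming, so register by name rather than classOf.
    kryo.register(Class.forName("org.apache.spark.streaming.util.OpenHashMapBasedStateMap"))
    kryo.register(Class.forName("org.apache.spark.streaming.util.EmptyStateMap"))
    kryo.register(Class.forName("org.apache.spark.streaming.rdd.MapWithStateRDDRecord"))
    // ...plus your own key, value and state classes...
  }
}
// then set spark.kryo.registrator to this class's fully qualified name
```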
Spark Streaming 1.6 mapWithState not working well with Kryo Serialization
Hello Spark folks and especially TD, I am using the Spark Streaming 1.6 mapWithState API, and I am trying to enforce Kryo Serialization with SparkConf.set("spark.kryo.registrationRequired", "true") However, this appears to be impossible! I registered all the classes that are my own, but I have a problem with the class org.apache.spark.streaming.rdd.MapWithStateRDDRecord, which is marked private[streaming]. The error: java.lang.IllegalArgumentException: Class is not registered: org.apache.spark.streaming.rdd.MapWithStateRDDRecord Note: To register this class use: kryo.register(org.apache.spark.streaming.rdd.MapWithStateRDDRecord.class); at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442) at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79) at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472) Since this class is private within Spark Streaming itself, I cannot actually register it with Kryo, and I cannot use registrationRequired to make sure *everything* has been serialized with Kryo. Is this a bug? Can I somehow solve this? Aris
Re: Kryo serializer Exception during serialization: java.io.IOException: java.lang.IllegalArgumentException:
Could you disable `spark.kryo.registrationRequired`? Some classes may not be registered but they work well with Kryo's default serializer. On Fri, Jan 8, 2016 at 8:58 AM, Ted Yuwrote: > bq. try adding scala.collection.mutable.WrappedArray > > But the hint said registering > scala.collection.mutable.WrappedArray$ofRef.class > , right ? > > On Fri, Jan 8, 2016 at 8:52 AM, jiml wrote: > >> (point of post is to see if anyone has ideas about errors at end of post) >> >> In addition, the real way to test if it's working is to force >> serialization: >> >> In Java: >> >> Create array of all your classes: >> // for kyro serializer it wants to register all classes that need to be >> serialized >> Class[] kryoClassArray = new Class[]{DropResult.class, >> DropEvaluation.class, >> PrintHetSharing.class}; >> >> in the builder for your SparkConf (or in conf/spark-defaults.sh) >> .set("spark.serializer", >> "org.apache.spark.serializer.KryoSerializer") >> //require registration of all classes with Kyro >> .set("spark.kryo.registrationRequired", "true") >> // don't forget to register ALL classes or will get error >> .registerKryoClasses(kryoClassArray); >> >> Then you will start to get neat errors like the one I am working on: >> >> Exception in thread "main" org.apache.spark.SparkException: Job aborted >> due >> to stage failure: Failed to serialize task 0, not attempting to retry it. >> Exception during serialization: java.io.IOException: >> java.lang.IllegalArgumentException: Class is not registered: >> scala.collection.mutable.WrappedArray$ofRef >> Note: To register this class use: >> kryo.register(scala.collection.mutable.WrappedArray$ofRef.class); >> >> I did try adding scala.collection.mutable.WrappedArray to the Class array >> up >> top but no luck. Thanks >> >> >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/kryos-serializer-tp16454p25921.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> >> >
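In SparkConf terms the suggestion amounts to the following (registration stays useful for your own classes; unregistered ones are still serialized by Kryo, just with the class name written into the stream):

```
val conf = new org.apache.spark.SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "false") // the default: unregistered classes still go through Kryo, at a small size cost
```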
Re: Kryo serializer Exception during serialization: java.io.IOException: java.lang.IllegalArgumentException:
(the point of this post is to see if anyone has ideas about the errors at the end of the post) In addition, the real way to test if it's working is to force serialization: In Java: Create an array of all your classes: // for the kryo serializer it wants to register all classes that need to be serialized Class[] kryoClassArray = new Class[]{DropResult.class, DropEvaluation.class, PrintHetSharing.class}; in the builder for your SparkConf (or in conf/spark-defaults.conf) .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //require registration of all classes with Kryo .set("spark.kryo.registrationRequired", "true") // don't forget to register ALL classes or you will get an error .registerKryoClasses(kryoClassArray); Then you will start to get neat errors like the one I am working on: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 0, not attempting to retry it. Exception during serialization: java.io.IOException: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class); I did try adding scala.collection.mutable.WrappedArray to the Class array up top but no luck. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/kryos-serializer-tp16454p25921.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Kryo serializer Exception during serialization: java.io.IOException: java.lang.IllegalArgumentException:
bq. try adding scala.collection.mutable.WrappedArray But the hint said registering scala.collection.mutable.WrappedArray$ofRef.class , right ? On Fri, Jan 8, 2016 at 8:52 AM, jimlwrote: > (point of post is to see if anyone has ideas about errors at end of post) > > In addition, the real way to test if it's working is to force > serialization: > > In Java: > > Create array of all your classes: > // for kyro serializer it wants to register all classes that need to be > serialized > Class[] kryoClassArray = new Class[]{DropResult.class, > DropEvaluation.class, > PrintHetSharing.class}; > > in the builder for your SparkConf (or in conf/spark-defaults.sh) > .set("spark.serializer", > "org.apache.spark.serializer.KryoSerializer") > //require registration of all classes with Kyro > .set("spark.kryo.registrationRequired", "true") > // don't forget to register ALL classes or will get error > .registerKryoClasses(kryoClassArray); > > Then you will start to get neat errors like the one I am working on: > > Exception in thread "main" org.apache.spark.SparkException: Job aborted due > to stage failure: Failed to serialize task 0, not attempting to retry it. > Exception during serialization: java.io.IOException: > java.lang.IllegalArgumentException: Class is not registered: > scala.collection.mutable.WrappedArray$ofRef > Note: To register this class use: > kryo.register(scala.collection.mutable.WrappedArray$ofRef.class); > > I did try adding scala.collection.mutable.WrappedArray to the Class array > up > top but no luck. Thanks > > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/kryos-serializer-tp16454p25921.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
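The distinction being pointed out here: the hint names the concrete inner class WrappedArray$ofRef, not WrappedArray itself, and because of the $ in the name the easiest way to put it in a registration list is Class.forName. A sketch in Scala terms, assuming the same SparkConf-based setup as the post above; in the Java class array it would simply be another element, Class.forName("scala.collection.mutable.WrappedArray$ofRef"):

```
// Register the exact runtime class named in the error, not the parent WrappedArray type.
conf.registerKryoClasses(Array[Class[_]](
  Class.forName("scala.collection.mutable.WrappedArray$ofRef")
  // ...plus the application classes from the post (DropResult, DropEvaluation, PrintHetSharing)...
))
```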
Kryo serialization fails when using SparkSQL and HiveContext
Hi everyone, I'm using HiveContext and SparkSQL to query a Hive table and doing join operation on it. After changing the default serializer to Kryo with spark.kryo.registrationRequired = true, the Spark application failed with the following error: java.lang.IllegalArgumentException: Class is not registered: org.apache.spark.sql.catalyst.expressions.GenericRow Note: To register this class use: kryo.register(org.apache.spark.sql.catalyst.expressions.GenericRow.class); at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442) at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79) at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:124) at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:204) at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) I'm using Spark 1.3.1 (HDP 2.3.0) and submitting Spark application to Yarn in cluster mode. Any help is appreciated. -- Linh M. Tran
Re: Kryo serialization fails when using SparkSQL and HiveContext
You'll need to either turn off registration (spark.kryo.registrationRequired) or create a custom registrator via spark.kryo.registrator http://spark.apache.org/docs/latest/configuration.html#compression-and-serialization On Mon, Dec 14, 2015 at 2:17 AM, Linh M. Tran <linh.mtran...@gmail.com> wrote: > Hi everyone, > I'm using HiveContext and SparkSQL to query a Hive table and doing join > operation on it. > After changing the default serializer to Kryo with > spark.kryo.registrationRequired = true, the Spark application failed with > the following error: > > java.lang.IllegalArgumentException: Class is not registered: > org.apache.spark.sql.catalyst.expressions.GenericRow > Note: To register this class use: > kryo.register(org.apache.spark.sql.catalyst.expressions.GenericRow.class); > at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442) > at > com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79) > at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472) > at > com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565) > at > com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36) > at > com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) > at > com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) > at > org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:124) > at > org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:204) > at > org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375) > at > org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211) > at > org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:64) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > I'm using Spark 1.3.1 (HDP 2.3.0) and submitting Spark application to Yarn > in cluster mode. > Any help is appreciated. > -- > Linh M. Tran >
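If registration is to stay on, a registrator that registers the class named in the exception looks roughly like this (a sketch; depending on the query, more Catalyst classes may surface one at a time in the same way):

```
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class SqlKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // The class from the error message, registered by name since it lives in a Spark-internal package.
    kryo.register(Class.forName("org.apache.spark.sql.catalyst.expressions.GenericRow"))
  }
}
// then: spark.kryo.registrator = <your package>.SqlKryoRegistrator
```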
Re: Kryo Serialization in Spark
Are you sure you are using Kryo serialization? You are getting a Java serialization error. Are you setting up your SparkContext with Kryo serialization enabled? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-Serialization-in-Spark-tp25628p25678.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
hbase Put object kryo serialisation error
Hi, I have a JavaPairRDD<ImmutableBytesWritable, Put> pairRdd. When I do rdd.persist(StorageLevel.MEMORY_AND_DISK()), it throws this exception: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 100 Serialization trace: familyMap (org.apache.hadoop.hbase.client.Put) I have registered Put and TreeMap with the Spark Kryo serializer. What is the cause of this error? The full stack trace is: Exception in thread "main" com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 100 Serialization trace: familyMap (org.apache.hadoop.hbase.client.Put) at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:626)
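For context, "Encountered unregistered class ID" on read usually means the Kryo instance deserializing the block was built with a different registration list than the one that wrote it: Kryo assigns numeric IDs by registration order, so the driver and every executor must register exactly the same classes in exactly the same order. Putting all registrations in a registrator (or spark.kryo.classesToRegister) on the SparkConf before the context is created keeps them consistent. A sketch only; the class list below is a guess at what a Put's familyMap pulls in, not a verified fix:

```
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class HBaseKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Same classes, same order, everywhere: Kryo class IDs are positional.
    kryo.register(classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable])
    kryo.register(classOf[org.apache.hadoop.hbase.client.Put])
    kryo.register(classOf[java.util.TreeMap[_, _]])
    kryo.register(classOf[java.util.ArrayList[_]])
    kryo.register(classOf[Array[Byte]])
    kryo.register(Class.forName("org.apache.hadoop.hbase.KeyValue")) // assumption: what the familyMap values typically hold
  }
}
```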
Kryo Serialization in Spark
Hi All, I'm unable to use Kryo serializer in my Spark program. I'm loading a graph from an edgelist file using GraphLoader and performing a BFS using pregel API. But I get the below mentioned error while I'm running. Can anybody tell me what is the right way to serialize a class in Spark and what are the functions that needs to be implemented. class MyRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo) { kryo.register(classOf[GraphBFS]) kryo.register(classOf[Config]) kryo.register(classOf[Iterator[(Long, Double)]]) } } Class GraphBFS{ def vprog(id: VertexId, attr: Double, msg: Double): Double = math.min(attr,msg) def sendMessage(triplet: EdgeTriplet[Double, Int]) : Iterator[(VertexId, Double)] = { var iter:Iterator[(VertexId, Double)] = Iterator.empty val isSrcMarked = triplet.srcAttr != Double.PositiveInfinity val isDstMarked = triplet.dstAttr != Double.PositiveInfinity if(!(isSrcMarked && isDstMarked)){ if(isSrcMarked){ iter = Iterator((triplet.dstId,triplet.srcAttr+1)) }else{ iter = Iterator((triplet.srcId,triplet.dstAttr+1)) } } iter } def reduceMessage(a: Double, b: Double) = math.min(a,b) def main() { .. val bfs = initialGraph.pregel(initialMessage, maxIterations, activeEdgeDirection)(vprog, sendMessage, reduceMessage) . } } 15/12/07 21:52:49 INFO BlockManager: Removing RDD 8 15/12/07 21:52:49 INFO BlockManager: Removing RDD 2 Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132) at org.apache.spark.SparkContext.clean(SparkContext.scala:1893) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:683) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:682) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:682) at org.apache.spark.graphx.impl.VertexRDDImpl.mapVertexPartitions(VertexRDDImpl.scala:96) at org.apache.spark.graphx.impl.GraphImpl.mapVertices(GraphImpl.scala:132) at org.apache.spark.graphx.Pregel$.apply(Pregel.scala:122) at org.apache.spark.graphx.GraphOps.pregel(GraphOps.scala:362) at GraphBFS.main(GraphBFS.scala:241) at run$.main(GraphBFS.scala:268) at run.main(GraphBFS.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.io.NotSerializableException: GraphBFS Serialization stack: - object not serializable (class: GraphBFS, value: GraphBFS@575c3e9b) - field (class: GraphBFS$$anonfun$17, name: $outer, type: class GraphBFS) - object (class GraphBFS$$anonfun$17, ) - field (class: 
org.apache.spark.graphx.Pregel$$anonfun$1, name: vprog$1, type: interface scala.Function3) - object (class org.apache.spark.graphx.Pregel$$anonfun$1, ) - field (class: org.apache.spark.graphx.impl.GraphImpl$$anonfun$5, name: f$1, type: interface scala.Function2) - object (class org.apache.spark.graphx.impl.GraphImpl$$anonfun$5, ) - field (class: org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$1, name: f$1, type: interface scala.Function1) - object (class org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$1, ) Thanks, Prasad - Thanks, Prasad -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-Serialization-in-Spark-tp25628.html Sent from the Apache Spark User List mailing list arc
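Worth noting about the trace above: this particular failure is not a Kryo registration problem. It bottoms out in a java.io.NotSerializableException from the ClosureCleaner, and closures are always serialized with Java serialization regardless of spark.serializer. Because vprog, sendMessage and reduceMessage are instance methods, the pregel call captures the enclosing GraphBFS instance (the $outer field in the trace), and that class is not Serializable. Two common ways out, sketched with the names from the post:

```
// Option 1: make the enclosing class serializable, so the captured $outer can be shipped.
class GraphBFS extends Serializable {
  // vprog, sendMessage, reduceMessage as before ...
}

// Option 2: avoid capturing the instance at all, e.g. by moving the Pregel
// functions into an object (or assigning them to local vals before calling pregel).
object GraphBFSFunctions {
  def vprog(id: org.apache.spark.graphx.VertexId, attr: Double, msg: Double): Double =
    math.min(attr, msg)
  // sendMessage and reduceMessage likewise ...
}
```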
ClassCastException in Kryo Serialization
Hi, I seem to be getting a ClassCastException in Kryo serialization. The error is below. The Child1 class is held in a map in the parent class. Child1 has a HashSet, testObjects, of type Object1. I get an error when it tries to deserialize Object1. Any idea as to why this is happening? com.esotericsoftware.kryo.KryoException: java.lang.ClassCastException: Object1 cannot be cast to java.lang.String Serialization trace: testObjects (Child1) map (parent) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ClassCastException-in-Kryo-Serialization-tp25575.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Why is Kryo not the default serializer?
The array issue was also discussed in the Apache Hive forum. It seems this problem can be resolved by using Kryo 3.x. Would upgrading to Kryo 3.x allow Kryo to become the default SerDes? https://issues.apache.org/jira/browse/HIVE-12174 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Why is Kryo not the default serializer?
If Kryo usage is recommended, why is Java serialization the default serializer instead of Kryo? Is there some limitation to using Kryo? I've read through the documentation, but it just seems Kryo is a better choice and should be made the default. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-Kryo-not-the-default-serializer-tp25338.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Why is Kryo not the default serializer?
I have seen some failures in our workloads with Kryo; one I remember is a scenario with very large arrays. We could not get Kryo to work despite trying the various configuration properties. Switching to the Java serde was what worked. Regards Sab On Tue, Nov 10, 2015 at 11:43 AM, Hitoshi Ozawa <ozaw...@worksap.co.jp> wrote: > If Kryo usage is recommended, why is Java serialization the default > serializer instead of Kryo? Is there some limitation to using Kryo? I've > read through the documentation but it just seem Kryo is a better choice > and > should be made a default. > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-Kryo-not-the-default-serializer-tp25338.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- Architect - Big Data Ph: +91 99805 99458 Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan India ICT)* +++
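For reference, the configuration knobs that usually matter for very large objects are the Kryo buffer sizes: a single record larger than spark.kryoserializer.buffer.max cannot be Kryo-serialized at all (the ceiling is just under 2 GB), which is consistent with a very-large-array failure mode, though whether that was the cause here is an assumption:

```
val conf = new org.apache.spark.SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer", "64m")    // initial buffer, grows up to the max below
  .set("spark.kryoserializer.buffer.max", "1g") // must stay below 2g; a single object bigger than this cannot be serialized
```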
Kryo makes String data invalid
Hi all, I have a parquet file, which I am loading in a shell. When I launch the shell with --driver-java-options="-Dspark.serializer=...kryo", a couple of fields look like: 03-?? ??-?? ??-??? when calling > data.first I will confirm shortly, but I am utterly sure it happens only on StringType fields. Why could this be happening? Perhaps when the parquet file was created from Spark, Kryo wasn't set up? If I disable Kryo, the data looks good. Any ideas? Saif
Enabling kryo serialization slows down machine learning app.
Hi, my team is setting up a machine-learning framework based on Spark's MLlib that currently uses logistic regression. I enabled Kryo serialization and enforced class registration, so I know that all the serialized classes are registered. However, the running times when Kryo serialization is enabled are consistently longer. This is true both when running locally on smaller samples (1.6 minutes vs 1.3m) and also when running with a larger sample on AWS with two worker nodes (2h30 vs 1h50). Using the monitoring tools suggests that Task Deserialization Times are similar (although perhaps slightly longer for Kryo), but Task Durations and even Scheduler Delays increase significantly. There is also a significant difference in memory usage: for Kryo the number of stored RDDs is higher (much more so on the local sample: 40 vs. 4). Does anyone have an idea of what could be going on, or where I should focus to find out? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Enabling-kryo-serialization-slows-down-machine-learning-app-tp24947.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar
Hi Nick/Igor, Any solution for this? I am having the same issue, and copying the jar to each executor is not feasible if we use a lot of jars. Thanks, Vipul
Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar
As a starting point, attach your stack trace... PS: look for duplicates in your classpath; maybe you include another jar with the same class On 8 September 2015 at 06:38, Nicholas R. Peterson <nrpeter...@gmail.com> wrote: > I'm trying to run a Spark 1.4.1 job on my CDH5.4 cluster, through Yarn. > Serialization is set to use Kryo. > > I have a large object which I send to the executors as a Broadcast. The > object seems to serialize just fine. When it attempts to deserialize, > though, Kryo throws a ClassNotFoundException... for a class that I include > in the fat jar that I spark-submit. > > What could be causing this classpath issue with Kryo on the executors? > Where should I even start looking to try to diagnose the problem? I > appreciate any help you can provide. > > Thank you! > > -- Nick >
Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar
Thanks, Igor; I've got it running again right now, and can attach the stack trace when it finishes. In the meantime, I've noticed something interesting: in the Spark UI, the application jar that I submit is not being included on the classpath. It has been successfully uploaded to the nodes -- in the nodemanager directory for the application, I see __app__.jar and __spark__.jar. The directory itself is on the classpath, and __spark__.jar and __hadoop_conf__ are as well. When I do everything the same but switch the master to local[*], the jar I submit IS added to the classpath. This seems like a likely culprit. What could cause this, and how can I fix it? Best, Nick On Tue, Sep 8, 2015 at 1:14 AM Igor Berman <igor.ber...@gmail.com> wrote: > as a starting point, attach your stacktrace... > ps: look for duplicates in your classpath, maybe you include another jar > with same class > > On 8 September 2015 at 06:38, Nicholas R. Peterson <nrpeter...@gmail.com> > wrote: > >> I'm trying to run a Spark 1.4.1 job on my CDH5.4 cluster, through Yarn. >> Serialization is set to use Kryo. >> >> I have a large object which I send to the executors as a Broadcast. The >> object seems to serialize just fine. When it attempts to deserialize, >> though, Kryo throws a ClassNotFoundException... for a class that I include >> in the fat jar that I spark-submit. >> >> What could be causing this classpath issue with Kryo on the executors? >> Where should I even start looking to try to diagnose the problem? I >> appreciate any help you can provide. >> >> Thank you! >> >> -- Nick >> > >
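The thread does not reach a confirmed fix, but if the hypothesis is that the fat jar is not visible to the classloader Kryo uses on the executors, two settings that existed in Spark 1.4 can be used to test it without code changes; whether they resolve this particular case is an assumption, not something established here (the path below is illustrative):

```
// A sketch of settings to try:
val conf = new org.apache.spark.SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.jars", "/path/to/app-assembly.jar")    // ship the fat jar explicitly (same effect as --jars)
  .set("spark.executor.userClassPathFirst", "true")  // experimental: prefer user classes over the YARN/Spark classpath
```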
Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar
Yes, the jar contains the class: $ jar -tf lumiata-evaluation-assembly-1.0.jar | grep 2028/Document/Document com/i2028/Document/Document$1.class com/i2028/Document/Document.class What else can I do? Is there any way to get more information about the classes available to the particular classloader kryo is using? On Tue, Sep 8, 2015 at 6:34 AM Igor Berman <igor.ber...@gmail.com> wrote: > java.lang.ClassNotFoundException: com.i2028.Document.Document > > 1. so have you checked that jar that you create(fat jar) contains this class? > > 2. might be there is some stale cache issue...not sure though > > > On 8 September 2015 at 16:12, Nicholas R. Peterson <nrpeter...@gmail.com> > wrote: > >> Here is the stack trace: (Sorry for the duplicate, Igor -- I forgot to >> include the list.) >> >> >> 15/09/08 05:56:43 WARN scheduler.TaskSetManager: Lost task 183.0 in stage >> 41.0 (TID 193386, ds-compute2.lumiata.com): java.io.IOException: >> com.esotericsoftware.kryo.KryoException: Error constructing instance of >> class: com.lumiata.patientanalysis.utils.CachedGraph >> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257) >> at >> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165) >> at >> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) >> at >> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) >> at >> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88) >> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) >> at >> com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:44) >> at >> com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:43) >> at >> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) >> at >> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) >> at >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) >> at >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) >> at >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) >> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) >> at org.apache.spark.scheduler.Task.run(Task.scala:70) >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >> at java.lang.Thread.run(Thread.java:745) >> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing >> instance of class: com.lumiata.patientanalysis.utils.CachedGraph >> at >> com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126) >> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065) >> at >> com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228) >> at >> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217) >> at 
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) >> at >> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:182) >> at >> org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217) >> at >> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178) >> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254) >> ... 24 more >> Caused by: java.lang.reflect.InvocationTargetException >> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) >> at >> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) >> at >> sun.reflect.DelegatingCo
Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar
) at com.lumiata.patientanalysis.utils.CachedGraph.(CachedGraph.java:178) ... 38 more Caused by: java.lang.ClassNotFoundException: com.i2028.Document.Document at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ... 47 more > On Tue, Sep 8, 2015 at 6:01 AM Igor Berman <igor.ber...@gmail.com> wrote: > >> I wouldn't build on this. local mode & yarn are different so that jars >> you use in spark submit are handled differently >> >> On 8 September 2015 at 15:43, Nicholas R. Peterson <nrpeter...@gmail.com> >> wrote: >> >>> Thans, Igor; I've got it running again right now, and can attach the >>> stack trace when it finishes. >>> >>> In the mean time, I've noticed something interesting: in the Spark UI, >>> the application jar that I submit is not being included on the classpath. >>> It has been successfully uploaded to the nodes -- in the nodemanager >>> directory for the application, I see __app__.jar and __spark__.jar. The >>> directory itself is on the classpath, and __spark__.jar and __hadoop_conf__ >>> are as well. When I do everything the same but switch the master to >>> local[*], the jar I submit IS added to the classpath. >>> >>> This seems like a likely culprit. What could cause this, and how can I >>> fix it? >>> >>> Best, >>> Nick >>> >>> On Tue, Sep 8, 2015 at 1:14 AM Igor Berman <igor.ber...@gmail.com> >>> wrote: >>> >>>> as a starting point, attach your stacktrace... >>>> ps: look for duplicates in your classpath, maybe you include another >>>> jar with same class >>>> >>>> On 8 September 2015 at 06:38, Nicholas R. Peterson < >>>> nrpeter...@gmail.com> wrote: >>>> >>>>> I'm trying to run a Spark 1.4.1 job on my CDH5.4 cluster, through >>>>> Yarn. Serialization is set to use Kryo. >>>>> >>>>> I have a large object which I send to the executors as a Broadcast. >>>>> The object seems to serialize just fine. When it attempts to deserialize, >>>>> though, Kryo throws a ClassNotFoundException... for a class that I include >>>>> in the fat jar that I spark-submit. >>>>> >>>>> What could be causing this classpath issue with Kryo on the executors? >>>>> Where should I even start looking to try to diagnose the problem? I >>>>> appreciate any help you can provide. >>>>> >>>>> Thank you! >>>>> >>>>> -- Nick >>>>> >>>> >>>> >>
Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar
olver.java:115) > at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721) > at > com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134) > at > com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:626) > at > com.lumiata.patientanalysis.utils.CachedGraph.loadCacheFromSerializedData(CachedGraph.java:221) > at > com.lumiata.patientanalysis.utils.CachedGraph.(CachedGraph.java:182) > at > com.lumiata.patientanalysis.utils.CachedGraph.(CachedGraph.java:178) > ... 38 more > Caused by: java.lang.ClassNotFoundException: com.i2028.Document.Document > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:348) > at > com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) > ... 47 more > > > >> On Tue, Sep 8, 2015 at 6:01 AM Igor Berman <igor.ber...@gmail.com> wrote: >> >>> I wouldn't build on this. local mode & yarn are different so that jars >>> you use in spark submit are handled differently >>> >>> On 8 September 2015 at 15:43, Nicholas R. Peterson <nrpeter...@gmail.com >>> > wrote: >>> >>>> Thans, Igor; I've got it running again right now, and can attach the >>>> stack trace when it finishes. >>>> >>>> In the mean time, I've noticed something interesting: in the Spark UI, >>>> the application jar that I submit is not being included on the classpath. >>>> It has been successfully uploaded to the nodes -- in the nodemanager >>>> directory for the application, I see __app__.jar and __spark__.jar. The >>>> directory itself is on the classpath, and __spark__.jar and __hadoop_conf__ >>>> are as well. When I do everything the same but switch the master to >>>> local[*], the jar I submit IS added to the classpath. >>>> >>>> This seems like a likely culprit. What could cause this, and how can I >>>> fix it? >>>> >>>> Best, >>>> Nick >>>> >>>> On Tue, Sep 8, 2015 at 1:14 AM Igor Berman <igor.ber...@gmail.com> >>>> wrote: >>>> >>>>> as a starting point, attach your stacktrace... >>>>> ps: look for duplicates in your classpath, maybe you include another >>>>> jar with same class >>>>> >>>>> On 8 September 2015 at 06:38, Nicholas R. Peterson < >>>>> nrpeter...@gmail.com> wrote: >>>>> >>>>>> I'm trying to run a Spark 1.4.1 job on my CDH5.4 cluster, through >>>>>> Yarn. Serialization is set to use Kryo. >>>>>> >>>>>> I have a large object which I send to the executors as a Broadcast. >>>>>> The object seems to serialize just fine. When it attempts to deserialize, >>>>>> though, Kryo throws a ClassNotFoundException... for a class that I >>>>>> include >>>>>> in the fat jar that I spark-submit. >>>>>> >>>>>> What could be causing this classpath issue with Kryo on the >>>>>> executors? Where should I even start looking to try to diagnose the >>>>>> problem? I appreciate any help you can provide. >>>>>> >>>>>> Thank you! >>>>>> >>>>>> -- Nick >>>>>> >>>>> >>>>> >>>