Re: spark 2.x design docs
https://github.com/JerryLead/SparkInternals/blob/master/EnglishVersion/2-JobLogicalPlan.md

This is pretty old, but it might help a little bit. I myself am going through the source code and trying to reverse engineer stuff. Let me know if you'd like to pool resources sometime.

Regards

On Thu, Sep 19, 2019 at 11:35 AM wrote:

> Hi,
>
> Can someone provide documents/links (apart from the official documentation) *for
> understanding the internal workings of spark-core*:
>
> documents containing component pseudo code, class diagrams, execution
> flows, etc.?
>
> Thanks, Kamal
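Beyond reading the source, one practical complement for tracing execution flow is to ask Spark itself how it plans a job. The sketch below is only an illustration against a stock Spark 2.x build (the app name, sample data, and local master are placeholders): it prints the Catalyst parsed/analyzed/optimized/physical plans and the underlying RDD lineage, which can be mapped back to the scheduler code you are reverse engineering.

```
import org.apache.spark.sql.SparkSession

object InspectPlans {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("inspect-plans")
      .master("local[*]") // local mode, just for poking at the planner
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b"), (3, "b")).toDF("id", "key")
      .groupBy("key").count()

    // Logical -> physical plan, as produced by the Catalyst pipeline.
    df.explain(true)

    // RDD lineage with shuffle boundaries, useful for mapping stages back
    // to the DAGScheduler's job/stage split.
    println(df.rdd.toDebugString)

    spark.stop()
  }
}
```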
spark 2.x design docs
Hi,

Can someone provide documents/links (apart from the official documentation) for understanding the internal workings of spark-core: documents containing component pseudo code, class diagrams, execution flows, etc.?

Thanks, Kamal
Re: intermittent Kryo serialization failures in Spark
I remember this not working for us when we set it from inside the application; we had to actually pass it in as configuration.

On Wed, Sep 18, 2019 at 10:38 AM Jerry Vinokurov wrote:

> Hi Vadim,
>
> Thanks for your suggestion. We do preregister the classes, like so:
>
>> object KryoRegistrar {
>>   val classesToRegister: Array[Class[_]] = Array(
>>     classOf[MyModel],
>>     [etc]
>>   )
>> }
>
> And then we do:
>
>> val sparkConf = new SparkConf()
>>   .registerKryoClasses(KryoRegistrar.classesToRegister)
>
> I notice that this is a bit different from your code and I'm wondering
> whether there's any functional difference or if these are two ways to get
> to the same end. Our code is directly adapted from the Spark documentation
> on how to use the Kryo serializer, but maybe there's some subtlety here that
> I'm missing.
>
> With regard to the settings, it looks like we currently have the defaults,
> which is to say that referenceTracking is true, registrationRequired is
> false, unsafe is false, and buffer.max is 64m (none of our objects are
> anywhere near that size, but... who knows). I will try it with your
> suggestions and see if it solves the problem.
>
> thanks,
> Jerry
>
> On Tue, Sep 17, 2019 at 4:34 PM Vadim Semenov wrote:
>
>> Pre-register your classes:
>>
>> ```
>> import com.esotericsoftware.kryo.Kryo
>> import org.apache.spark.serializer.KryoRegistrator
>>
>> class MyKryoRegistrator extends KryoRegistrator {
>>   override def registerClasses(kryo: Kryo): Unit = {
>>     kryo.register(Class.forName("[[B")) // byte[][]
>>     kryo.register(classOf[java.lang.Class[_]])
>>   }
>> }
>> ```
>>
>> then run with:
>>
>> 'spark.kryo.referenceTracking': 'false',
>> 'spark.kryo.registrationRequired': 'false',
>> 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
>> 'spark.kryo.unsafe': 'false',
>> 'spark.kryoserializer.buffer.max': '256m',
>>
>> On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov wrote:
>>
>>> Hi folks,
>>>
>>> Posted this some time ago, but the problem continues to bedevil us. I'm
>>> including a (slightly edited) stack trace that results from this error. If
>>> anyone can shed any light on what exactly is happening here and what we can
>>> do to avoid it, that would be much appreciated.
>>> org.apache.spark.SparkException: Failed to register classes with Kryo
>>>   at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>>>   at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>>>   at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:309)
>>>   at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>>>   at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>>>   at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>>>   at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
>>>   at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>>   at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>>   at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>>>   at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>>>   at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>>>   at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>>>   at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>>>   at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>>>   at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>>>   at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>>   at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>   at org.apache.spark.sql.execution.exchange.ShuffleExchange
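For what it's worth, here is a minimal sketch of wiring the registrator and the suggested settings through SparkConf before the session is created, rather than only calling registerKryoClasses afterwards; the same keys can equally be passed at submit time with --conf, which is what the "pass it in" comment above refers to. The registrator class name `com.example.MyKryoRegistrator` is a placeholder, and flipping `spark.kryo.registrationRequired` to true is optional but makes Kryo fail fast with the name of any class that was never registered, which can help pin down intermittent failures like the one above.

```
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch only: set the Kryo options before the SparkSession/SparkContext
// exists, or pass the same keys via --conf at submit time.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyKryoRegistrator") // placeholder class name
  .set("spark.kryo.referenceTracking", "false")
  .set("spark.kryo.unsafe", "false")
  .set("spark.kryoserializer.buffer.max", "256m")
  // Optional debugging aid: serialization fails immediately for any
  // unregistered class instead of quietly writing its full class name.
  .set("spark.kryo.registrationRequired", "true")

val spark = SparkSession.builder().config(conf).getOrCreate()
```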
Re: intermittent Kryo serialization failures in Spark
Hi Vadim,

Thanks for your suggestion. We do preregister the classes, like so:

> object KryoRegistrar {
>   val classesToRegister: Array[Class[_]] = Array(
>     classOf[MyModel],
>     [etc]
>   )
> }

And then we do:

> val sparkConf = new SparkConf()
>   .registerKryoClasses(KryoRegistrar.classesToRegister)

I notice that this is a bit different from your code and I'm wondering whether there's any functional difference or if these are two ways to get to the same end. Our code is directly adapted from the Spark documentation on how to use the Kryo serializer, but maybe there's some subtlety here that I'm missing.

With regard to the settings, it looks like we currently have the default settings, which is to say that referenceTracking is true, registrationRequired is false, unsafe is false, and buffer.max is 64m (none of our objects are anywhere near that size, but... who knows). I will try it with your suggestions and see if it solves the problem.

thanks,
Jerry

On Tue, Sep 17, 2019 at 4:34 PM Vadim Semenov wrote:

> Pre-register your classes:
>
> ```
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.serializer.KryoRegistrator
>
> class MyKryoRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo): Unit = {
>     kryo.register(Class.forName("[[B")) // byte[][]
>     kryo.register(classOf[java.lang.Class[_]])
>   }
> }
> ```
>
> then run with:
>
> 'spark.kryo.referenceTracking': 'false',
> 'spark.kryo.registrationRequired': 'false',
> 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
> 'spark.kryo.unsafe': 'false',
> 'spark.kryoserializer.buffer.max': '256m',
>
> On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov wrote:
>
>> Hi folks,
>>
>> Posted this some time ago, but the problem continues to bedevil us. I'm
>> including a (slightly edited) stack trace that results from this error. If
>> anyone can shed any light on what exactly is happening here and what we can
>> do to avoid it, that would be much appreciated.
>>> org.apache.spark.SparkException: Failed to register classes with Kryo
>>>   at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>>>   at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>>>   at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:309)
>>>   at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>>>   at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>>>   at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>>>   at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
>>>   at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>>   at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>>   at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>>>   at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>>>   at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>>>   at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>>>   at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>>>   at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>>>   at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>>>   at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>>   at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>>>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>>>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$an
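On the "two ways to the same end" question: as far as I can tell from SparkConf, registerKryoClasses simply appends the class names to spark.kryo.classesToRegister and switches spark.serializer to Kryo, while a KryoRegistrator named via spark.kryo.registrator is invoked whenever a Kryo instance is built and gives full control over custom serializers and registration order. A rough sketch of the two styles side by side; MyModel and the registrator class name are placeholders, not names from the thread:

```
import org.apache.spark.SparkConf

class MyModel // stand-in for one of the registered model classes

// Style 1: registerKryoClasses, as in the Spark docs. This effectively
// populates spark.kryo.classesToRegister and enables the Kryo serializer.
val conf1 = new SparkConf()
  .registerKryoClasses(Array[Class[_]](classOf[MyModel], Class.forName("[[B")))

// Style 2: a custom KryoRegistrator, named via configuration, which lets
// you register classes with specific serializers or in a fixed order.
val conf2 = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyKryoRegistrator") // placeholder
```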
Re: custom rdd - do I need a hadoop input format?
To implement a custom RDD with getPartitions, do I have to extend `NewHadoopRDD` and pass it a Hadoop input format class? If so, what input format could I use so that the file won't be read all at once and my getPartitions method can split by block?

On Tue, 17 Sep 2019 at 18:53, Arun Mahadevan wrote:

> You can do it with a custom RDD implementation.
> You will mainly implement "getPartitions" - the logic to split your input
> into partitions - and "compute" to compute and return the values from the
> executors.
>
> On Tue, 17 Sep 2019 at 08:47, Marcelo Valle wrote:
>
>> Just to be more clear about my requirements, what I have is actually a
>> custom format, with header, summary and multi-line blocks. I want to create
>> tasks per block and not per line. I already have a library that reads an
>> InputStream and outputs an Iterator of Block, but now I need to integrate
>> this with Spark.
>>
>> On Tue, 17 Sep 2019 at 16:28, Marcelo Valle wrote:
>>
>>> Hi,
>>>
>>> I want to create a custom RDD which will read n lines in sequence from a
>>> file, which I call a block, and each block should be converted to a Spark
>>> dataframe to be processed in parallel.
>>>
>>> Question - do I have to implement a custom Hadoop input format to
>>> achieve this? Or is it possible to do it only with RDD APIs?
>>>
>>> Thanks,
>>> Marcelo.
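To illustrate the custom-RDD route without NewHadoopRDD, here is a rough sketch: getPartitions carves the file into fixed-size byte ranges and compute reads one range per task. It assumes the path is readable from every executor (e.g. a shared filesystem) and uses a trivial line-based parse where the existing InputStream-to-Iterator[Block] library would plug in; all names are placeholders, not part of any Spark API beyond RDD, Partition, and TaskContext.

```
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// One partition per block: a byte range of the input file.
case class BlockPartition(index: Int, path: String, start: Long, length: Long)
  extends Partition

class BlockFileRDD(sc: SparkContext, path: String, fileLength: Long, blockSize: Long)
  extends RDD[String](sc, Nil) {

  // Split the file into fixed-size blocks; header/summary-aware logic
  // would compute real block boundaries here instead.
  override protected def getPartitions: Array[Partition] = {
    val numBlocks = ((fileLength + blockSize - 1) / blockSize).toInt
    (0 until numBlocks).map { i =>
      val start = i.toLong * blockSize
      BlockPartition(i, path, start, math.min(blockSize, fileLength - start)): Partition
    }.toArray
  }

  // Runs on the executor: read only this partition's byte range and parse it.
  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val p = split.asInstanceOf[BlockPartition]
    val in = new java.io.RandomAccessFile(p.path, "r")
    try {
      in.seek(p.start)
      val buf = new Array[Byte](p.length.toInt)
      in.readFully(buf)
      // Placeholder parse: one record per line; swap in the Iterator[Block]
      // library here and map each Block to whatever record type you need.
      new String(buf, java.nio.charset.StandardCharsets.UTF_8).split("\n").iterator
    } finally {
      in.close()
    }
  }
}
```

If the file lives on HDFS and data locality matters, overriding getPreferredLocations with the block locations would be the next step; converting each block's records to a DataFrame can then be done with the usual createDataFrame/toDF path on top of this RDD.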