[SQL] Why does a small two-source JDBC query take ~150-200ms with all optimizations (AQE, CBO, pushdown, Kryo, unsafe) enabled? (v3.4.0-SNAPSHOT)

2022-05-18 Thread Gavin Ray
I did some basic testing of multi-source queries with the most recent Spark:
https://github.com/GavinRay97/spark-playground/blob/44a756acaee676a9b0c128466e4ab231a7df8d46/src/main/scala/Application.scala#L46-L115

The output of "spark.time()" surprised me:

SELECT p.id, p.name, t.id, t.title
FROM db1.public.person p
JOIN db2.public.todos t
ON p.id = t.person_id
WHERE p.id = 1

+---+----+---+------+
| id|name| id| title|
+---+----+---+------+
|  1| Bob|  1|Todo 1|
|  1| Bob|  2|Todo 2|
+---+----+---+------+
Time taken: 168 ms

SELECT p.id, p.name, t.id, t.title
FROM db1.public.person p
JOIN db2.public.todos t
ON p.id = t.person_id
WHERE p.id = 2
LIMIT 1

+---+-----+---+------+
| id| name| id| title|
+---+-----+---+------+
|  2|Alice|  3|Todo 3|
+---+-----+---+------+
Time taken: 228 ms


Calcite and Teiid manage to do this on the order of 5-50ms for basic
queries, so I'm curious about the technical specifics of why Spark appears
to be so much slower here.
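One way to separate one-time planning/codegen cost from steady-state latency is
to warm the query up once and then time repeat runs; a minimal sketch against
the setup in the linked Application.scala (the `spark` session and the
registered db1/db2 catalogs are assumed to already exist):

```scala
val sql =
  """SELECT p.id, p.name, t.id, t.title
    |FROM db1.public.person p
    |JOIN db2.public.todos t ON p.id = t.person_id
    |WHERE p.id = 1""".stripMargin

// Warm-up run: catalog lookups, analysis, whole-stage codegen, JDBC connection setup.
spark.sql(sql).collect()

// Steady-state latency of the same query.
spark.time(spark.sql(sql).collect())

// Check whether the filter and projection were actually pushed down to the JDBC sources.
spark.sql(sql).explain(true)
```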


Supporting Kryo registration in DSv2

2020-03-26 Thread Andrew Melo
Hello all,

Is there a way to register classes with the Kryo serializer from within a
DataSourceV2 implementation?

I've attempted the following in both the constructor and the static block of my
top-level class:

SparkContext context = SparkContext.getOrCreate();
SparkConf conf = context.getConf();
Class<?>[] classesRegistered = new Class<?>[] {
    edu.vanderbilt.accre.laurelin.spark_ttree.Reader.class,
    edu.vanderbilt.accre.laurelin.spark_ttree.Partition.class,
    edu.vanderbilt.accre.laurelin.spark_ttree.SlimTBranch.class
};
conf.registerKryoClasses(classesRegistered);

But (if I'm reading correctly) this is too late: the config has already
been parsed while initializing the SparkContext, so adding classes to the
SparkConf has no effect. From what I can tell, the Kryo instance behind it
is private, so I can't add the registration manually either.

Any thoughts?
Thanks
Andrew
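For reference, a minimal sketch of registering these classes before the
SparkContext exists (the point at which the Kryo settings are read); it assumes
the registration can happen in the driver application, or equivalently be
passed as --conf at submit time, rather than inside the data source:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Registration must be in place before the context is created; once the
// serializer has been constructed, later changes to the SparkConf are not picked up.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.classesToRegister",
    "edu.vanderbilt.accre.laurelin.spark_ttree.Reader," +
    "edu.vanderbilt.accre.laurelin.spark_ttree.Partition," +
    "edu.vanderbilt.accre.laurelin.spark_ttree.SlimTBranch")
val sc = SparkContext.getOrCreate(conf)
```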


Re: intermittent Kryo serialization failures in Spark

2019-09-25 Thread Jerry Vinokurov
Hi Julien,

Thanks for the suggestion. If we don't do a broadcast, that would
presumably affect the performance of the job, as the model that is failing
to be broadcast is something that we need to share across the cluster.
But it may be worth it if the alternative is things not running properly.
Vadim's suggestions did not make a difference for me (still hitting this
error several times a day) but I'll try disabling the broadcast and see if
that does anything.

thanks,
Jerry
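For context, a minimal sketch of the two approaches being discussed; `MyModel`
and `score` are hypothetical stand-ins for the proprietary model:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical stand-in for the model that is failing to broadcast.
case class MyModel(weights: Array[Double]) {
  def score(x: Array[Double]): Double = weights.zip(x).map { case (w, v) => w * v }.sum
}

val spark = SparkSession.builder().appName("broadcast-sketch").getOrCreate()
val sc = spark.sparkContext
val model = MyModel(Array(0.1, 0.2, 0.3))
val data = sc.parallelize(Seq(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0)))

// With a broadcast: the model is serialized once (here by Kryo, which is where
// the registration failure surfaces) and cached per executor.
val bc = sc.broadcast(model)
val withBroadcast = data.map(x => bc.value.score(x)).collect()

// Without a broadcast: the model is captured in the task closure and shipped
// with every task; no TorrentBroadcast/Kryo at broadcast time, but more
// per-task overhead for a large model.
val withoutBroadcast = data.map(x => model.score(x)).collect()
```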

On Fri, Sep 20, 2019 at 10:00 AM Julien Laurenceau <
julien.laurenc...@pepitedata.com> wrote:

> Hi,
> Did you try without the broadcast ?
> Regards
> JL
>
> Le jeu. 19 sept. 2019 à 06:41, Vadim Semenov 
> a écrit :
>
>> Pre-register your classes:
>>
>> ```
>> import com.esotericsoftware.kryo.Kryo
>> import org.apache.spark.serializer.KryoRegistrator
>>
>> class MyKryoRegistrator extends KryoRegistrator {
>>   override def registerClasses(kryo: Kryo): Unit = {
>> kryo.register(Class.forName("[[B")) // byte[][]
>> kryo.register(classOf[java.lang.Class[_]])
>>   }
>> }
>> ```
>>
>> then run with
>>
>> 'spark.kryo.referenceTracking': 'false',
>> 'spark.kryo.registrationRequired': 'false',
>> 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
>> 'spark.kryo.unsafe': 'false',
>> 'spark.kryoserializer.buffer.max': '256m',
>>
>> On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov 
>> wrote:
>>
>>> Hi folks,
>>>
>>> Posted this some time ago but the problem continues to bedevil us. I'm
>>> including a (slightly edited) stack trace that results from this error. If
>>> anyone can shed any light on what exactly is happening here and what we can
>>> do to avoid it, that would be much appreciated.
>>>
>>> org.apache.spark.SparkException: Failed to register classes with Kryo
>>>>at 
>>>> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>>>>at 
>>>> org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>>>>at 
>>>> org.apache.spark.serializer.KryoSerializerInstance.(KryoSerializer.scala:309)
>>>>at 
>>>> org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>>>>at 
>>>> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>>>>at 
>>>> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>>>>at 
>>>> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:88)
>>>>at 
>>>> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>>>at 
>>>> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>>>at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>>>>at 
>>>> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>>>>at 
>>>> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>>>>at 
>>>> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>>>>at 
>>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>>>>at 
>>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>>>>at 
>>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>>>>at 
>>>> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>>>at 
>>>> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>>>at 
>>>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>>at 
>>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>>at 
>>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>>at 
>>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>>at 
>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperati

Re: intermittent Kryo serialization failures in Spark

2019-09-20 Thread Julien Laurenceau
Hi,
Did you try without the broadcast ?
Regards
JL

Le jeu. 19 sept. 2019 à 06:41, Vadim Semenov 
a écrit :

> Pre-register your classes:
>
> ```
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.serializer.KryoRegistrator
>
> class MyKryoRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo): Unit = {
> kryo.register(Class.forName("[[B")) // byte[][]
> kryo.register(classOf[java.lang.Class[_]])
>   }
> }
> ```
>
> then run with
>
> 'spark.kryo.referenceTracking': 'false',
> 'spark.kryo.registrationRequired': 'false',
> 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
> 'spark.kryo.unsafe': 'false',
> 'spark.kryoserializer.buffer.max': '256m',
>
> On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov 
> wrote:
>
>> Hi folks,
>>
>> Posted this some time ago but the problem continues to bedevil us. I'm
>> including a (slightly edited) stack trace that results from this error. If
>> anyone can shed any light on what exactly is happening here and what we can
>> do to avoid it, that would be much appreciated.
>>
>> org.apache.spark.SparkException: Failed to register classes with Kryo
>>> at 
>>> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>>> at 
>>> org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>>> at 
>>> org.apache.spark.serializer.KryoSerializerInstance.(KryoSerializer.scala:309)
>>> at 
>>> org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:88)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>> at 
>>> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>> at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>>> at 
>>> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>>> at 
>>> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>>> at 
>>> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>>> at 
>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>>> at 
>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>>> at 
>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>>> at 
>>> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>> at 
>>> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>> at 
>>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>> at 
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>> at 
>>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>>> at 
>>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>>> at 
>>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>>> at 
>>> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>>> at 
>>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:

Re: intermittent Kryo serialization failures in Spark

2019-09-18 Thread Vadim Semenov
I remember it not working for us when we were setting it from the inside
and needed to actually pass it
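A sketch of what "actually pass it" can look like: supplying the Kryo settings
when the session is built (or equivalently as --conf key=value arguments to
spark-submit) rather than setting them on a context that already exists; the
registrator class name is the one from Vadim's snippet below:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kryo-settings")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryo.referenceTracking", "false")
  .config("spark.kryo.registrationRequired", "false")
  .config("spark.kryo.registrator", "com.datadog.spark.MyKryoRegistrator")
  .config("spark.kryo.unsafe", "false")
  .config("spark.kryoserializer.buffer.max", "256m")
  .getOrCreate()
```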

On Wed, Sep 18, 2019 at 10:38 AM Jerry Vinokurov 
wrote:

> Hi Vadim,
>
> Thanks for your suggestion. We do preregister the classes, like so:
>
> object KryoRegistrar {
>>
>>   val classesToRegister: Array[Class[_]] = Array(
>> classOf[MyModel],
>>[etc]
>> ) }
>>
>
> And then we do:
>
> val sparkConf = new SparkConf()
>>   .registerKryoClasses(KryoRegistrar.classesToRegister)
>>
>
>  I notice that this is a bit different from your code and I'm wondering
> whether there's any functional difference or if these are two ways to get
> to the same end. Our code is directly adapted from the Spark documentation
> on how to use the Kryo serializer but maybe there's some subtlety here that
> I'm missing.
>
> With regard to the settings, it looks like we currently have the default
> settings, which is to say that referenceTracking is true,
> registrationRequired is false, unsafe is false, and buffer.max is 64m (none
> of our objects are anywhere near that size but... who knows). I will try it
> with your suggestions and see if it solves the problem.
>
> thanks,
> Jerry
>
> On Tue, Sep 17, 2019 at 4:34 PM Vadim Semenov  wrote:
>
>> Pre-register your classes:
>>
>> ```
>> import com.esotericsoftware.kryo.Kryo
>> import org.apache.spark.serializer.KryoRegistrator
>>
>> class MyKryoRegistrator extends KryoRegistrator {
>>   override def registerClasses(kryo: Kryo): Unit = {
>> kryo.register(Class.forName("[[B")) // byte[][]
>> kryo.register(classOf[java.lang.Class[_]])
>>   }
>> }
>> ```
>>
>> then run with
>>
>> 'spark.kryo.referenceTracking': 'false',
>> 'spark.kryo.registrationRequired': 'false',
>> 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
>> 'spark.kryo.unsafe': 'false',
>> 'spark.kryoserializer.buffer.max': '256m',
>>
>> On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov 
>> wrote:
>>
>>> Hi folks,
>>>
>>> Posted this some time ago but the problem continues to bedevil us. I'm
>>> including a (slightly edited) stack trace that results from this error. If
>>> anyone can shed any light on what exactly is happening here and what we can
>>> do to avoid it, that would be much appreciated.
>>>
>>> org.apache.spark.SparkException: Failed to register classes with Kryo
>>>>at 
>>>> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>>>>at 
>>>> org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>>>>at 
>>>> org.apache.spark.serializer.KryoSerializerInstance.(KryoSerializer.scala:309)
>>>>at 
>>>> org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>>>>at 
>>>> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>>>>at 
>>>> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>>>>at 
>>>> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:88)
>>>>at 
>>>> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>>>at 
>>>> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>>>at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>>>>at 
>>>> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>>>>at 
>>>> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>>>>at 
>>>> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>>>>at 
>>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>>>>at 
>>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>>>>at 
>>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>>>>at 
>>>> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>>>at 
>>>> or

Re: intermittent Kryo serialization failures in Spark

2019-09-18 Thread Jerry Vinokurov
Hi Vadim,

Thanks for your suggestion. We do preregister the classes, like so:

object KryoRegistrar {
>
>   val classesToRegister: Array[Class[_]] = Array(
> classOf[MyModel],
>[etc]
> ) }
>

And then we do:

val sparkConf = new SparkConf()
>   .registerKryoClasses(KryoRegistrar.classesToRegister)
>

 I notice that this is a bit different from your code and I'm wondering
whether there's any functional difference or if these are two ways to get
to the same end. Our code is directly adapted from the Spark documentation
on how to use the Kryo serializer but maybe there's some subtlety here that
I'm missing.

With regard to the settings, it looks like we currently have the default
settings, which is to say that referenceTracking is true,
registrationRequired is false, unsafe is false, and buffer.max is 64m (none
of our objects are anywhere near that size but... who knows). I will try it
with your suggestions and see if it solves the problem.

thanks,
Jerry

On Tue, Sep 17, 2019 at 4:34 PM Vadim Semenov  wrote:

> Pre-register your classes:
>
> ```
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.serializer.KryoRegistrator
>
> class MyKryoRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo): Unit = {
> kryo.register(Class.forName("[[B")) // byte[][]
> kryo.register(classOf[java.lang.Class[_]])
>   }
> }
> ```
>
> then run with
>
> 'spark.kryo.referenceTracking': 'false',
> 'spark.kryo.registrationRequired': 'false',
> 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
> 'spark.kryo.unsafe': 'false',
> 'spark.kryoserializer.buffer.max': '256m',
>
> On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov 
> wrote:
>
>> Hi folks,
>>
>> Posted this some time ago but the problem continues to bedevil us. I'm
>> including a (slightly edited) stack trace that results from this error. If
>> anyone can shed any light on what exactly is happening here and what we can
>> do to avoid it, that would be much appreciated.
>>
>> org.apache.spark.SparkException: Failed to register classes with Kryo
>>> at 
>>> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>>> at 
>>> org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>>> at 
>>> org.apache.spark.serializer.KryoSerializerInstance.(KryoSerializer.scala:309)
>>> at 
>>> org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:88)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>> at 
>>> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>> at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>>> at 
>>> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>>> at 
>>> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>>> at 
>>> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>>> at 
>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>>> at 
>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>>> at 
>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>>> at 
>>> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>> at 
>>> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>> at 
>>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>> at 
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>> at 
>>> org.apache.spark.rdd.RDDOpera

Re: intermittent Kryo serialization failures in Spark

2019-09-17 Thread Vadim Semenov
Pre-register your classes:

```
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
kryo.register(Class.forName("[[B")) // byte[][]
kryo.register(classOf[java.lang.Class[_]])
  }
}
```

then run with

'spark.kryo.referenceTracking': 'false',
'spark.kryo.registrationRequired': 'false',
'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
'spark.kryo.unsafe': 'false',
'spark.kryoserializer.buffer.max': '256m',

On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov 
wrote:

> Hi folks,
>
> Posted this some time ago but the problem continues to bedevil us. I'm
> including a (slightly edited) stack trace that results from this error. If
> anyone can shed any light on what exactly is happening here and what we can
> do to avoid it, that would be much appreciated.
>
> org.apache.spark.SparkException: Failed to register classes with Kryo
>>  at 
>> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>>  at 
>> org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>>  at 
>> org.apache.spark.serializer.KryoSerializerInstance.(KryoSerializer.scala:309)
>>  at 
>> org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:88)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>  at 
>> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>  at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>>  at 
>> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>>  at 
>> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>>  at 
>> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>>  at 
>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>>  at 
>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>>  at 
>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>>  at 
>> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>  at 
>> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>  at 
>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>  at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>  at 
>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>>  at 
>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>>  at 
>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>>  at 
>> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>>  at 
>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>  at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
&

Re: intermittent Kryo serialization failures in Spark

2019-09-17 Thread Jerry Vinokurov
Hi folks,

Posted this some time ago but the problem continues to bedevil us. I'm
including a (slightly edited) stack trace that results from this error. If
anyone can shed any light on what exactly is happening here and what we can
do to avoid it, that would be much appreciated.

org.apache.spark.SparkException: Failed to register classes with Kryo
>   at 
> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>   at 
> org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>   at 
> org.apache.spark.serializer.KryoSerializerInstance.(KryoSerializer.scala:309)
>   at 
> org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:88)
>   at 
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>   at 
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>   at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>   at 
> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>   at 
> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>   at 
> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>   at 
> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>   at 
> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>   at 
> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>   at 
> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>   at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>   at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at

intermittent Kryo serialization failures in Spark

2019-07-10 Thread Jerry Vinokurov
Hi all,

I am experiencing a strange intermittent failure of my Spark job that
results from serialization issues in Kryo. Here is the stack trace:

Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:348)
>   at 
> org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>   at 
> org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>   ... 204 more
>
> (I've edited the company and model name since this is proprietary code)

This error does not surface every time the job is run; I would say it
probably shows up once in every 10 runs or so, and there isn't anything
about the input data that triggers this, as I've been able to
(nondeterministically) reproduce the error by simply rerunning the job with
the same inputs over and over again. The model itself is just a plain Scala
case class whose fields are strings and integers, so there's no custom
serialization logic or anything like that. As I understand it, this seems
related to an issue previously documented here
<https://issues.apache.org/jira/browse/SPARK-21928>, but allegedly this was
fixed long ago. I'm running this job on an AWS EMR cluster and have
confirmed that the version of Spark running there is 2.4.0, with the patch
that is linked in the above issue being part of the code.

A suggested solution has been to set the extraClasspath config settings on
the driver and executor, but that has not fixed the problem. I'm out of
ideas for how to tackle this and would love to hear if anyone has any
suggestions or strategies for fixing this.

thanks,
Jerry

-- 
http://www.google.com/profiles/grapesmoker
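As a rough diagnostic (not a fix), one can check whether the model class is
loadable on the executors at all, independently of Kryo; given the
intermittency a passing result proves little, but a failure would point firmly
at the classpath. A sketch, using the edited class name from the report above
and assuming an existing `spark` session:

```scala
val loadable = spark.sparkContext
  .parallelize(1 to 100, 10)
  .map(_ => Class.forName("com.mycompany.models.MyModel").getName) // runs on the executors
  .distinct()
  .collect()
```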


Question regarding kryo and java encoders in datasets

2019-01-03 Thread Devender Yadav
Hi All,



Good day!


I am using Spark 2.4 and referring to
https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

Bean class:

public class EmployeeBean implements Serializable {

private Long id;
private String name;
private Long salary;
private Integer age;

// getters and setters

}


Spark Example:

SparkSession spark = 
SparkSession.builder().master("local[4]").appName("play-with-spark").getOrCreate();

List<EmployeeBean> employees1 = populateEmployees(1, 1_000_000);

Dataset<EmployeeBean> ds1 = spark.createDataset(employees1,
    Encoders.kryo(EmployeeBean.class));
Dataset<EmployeeBean> ds2 = spark.createDataset(employees1,
    Encoders.bean(EmployeeBean.class));

ds1.persist(StorageLevel.MEMORY_ONLY());
long ds1Count = ds1.count();

ds2.persist(StorageLevel.MEMORY_ONLY());
long ds2Count = ds2.count();


I looked for storage in spark Web UI. Useful part -

ID  RDD Name   Size in Memory
2   LocalTableScan [value#0]   56.5 MB
13  LocalTableScan [age#6, id#7L, name#8, salary#9L]   23.3 MB


Few questions:

  *   Shouldn't the size of the Kryo-serialized RDD be less than that of the
Java-serialized RDD, instead of more than double the size?

  *   I also tried MEMORY_ONLY_SER() mode and the RDD size is the same. An RDD
stored as serialized Java objects should be kept as one byte array per
partition. Shouldn't the size of the persisted RDDs be less than the
deserialized ones?

  *   What exactly are the Kryo and bean encoders doing while creating the
Dataset?

  *   Can I rename persisted RDDs for better readability?



Regards,
Devender
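For what it's worth, a minimal sketch of why the two cached sizes can differ so
much: the Kryo encoder stores each object as one opaque binary blob in a single
`value` column (matching the `LocalTableScan [value#0]` entry above), while the
bean/product encoder produces typed columns that the columnar cache can
compress. `Emp` is a hypothetical stand-in for EmployeeBean:

```scala
import org.apache.spark.sql.{Encoders, SparkSession}

case class Emp(id: Long, name: String, salary: Long, age: Int)

val spark = SparkSession.builder().master("local[4]").appName("encoder-sizes").getOrCreate()
val emps = (1L to 1000L).map(i => Emp(i, s"emp-$i", i * 100, 30))

// Kryo encoder: one binary column, no columnar compression possible.
val kryoDs = spark.createDataset(emps)(Encoders.kryo[Emp])
kryoDs.printSchema()   // value: binary

// Product encoder (the Scala analogue of Encoders.bean): named, typed columns.
import spark.implicits._
val colDs = spark.createDataset(emps)
colDs.printSchema()    // id, name, salary, age
```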









Internal Spark class is not registered by Kryo

2018-10-09 Thread 曹礼俊
Hi all:

I have set spark.kryo.registrationRequired=true, but an exception occurred when
I run the program:
java.lang.IllegalArgumentException: Class is not registered:
org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage

I tried to register it manually via kryo.register() and
SparkConf.registerKryoClasses, but it still does not work.

The related JIRA is SPARK-21569, and I think SPARK-10251 and SPARK-6497 may be
the same problem.

I would be grateful if someone could help me.

Best Regards,
Lijun Cao
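A sketch of one way to register the internal class when
spark.kryo.registrationRequired=true: since TaskCommitMessage is private to
Spark, it has to be looked up by name inside a registrator; whether this is
sufficient depends on which other internal classes the job ends up shipping:

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class InternalClassRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Look the class up by name; it cannot be referenced with classOf[...].
    kryo.register(Class.forName(
      "org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"))
  }
}
// then set spark.kryo.registrator to this registrator's fully-qualified name
```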



How Kryo serializer allocates buffer in Spark

2018-07-10 Thread nirav
I am getting the following error in a Spark task. The default max value is 64mb!
The documentation says it should be large enough to store the largest object in
my application. I don't think I have any object that is bigger than 64mb, so
what do these values (spark.kryoserializer.buffer,
spark.kryoserializer.buffer.max) mean?

Is that a buffer per executor, or a buffer per executor per core? I have 6
cores per executor, so do all 6 write to this common buffer? In that case I
have a 16mb buffer per core. Please explain. Thanks!


Job aborted due to stage failure: Task 3 in stage 4.0 failed 10 times, most
recent failure: Lost task 3.9 in stage 4.0 (TID 16,
iadprd01mpr005.mgmt-a.xactly.iad.dc.local): org.apache.spark.SparkException:
*Kryo serialization failed: Buffer overflow. Available: 0, required: 19. To
avoid this, increase spark.kryoserializer.buffer.max value.*
at org.apache.spark.serializer.KryoSerializerInstance.
serialize(KryoSerializer.scala:300)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
at java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
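As I understand it (treat this as an assumption, not a definitive answer), each
Kryo serializer instance starts with `spark.kryoserializer.buffer` and may grow
up to `spark.kryoserializer.buffer.max` before the "Buffer overflow" error is
raised, and instances are roughly per running task rather than one shared
buffer per executor. A sketch of setting both explicitly:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kryo-buffers")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryoserializer.buffer", "1m")        // initial size of each serializer's buffer
  .config("spark.kryoserializer.buffer.max", "256m")  // hard cap before the overflow error
  .getOrCreate()
```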


Re: Kryo serialization failed: Buffer overflow : Broadcast Join

2018-02-02 Thread Pralabh Kumar
I am using spark 2.1.0

On Fri, Feb 2, 2018 at 5:08 PM, Pralabh Kumar 
wrote:

> Hi
>
> I am performing broadcast join where my small table is 1 gb .  I am
> getting following error .
>
> I am using
>
>
> org.apache.spark.SparkException:
> . Available: 0, required: 28869232. To avoid this, increase
> spark.kryoserializer.buffer.max value
>
>
>
> I increase the value to
>
> spark.conf.set("spark.kryoserializer.buffer.max","2g")
>
>
> But I am still getting the error .
>
> Please help
>
> Thx
>


Kryo serialization failed: Buffer overflow : Broadcast Join

2018-02-02 Thread Pralabh Kumar
Hi

I am performing a broadcast join where my small table is 1 GB. I am getting the
following error.

I am using

org.apache.spark.SparkException:
. Available: 0, required: 28869232. To avoid this, increase
spark.kryoserializer.buffer.max value

I increased the value to

spark.conf.set("spark.kryoserializer.buffer.max","2g")

But I am still getting the error.

Please help

Thx
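One thing worth checking (an assumption, not a confirmed diagnosis):
spark.kryoserializer.buffer.max is read when the serializer is created along
with the SparkContext, so setting it through spark.conf.set on a session that
is already running may simply be too late. A sketch of supplying it at build
time instead; the same key can be passed as --conf to spark-submit:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("broadcast-join")
  .config("spark.kryoserializer.buffer.max", "1g")  // must stay below 2048m
  .getOrCreate()
```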


Re: Kryo not registered class

2017-11-20 Thread Vadim Semenov
Try:

Class.forName("[Lorg.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$SerializableBlockLocation;")

On Sun, Nov 19, 2017 at 3:24 PM, Angel Francisco Orta <
angel.francisco.o...@gmail.com> wrote:

> Hello, I'm with spark 2.1.0 with scala and I'm registering all classes
> with kryo, and I have a  problem registering this class,
>
> org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$
> SerializableFileStatus$SerializableBlockLocation[]
>
> I can't register with classOf[Array[Class.forName("org.apache.spark.sql.
> execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$
> SerializableBlockLocation").type]]
>
>
> I have tried as well creating a java class like register and registering
> the class as org.apache.spark.sql.execution.datasources.
> PartitioningAwareFileIndex$SerializableFileStatus$
> SerializableBlockLocation[].class;
>
> Any clue is appreciatted,
>
> Thanks.
>
>


Kryo not registered class

2017-11-19 Thread Angel Francisco Orta
Hello, I'm on Spark 2.1.0 with Scala, registering all classes with Kryo, and I
have a problem registering this class:

org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$SerializableBlockLocation[]

I can't register with
classOf[Array[Class.forName("org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$SerializableBlockLocation").type]]


I have also tried creating a Java class to do the registration, registering
the class as
org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$SerializableBlockLocation[].class;

Any clue is appreciated,

Thanks.


spark 2.1.1 ml.LogisticRegression with large feature set cause Kryo serialization failed: Buffer overflow

2017-09-17 Thread haibo wu
I am trying to train a big model.
I have 40 million instances and a 50-million feature set, and it is sparse.
I am using 40 executors with 20 GB each + driver with 40 GB. The number of
data partitions is 5000, the treeAggregate depth is 4, the
spark.kryoserializer.buffer.max is 2016m, the spark.driver.maxResultSize is
40G.

The execution fails with the following messages:

+WARN TaskSetManager: Lost task 2.1 in stage 25.0 (TID 1415,
Blackstone064183, executor 15): org.apache.spark.SparkException: Kryo
serialization failed: Buffer overflow. Available: 3, required: 8
Serialization trace:
currMin (org.apache.spark.mllib.stat.MultivariateOnlineSummarizer). To
avoid this, increase spark.kryoserializer.buffer.max value.
at
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:315)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:364)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

+

I know that spark.kryoserializer.buffer.max is limited to 2g and cannot be
increased further.
I have already tried increasing the partition num to 1 and the treeAggregate
depth to 200; it still failed with the same error message.
And I tried using the Java serializer instead of Kryo; it failed with an OOM:

WARN TaskSetManager: Lost task 5.0 in stage 32.0 (TID 15701,
Blackstone065188, executor 4): +java.lang.OutOfMemoryError
at
java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at
org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:364)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)+

Any advice?



-- 
fun coding


Fwd: Saving RDD as Kryo (broken in 2.1)

2017-06-26 Thread Alexander Krasheninnikov
Hi, all!
I have a code, serializing RDD as Kryo, and saving it as sequence file. It
works fine in 1.5.1, but, while switching to 2.1.1 it does not work.

I am trying to serialize RDD of Tuple2<> (got from PairRDD).

   1. RDD consists of different heterogeneous objects (aggregates, like
   HLL, QTree, Min, Max, etc.)
   2. Save is performed within streaming
   3. Read is performed out of streaming (another app)
   4. Supposed, that such error can be due to custom serializers - turned
   them off, but errors still exists
   5. Tried disabling references in Kryo (since I saw an error while
   resolving references) - got StackOverflow, and significant performance
   degradation
   6. Implementing Serializable/Externalizable is not a solution,
   unfortunately.

Expected behavior:

saveAsObjectFile/loadObjectFile are symmetric, and it's possible to load
previously saved RDD.

Code of save/load:

object KryoFile {

  val THREAD_LOCAL_CACHE = new ThreadLocal[Kryo]

  /*
   * Used to write as Object file using kryo serialization
   */
  def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String) {
val kryoSerializer = new KryoSerializer(rdd.context.getConf)

rdd.context.setJobDescription("Saving to path " + path)
rdd.mapPartitions(iter => iter.grouped(10)
  .map(_.toArray))
  .map(splitArray => {
//initializes kryo and calls your registrator class
var kryo = THREAD_LOCAL_CACHE.get()
    if (null == kryo) {
  kryo = kryoSerializer.newKryo()
  THREAD_LOCAL_CACHE.set(kryo)
}

//convert data to bytes
val bao = new ByteArrayOutputStream()
val output = kryoSerializer.newKryoOutput()
output.setOutputStream(bao)
kryo.writeClassAndObject(output, splitArray)
output.close()
kryo.reset()

// We are ignoring key field of sequence file
val byteWritable = new BytesWritable(bao.toByteArray)
(NullWritable.get(), byteWritable)
  }).saveAsSequenceFile(path)
  }

  /*
   * Method to read from object file which is saved kryo format.
   */
  def loadObjectFile[T](sc: SparkContext, path: String, minPartitions:
Int = 1)(implicit ct: ClassTag[T]) = {
val kryoSerializer = new KryoSerializer(sc.getConf)

sc.sequenceFile(path, classOf[NullWritable],
classOf[BytesWritable], minPartitions)
  .flatMap(x => {

var kryo = THREAD_LOCAL_CACHE.get()
if (null == kryo) {
  kryo = kryoSerializer.newKryo()
  THREAD_LOCAL_CACHE.set(kryo)
}

val input = new Input()
input.setBuffer(x._2.getBytes)
val data = kryo.readClassAndObject(input)
kryo.reset()
val dataObject = data.asInstanceOf[Array[T]]
dataObject
  })

  }
}


When trying to deserialize, I got such errors:
17/06/21 08:19:18 ERROR Executor: Exception in task 14.0 in stage 0.0 (TID
14)
java.lang.ArrayIndexOutOfBoundsException: -2
at java.util.ArrayList.elementData(ArrayList.java:418)
at java.util.ArrayList.get(ArrayList.java:431)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(
MapReferenceResolver.java:60)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:834)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:706)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$
ObjectArraySerializer.read(DefaultArraySerializers.java:396)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$
ObjectArraySerializer.read(DefaultArraySerializers.java:307)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$
1.apply(KryoFile.scala:75)
at com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$
1.apply(KryoFile.scala:62)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(
SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(
SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


17/06/21 08:19:18 ERROR Executor: Exception in task 12.0 in stage 0.0 (TID
12)
java.lang.ArrayStoreException: java.util.Collections$EmptyMap
at com.esotericsoftware.kryo.serializer
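One thing worth ruling out in the load path, independent of the 1.5 -> 2.1
change: BytesWritable.getBytes returns the padded backing array, so the Kryo
Input can see trailing garbage unless the valid length is passed along. A
sketch of a length-aware read (an assumption about a possible cause, not a
confirmed fix for the regression):

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Input
import org.apache.hadoop.io.BytesWritable

def readRecord(kryo: Kryo, bw: BytesWritable): AnyRef = {
  // Restrict the Input to the valid portion of the BytesWritable buffer.
  val input = new Input(bw.getBytes, 0, bw.getLength)
  val data = kryo.readClassAndObject(input)
  kryo.reset()
  data
}
```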

Saving RDD as Kryo (broken in 2.1)

2017-06-21 Thread Alexander Krasheninnikov
Hi, all!
I have a code, serializing RDD as Kryo, and saving it as sequence file. It
works fine in 1.5.1, but, while switching to 2.1.1 it does not work.

I am trying to serialize RDD of Tuple2<> (got from PairRDD).

   1. RDD consists of different heterogeneous objects (aggregates, like
   HLL, QTree, Min, Max, etc.)
   2. Save is performed within streaming
   3. Read is performed out of streaming (another app)
   4. Supposed, that such error can be due to custom serializers - turned
   them off, but errors still exists
   5. Tried disabling references in Kryo (since I saw an error while
   resolving references) - got StackOverflow, and significant performance
   degradation
   6. Implementing Serializable/Externalizable is not a solution,
   unfortunately.

Expected behavior:

saveAsObjectFile/loadObjectFile are symmetric, and it's possible to load
previously saved RDD.

Code of save/load:

object KryoFile {

  val THREAD_LOCAL_CACHE = new ThreadLocal[Kryo]

  /*
   * Used to write as Object file using kryo serialization
   */
  def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String) {
val kryoSerializer = new KryoSerializer(rdd.context.getConf)

rdd.context.setJobDescription("Saving to path " + path)
rdd.mapPartitions(iter => iter.grouped(10)
  .map(_.toArray))
  .map(splitArray => {
//initializes kryo and calls your registrator class
var kryo = THREAD_LOCAL_CACHE.get()
    if (null == kryo) {
  kryo = kryoSerializer.newKryo()
  THREAD_LOCAL_CACHE.set(kryo)
}

//convert data to bytes
val bao = new ByteArrayOutputStream()
val output = kryoSerializer.newKryoOutput()
output.setOutputStream(bao)
kryo.writeClassAndObject(output, splitArray)
output.close()
kryo.reset()

// We are ignoring key field of sequence file
val byteWritable = new BytesWritable(bao.toByteArray)
(NullWritable.get(), byteWritable)
  }).saveAsSequenceFile(path)
  }

  /*
   * Method to read from object file which is saved kryo format.
   */
  def loadObjectFile[T](sc: SparkContext, path: String, minPartitions:
Int = 1)(implicit ct: ClassTag[T]) = {
val kryoSerializer = new KryoSerializer(sc.getConf)

sc.sequenceFile(path, classOf[NullWritable],
classOf[BytesWritable], minPartitions)
  .flatMap(x => {

var kryo = THREAD_LOCAL_CACHE.get()
if (null == kryo) {
  kryo = kryoSerializer.newKryo()
  THREAD_LOCAL_CACHE.set(kryo)
}

val input = new Input()
input.setBuffer(x._2.getBytes)
val data = kryo.readClassAndObject(input)
kryo.reset()
val dataObject = data.asInstanceOf[Array[T]]
dataObject
  })

  }
}


When trying to deserialize, I got such errors:
17/06/21 08:19:18 ERROR Executor: Exception in task 14.0 in stage 0.0 (TID
14)
java.lang.ArrayIndexOutOfBoundsException: -2
at java.util.ArrayList.elementData(ArrayList.java:418)
at java.util.ArrayList.get(ArrayList.java:431)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:834)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:706)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:396)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:307)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
at
com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$1.apply(KryoFile.scala:75)
at
com.badoo.uds.commons.helper.KryoFile$$anonfun$objectFile$1.apply(KryoFile.scala:62)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


17/06/21 08:19:18 ERROR Executor: Exception in task 12.0 in stage 0.0 (TID
12)
java.lang.ArrayStoreException: java.util.Collections$EmptyMap
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$O
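
One thing worth double-checking in the loader above (a hedged sketch, not a confirmed
root cause of the 2.1.1 failure): BytesWritable.getBytes returns the whole backing
buffer, which can be longer than the valid payload, so bounding the Kryo Input by
getLength keeps padding bytes away from the deserializer and its reference resolver.

// Hedged sketch: same flatMap body as loadObjectFile above, but the Input is
// limited to the valid portion of the BytesWritable.
sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
  .flatMap { case (_, bytes) =>
    var kryo = THREAD_LOCAL_CACHE.get()
    if (null == kryo) {
      kryo = kryoSerializer.newKryo()
      THREAD_LOCAL_CACHE.set(kryo)
    }
    // only read bytes.getLength bytes, not the padded backing array
    val input = new Input(bytes.getBytes, 0, bytes.getLength)
    val data = kryo.readClassAndObject(input)
    kryo.reset()
    data.asInstanceOf[Array[T]]
  }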

Re: dataset aggregators with kryo encoder very slow

2017-01-21 Thread Koert Kuipers
sorry i meant to say SPARK-18980

On Sat, Jan 21, 2017 at 1:48 AM, Koert Kuipers <ko...@tresata.com> wrote:

> found it :) SPARK-1890
> thanks cloud-fan
>
> On Sat, Jan 21, 2017 at 1:46 AM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> trying to replicate this in spark itself i can for v2.1.0 but not for
>> master. i guess it has been fixed
>>
>> On Fri, Jan 20, 2017 at 4:57 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> i started printing out when kryo serializes my buffer data structure for
>>> my aggregator.
>>>
>>> i would expect every buffer object to ideally get serialized only once:
>>> at the end of the map-side before the shuffle (so after all the values for
>>> the given key within the partition have been reduced into it). i realize
>>> that in reality due to the order of the elements coming in this can not
>>> always be achieved. but what i see instead is that the buffer is getting
>>> serialized after every call to reduce a value into it, always. could this
>>> be the reason it is so slow?
>>>
>>> On Thu, Jan 19, 2017 at 4:17 PM, Koert Kuipers <ko...@tresata.com>
>>> wrote:
>>>
>>>> we just converted a job from RDD to Dataset. the job does a single
>>>> map-red phase using aggregators. we are seeing very bad performance for the
>>>> Dataset version, about 10x slower.
>>>>
>>>> in the Dataset version we use kryo encoders for some of the
>>>> aggregators. based on some basic profiling of spark in local mode i believe
>>>> the bad performance is due to the kryo encoders. about 70% of time is spend
>>>> in kryo related classes.
>>>>
>>>> since we also use kryo for serialization with the RDD i am surprised
>>>> how big the performance difference is.
>>>>
>>>> has anyone seen the same thing? any suggestions for how to improve this?
>>>>
>>>>
>>>
>>
>


Re: dataset aggregators with kryo encoder very slow

2017-01-20 Thread Koert Kuipers
found it :) SPARK-1890
thanks cloud-fan

On Sat, Jan 21, 2017 at 1:46 AM, Koert Kuipers <ko...@tresata.com> wrote:

> trying to replicate this in spark itself i can for v2.1.0 but not for
> master. i guess it has been fixed
>
> On Fri, Jan 20, 2017 at 4:57 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> i started printing out when kryo serializes my buffer data structure for
>> my aggregator.
>>
>> i would expect every buffer object to ideally get serialized only once:
>> at the end of the map-side before the shuffle (so after all the values for
>> the given key within the partition have been reduced into it). i realize
>> that in reality due to the order of the elements coming in this can not
>> always be achieved. but what i see instead is that the buffer is getting
>> serialized after every call to reduce a value into it, always. could this
>> be the reason it is so slow?
>>
>> On Thu, Jan 19, 2017 at 4:17 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> we just converted a job from RDD to Dataset. the job does a single
>>> map-red phase using aggregators. we are seeing very bad performance for the
>>> Dataset version, about 10x slower.
>>>
>>> in the Dataset version we use kryo encoders for some of the aggregators.
>>> based on some basic profiling of spark in local mode i believe the bad
>>> performance is due to the kryo encoders. about 70% of time is spend in kryo
>>> related classes.
>>>
>>> since we also use kryo for serialization with the RDD i am surprised how
>>> big the performance difference is.
>>>
>>> has anyone seen the same thing? any suggestions for how to improve this?
>>>
>>>
>>
>


Re: dataset aggregators with kryo encoder very slow

2017-01-20 Thread Koert Kuipers
trying to replicate this in spark itself i can for v2.1.0 but not for
master. i guess it has been fixed

On Fri, Jan 20, 2017 at 4:57 PM, Koert Kuipers <ko...@tresata.com> wrote:

> i started printing out when kryo serializes my buffer data structure for
> my aggregator.
>
> i would expect every buffer object to ideally get serialized only once: at
> the end of the map-side before the shuffle (so after all the values for the
> given key within the partition have been reduced into it). i realize that
> in reality due to the order of the elements coming in this can not always
> be achieved. but what i see instead is that the buffer is getting
> serialized after every call to reduce a value into it, always. could this
> be the reason it is so slow?
>
> On Thu, Jan 19, 2017 at 4:17 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> we just converted a job from RDD to Dataset. the job does a single
>> map-red phase using aggregators. we are seeing very bad performance for the
>> Dataset version, about 10x slower.
>>
>> in the Dataset version we use kryo encoders for some of the aggregators.
>> based on some basic profiling of spark in local mode i believe the bad
>> performance is due to the kryo encoders. about 70% of time is spend in kryo
>> related classes.
>>
>> since we also use kryo for serialization with the RDD i am surprised how
>> big the performance difference is.
>>
>> has anyone seen the same thing? any suggestions for how to improve this?
>>
>>
>


Kryo (with Spark 1.6.3) class registration slows down processing

2017-01-20 Thread N B
Hello,

Here is something I am unable to explain and goes against Kryo's
documentation, numerous suggestions on the web and on this list as well as
pure intuition.

Our Spark application runs in a single JVM (perhaps this is relevant, hence
mentioning it). We have been using Kryo serialization with Spark (setting
the spark.serializer property to
org.apache.spark.serializer.KryoSerializer) without explicitly registering
classes and everything seems to work well enough. Recently, I have been
looking into making some performance improvements and decided to register
classes.

I turned on the "spark.kryo.registrationRequired" property and started to
register all classes as they were reported by the resulting Exceptions.
Eventually I managed to register them all. BTW, there is a fairly large
number of internal Spark and Scala classes that I also had to register, but
that's beside the point here.

I was hoping to gain some performance improvement, as per the suggestions
about registering classes. However, what I saw was the exact opposite and
surprising: performance (throughput) actually dropped by at least 50%. I
turned off the registrationRequired property but kept the
explicit registrations in place with the same result. Then I reduced the
number of registrations and performance started to get better again.
Eventually I got rid of all the explicit registrations (back to where I
started basically) and performance improved back to where it was.

I am unable to explain why I am observing this behavior, as it is
counter-intuitive. Explicit registration is supposed to write a smaller
amount of data (class IDs as integers instead of class names as Strings) and
hence help improve performance. Is the fact that Spark is running in local
mode (single JVM) a factor here? Any insights will be appreciated.

Thanks
NB
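
For reference, a minimal sketch of the registration pattern described above (the
application class names below are placeholders, not the actual classes from this job):

import org.apache.spark.SparkConf

case class MyEvent(id: Long, payload: String)   // placeholder application class

val conf = new SparkConf()
  .setAppName("kryo-registration-sketch")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Turned on only while collecting the class list: unregistered classes then
  // fail fast instead of silently being written with their full names.
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(
    classOf[MyEvent],
    classOf[Array[MyEvent]],
    // internal Scala/Spark classes reported by the resulting exceptions
    Class.forName("scala.collection.mutable.WrappedArray$ofRef")
  ))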


Re: dataset aggregators with kryo encoder very slow

2017-01-20 Thread Koert Kuipers
i started printing out when kryo serializes my buffer data structure for my
aggregator.

i would expect every buffer object to ideally get serialized only once: at
the end of the map-side before the shuffle (so after all the values for the
given key within the partition have been reduced into it). i realize that
in reality due to the order of the elements coming in this can not always
be achieved. but what i see instead is that the buffer is getting
serialized after every call to reduce a value into it, always. could this
be the reason it is so slow?

On Thu, Jan 19, 2017 at 4:17 PM, Koert Kuipers <ko...@tresata.com> wrote:

> we just converted a job from RDD to Dataset. the job does a single map-red
> phase using aggregators. we are seeing very bad performance for the Dataset
> version, about 10x slower.
>
> in the Dataset version we use kryo encoders for some of the aggregators.
> based on some basic profiling of spark in local mode i believe the bad
> performance is due to the kryo encoders. about 70% of time is spend in kryo
> related classes.
>
> since we also use kryo for serialization with the RDD i am surprised how
> big the performance difference is.
>
> has anyone seen the same thing? any suggestions for how to improve this?
>
>


dataset aggregators with kryo encoder very slow

2017-01-19 Thread Koert Kuipers
we just converted a job from RDD to Dataset. the job does a single map-red
phase using aggregators. we are seeing very bad performance for the Dataset
version, about 10x slower.

in the Dataset version we use kryo encoders for some of the aggregators.
based on some basic profiling of spark in local mode i believe the bad
performance is due to the kryo encoders. about 70% of the time is spent in kryo
related classes.

since we also use kryo for serialization with the RDD i am surprised how
big the performance difference is.

has anyone seen the same thing? any suggestions for how to improve this?
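
For reference, a minimal sketch of the kind of job being described (the buffer type
and the toy aggregation are made up, not the actual production code): a Dataset
Aggregator whose buffer is declared with a Kryo encoder, which on the affected
versions appears to round-trip the buffer through Kryo on every reduce call
(tracked as SPARK-18980).

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class SumBuffer(var total: Long, var count: Long)   // placeholder buffer type

object MeanWithKryoBuffer extends Aggregator[Long, SumBuffer, Double] {
  def zero: SumBuffer = SumBuffer(0L, 0L)
  def reduce(b: SumBuffer, a: Long): SumBuffer = { b.total += a; b.count += 1; b }
  def merge(b1: SumBuffer, b2: SumBuffer): SumBuffer = {
    b1.total += b2.total; b1.count += b2.count; b1
  }
  def finish(b: SumBuffer): Double = if (b.count == 0) 0.0 else b.total.toDouble / b.count
  // the buffer is kryo-encoded, which is the combination discussed in this thread
  def bufferEncoder: Encoder[SumBuffer] = Encoders.kryo[SumBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val spark = SparkSession.builder().master("local[*]").appName("agg-sketch").getOrCreate()
import spark.implicits._
spark.range(0, 1000000).as[Long].select(MeanWithKryoBuffer.toColumn).show()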


Re: Kryo On Spark 1.6.0

2017-01-14 Thread Yan Facai
For Scala, you could fix it by using:
conf.registerKryoClasses(Array(Class.forName("scala.collection.mutable.WrappedArray$ofRef")))


By the way,
if the class is an array of a Java primitive type, say byte[], then use:
Class.forName("[B")

if it is an array of another class, say scala.collection.mutable.WrappedArray$ofRef,
then use:
Class.forName("[Lscala.collection.mutable.WrappedArray$ofRef;")

ref:
https://docs.oracle.com/javase/8/docs/api/java/lang/Class.html#getName--





On Tue, Jan 10, 2017 at 11:11 PM, Yang Cao <cybea...@gmail.com> wrote:

> If you don’t mind, could please share me with the scala solution? I tried
> to use kryo but seamed not work at all. I hope to get some practical
> example. THX
>
> On 2017年1月10日, at 19:10, Enrico DUrso <enrico.du...@everis.com> wrote:
>
> Hi,
>
> I am trying to use Kryo on Spark 1.6.0.
> I am able to register my own classes and it works, but when I set
> “spark.kryo.registrationRequired “ to true, I get an error about a scala
> class:
> “Class is not registered: scala.collection.mutable.WrappedArray$ofRef”.
>
> Any of you has already solved this issue in Java? I found the code to
> solve it in Scala, but unable to register this class in Java.
>
> Cheers,
>
> enrico
>


Re: Sporadic ClassNotFoundException with Kryo

2017-01-12 Thread Nirmal Fernando
I faced a similar issue and had to do two things;

1. Submit Kryo jar with the spark-submit
2. Set spark.executor.userClassPathFirst true in Spark conf
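
A minimal sketch of the conf side of those two steps (the jar path is a placeholder;
the jar itself still has to be shipped, for example via --jars or setJars):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.executor.userClassPathFirst", "true")   // step 2 above
  .setJars(Seq("/path/to/app-classes.jar"))           // step 1: ship the jar with the job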

On Fri, Nov 18, 2016 at 7:39 PM, chrism <christopher.martens...@cics.se>
wrote:

> Regardless of the different ways we have tried deploying a jar together
> with
> Spark, when running a Spark Streaming job with Kryo as serializer on top of
> Mesos, we sporadically get the following error (I have truncated a bit):
>
> /16/11/18 08:39:10 ERROR OneForOneBlockFetcher: Failed while starting block
> fetches
> java.lang.RuntimeException: org.apache.spark.SparkException: Failed to
> register classes with Kryo
>   at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:129)
>   at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:274)
> ...
>   at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:125)
>   at org.apache.spark.storage.BlockManager$$anonfun$dropFromMemory$3.apply(BlockManager.scala:1265)
>   at org.apache.spark.storage.BlockManager$$anonfun$dropFromMemory$3.apply(BlockManager.scala:1261)
> ...
> Caused by: java.lang.ClassNotFoundException: cics.udr.compound_ran_udr
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)/
>
> where "cics.udr.compound_ran_udr" is a class provided by us in a jar.
>
> We know that the jar containing "cics.udr.compound_ran_udr" is being
> deployed and works because it is listed in the "Environment" tab in the
> GUI,
> and calculations using this class succeed.
>
> We have tried the following methods of deploying the jar containing the
> class
>  * Through --jars in spark-submit
>  * Through SparkConf.setJar
>  * Through spark.driver.extraClassPath and spark.executor.extraClassPath
>  * By having it as the main jar used by spark-submit
> with no luck. The logs (see attached) recognize that the jar is being added
> to the classloader.
>
> We have tried registering the class using
>  * SparkConf.registerKryoClasses.
>  * spark.kryo.classesToRegister
> with no luck.
>
> We are running on Mesos and the jar has been deployed on every machine on
> the local file system in the same location.
>
> I would be very grateful for any help or ideas :)
>
>
>


-- 

Thanks & regards,
Nirmal

Associate Technical Lead - Data Technologies Team, WSO2 Inc.
Mobile: +94715779733 <+94%2071%20577%209733>
Blog: http://nirmalfdo.blogspot.com/


Kryo and Spark 1.6.0 - Does it require a default empty constructor?

2017-01-11 Thread Enrico DUrso
Hi,

I have a doubt about Kryo and Spark 1.6.0.
I read that to use Kryo, the class that you want to serialize must have a
default constructor.
I created a simple class without such a constructor, and if I try to
serialize it manually, it does not work.
But if I use that class in Spark and then collect to force serialization,
it works.
Any idea?

Cheers
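
A hedged sketch of what may explain the difference (an assumption, not a confirmed
answer): a bare Kryo instance creates objects through their constructors, so a class
without a no-arg constructor fails in a manual test unless an Objenesis-based
instantiator strategy is installed, whereas Spark's KryoSerializer builds its Kryo
instances differently (via Twitter chill), which would explain why the collect-based
test works.

import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import org.objenesis.strategy.StdInstantiatorStrategy

class NoDefaultCtor(val x: Int)   // hypothetical test class without an empty constructor

val kryo = new Kryo()
// without this line the manual round-trip below fails for NoDefaultCtor
kryo.setInstantiatorStrategy(new StdInstantiatorStrategy())

val bos = new ByteArrayOutputStream()
val out = new Output(bos)
kryo.writeClassAndObject(out, new NoDefaultCtor(42))
out.close()

val back = kryo.readClassAndObject(new Input(bos.toByteArray)).asInstanceOf[NoDefaultCtor]
println(back.x)   // 42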





RE: Kryo On Spark 1.6.0 [Solution in this email]

2017-01-11 Thread Enrico DUrso
Yes sure,

you can find it here: 
http://stackoverflow.com/questions/34736587/kryo-serializer-causing-exception-on-underlying-scala-class-wrappedarray
I hope it works; I did not try it myself, since I am using Java.
To be precise, I found the solution for my problem:
To sum up, I had problems in registering the following class in Java:
“scala.collection.mutable.WrappedArray$ofRef”
The tip is:
Class a = Class.forName(“scala.collection.mutable.WrappedArray$ofRef”)
and then put a in the array of classes you are passing to the method 
registerKryoClasses()

From: Yang Cao [mailto:cybea...@gmail.com]
Sent: 10 January 2017 15:12
To: Enrico DUrso
Cc: user@spark.apache.org
Subject: Re: Kryo On Spark 1.6.0

If you don’t mind, could please share me with the scala solution? I tried to 
use kryo but seamed not work at all. I hope to get some practical example. THX
On 2017年1月10日, at 19:10, Enrico DUrso 
<enrico.du...@everis.com<mailto:enrico.du...@everis.com>> wrote:

Hi,

I am trying to use Kryo on Spark 1.6.0.
I am able to register my own classes and it works, but when I set 
“spark.kryo.registrationRequired “ to true, I get an error about a scala class:
“Class is not registered: scala.collection.mutable.WrappedArray$ofRef”.

Any of you has already solved this issue in Java? I found the code to solve it 
in Scala, but unable to register this class in Java.

Cheers,

enrico





Re: Kryo On Spark 1.6.0

2017-01-10 Thread Yang Cao
If you don’t mind, could you please share the Scala solution with me? I tried to 
use Kryo, but it did not seem to work at all. I hope to get a practical example. THX
> On 2017年1月10日, at 19:10, Enrico DUrso <enrico.du...@everis.com> wrote:
> 
> Hi,
> 
> I am trying to use Kryo on Spark 1.6.0.
> I am able to register my own classes and it works, but when I set 
> “spark.kryo.registrationRequired “ to true, I get an error about a scala 
> class:
> “Class is not registered: scala.collection.mutable.WrappedArray$ofRef”.
> 
> Any of you has already solved this issue in Java? I found the code to solve 
> it in Scala, but unable to register this class in Java.
> 
> Cheers,
> 
> enrico
> 
> 



RE: Kryo On Spark 1.6.0

2017-01-10 Thread Enrico DUrso
Hi,

I agree with you Richard.
The point is that it looks like some classes used internally by Spark are not
registered (for instance, the one I mentioned in the previous email is not
something I am using directly).
For those classes serialization performance will be poor, according to
how Spark works.
How can I register all those classes?

cheers,

From: Richard Startin [mailto:richardstar...@outlook.com]
Sent: 10 January 2017 11:18
To: Enrico DUrso; user@spark.apache.org
Subject: Re: Kryo On Spark 1.6.0


Hi Enrico,



Only set spark.kryo.registrationRequired if you want to forbid any classes you 
have not explicitly registered - see 
http://spark.apache.org/docs/latest/configuration.html.

To enable kryo, you just need 
spark.serializer=org.apache.spark.serializer.KryoSerializer. There is some info 
here - http://spark.apache.org/docs/latest/tuning.html

Cheers,
Richard



https://richardstartin.com/


From: Enrico DUrso <enrico.du...@everis.com<mailto:enrico.du...@everis.com>>
Sent: 10 January 2017 11:10
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Kryo On Spark 1.6.0


Hi,

I am trying to use Kryo on Spark 1.6.0.
I am able to register my own classes and it works, but when I set 
"spark.kryo.registrationRequired " to true, I get an error about a scala class:
"Class is not registered: scala.collection.mutable.WrappedArray$ofRef".

Any of you has already solved this issue in Java? I found the code to solve it 
in Scala, but unable to register this class in Java.

Cheers,

enrico





Re: Kryo On Spark 1.6.0

2017-01-10 Thread Richard Startin
Hi Enrico,


Only set spark.kryo.registrationRequired if you want to forbid any classes you 
have not explicitly registered - see 
http://spark.apache.org/docs/latest/configuration.html.


To enable kryo, you just need 
spark.serializer=org.apache.spark.serializer.KryoSerializer. There is some info 
here - http://spark.apache.org/docs/latest/tuning.html

Cheers,
Richard



https://richardstartin.com/



From: Enrico DUrso <enrico.du...@everis.com>
Sent: 10 January 2017 11:10
To: user@spark.apache.org
Subject: Kryo On Spark 1.6.0


Hi,

I am trying to use Kryo on Spark 1.6.0.
I am able to register my own classes and it works, but when I set 
"spark.kryo.registrationRequired " to true, I get an error about a scala class:
"Class is not registered: scala.collection.mutable.WrappedArray$ofRef".

Any of you has already solved this issue in Java? I found the code to solve it 
in Scala, but unable to register this class in Java.

Cheers,

enrico





Kryo On Spark 1.6.0

2017-01-10 Thread Enrico DUrso
Hi,

I am trying to use Kryo on Spark 1.6.0.
I am able to register my own classes and it works, but when I set 
"spark.kryo.registrationRequired " to true, I get an error about a scala class:
"Class is not registered: scala.collection.mutable.WrappedArray$ofRef".

Has any of you already solved this issue in Java? I found the code to solve it 
in Scala, but I am unable to register this class in Java.

Cheers,

enrico





Re: Spark kryo serialization register Datatype[]

2016-12-21 Thread Georg Heiler
I already set

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

to enable kryo and

.set("spark.kryo.registrationRequired", "true")

to force kryo. Strangely, I still see the issue with the missing DataType[].

Trying to register regular classes like Date

.registerKryoClasses(Array(classOf[Date]))

works just fine, but registering the Spark-internal DataType[] is not
working / as far as I read the docs it should be handled by Spark.
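
A hedged sketch of registering the array class itself (in Scala the array class can
be obtained directly with classOf, without spelling out the JVM's "[L...;" binary name):

import java.util.Date

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.DataType

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(
    classOf[Date],            // a regular class, as in the example above
    classOf[DataType],        // the element class
    classOf[Array[DataType]]  // the DataType[] class the error message complains about
  ))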


Vadim Semenov <vadim.seme...@datadoghq.com> schrieb am Mi., 21. Dez. 2016
um 17:12 Uhr:

> to enable kryo serializer you just need to pass
> `spark.serializer=org.apache.spark.serializer.KryoSerializer`
>
> the `spark.kryo.registrationRequired` controls the following behavior:
>
> Whether to require registration with Kryo. If set to 'true', Kryo will
> throw an exception if an unregistered class is serialized. If set to false
> (the default), Kryo will write unregistered class names along with each
> object. Writing class names can cause significant performance overhead, so
> enabling this option can enforce strictly that a user has not omitted
> classes from registration.
>
>
> as described here http://spark.apache.org/docs/latest/configuration.html
>
> if it's set to `true` you need to manually register classes as described
> here: http://spark.apache.org/docs/latest/tuning.html#data-serialization
>
>
> On Wed, Dec 21, 2016 at 8:49 AM, geoHeil <georg.kf.hei...@gmail.com>
> wrote:
>
> To force spark to use kryo serialization I set
> spark.kryo.registrationRequired to true.
>
> Now spark complains that: Class is not registered:
> org.apache.spark.sql.types.DataType[] is not registered.
> How can I fix this? So far I could not successfully register this class.
>
>
>


Re: Spark kryo serialization register Datatype[]

2016-12-21 Thread Vadim Semenov
to enable kryo serializer you just need to pass
`spark.serializer=org.apache.spark.serializer.KryoSerializer`

the `spark.kryo.registrationRequired` controls the following behavior:

Whether to require registration with Kryo. If set to 'true', Kryo will
> throw an exception if an unregistered class is serialized. If set to false
> (the default), Kryo will write unregistered class names along with each
> object. Writing class names can cause significant performance overhead, so
> enabling this option can enforce strictly that a user has not omitted
> classes from registration.


as described here http://spark.apache.org/docs/latest/configuration.html

if it's set to `true` you need to manually register classes as described
here: http://spark.apache.org/docs/latest/tuning.html#data-serialization


On Wed, Dec 21, 2016 at 8:49 AM, geoHeil <georg.kf.hei...@gmail.com> wrote:

> To force spark to use kryo serialization I set
> spark.kryo.registrationRequired to true.
>
> Now spark complains that: Class is not registered:
> org.apache.spark.sql.types.DataType[] is not registered.
> How can I fix this? So far I could not successfully register this class.
>
>
>


Spark kryo serialization register Datatype[]

2016-12-21 Thread geoHeil
To force spark to use kryo serialization I set
spark.kryo.registrationRequired to true.

Now spark complains that: Class is not registered:
org.apache.spark.sql.types.DataType[] is not registered.
How can I fix this? So far I could not successfully register this class.






Kryo Exception: NegativeArraySizeException

2016-11-24 Thread Pedro Tuero
Hi, I'm trying to broadcast a map of 2.6GB but I'm getting a weird Kryo
exception.

I tried to set -XX:hashCode=0 on the executors and the driver, following this
comment:
https://github.com/broadinstitute/gatk/issues/1524#issuecomment-189368808
But it didn't change anything.

Are you aware of this problem?
Is there a workaround?

Thanks for your comments,
Pedro.

Map info:
 INFO 2016-11-24 15:29:34,230 [main] (Logging.scala:54) - Block broadcast_3
stored as values in memory (estimated size 2.6 GB, free 5.7 GB)

Error Trace:
ERROR ApplicationMaster: User class threw exception:
com.esotericsoftware.kryo.KryoException:
java.lang.NegativeArraySizeException
Serialization trace:
...
com.esotericsoftware.kryo.KryoException:
java.lang.NegativeArraySizeException
Serialization trace:
...
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:113)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
at
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:195)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:236)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:236)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307)
at
org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:237)
at
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:107)
at
org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:86)
at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1387)
at
org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:646)
at com.personal.sparkJob.main(sparkJob..java:81)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627)
Caused by: java.lang.NegativeArraySizeException
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:447)
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:245)
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:239)
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:135)
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:246)
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:239)
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:135)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:41)
at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:658)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:623)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
at
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
... 22 more
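
A hedged workaround sketch, assuming the overflow comes from Kryo's reference-tracking
identity map growing while one huge object graph is written: shard the map into several
smaller broadcasts so each serialization pass sees far fewer objects (the helper below
is a placeholder, not code from the original job).

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

def broadcastInShards[K, V](sc: SparkContext, big: Map[K, V], shards: Int)
    : IndexedSeq[Broadcast[Map[K, V]]] = {
  def shardOf(k: K): Int = ((k.hashCode % shards) + shards) % shards   // non-negative index
  val grouped = big.groupBy { case (k, _) => shardOf(k) }
  (0 until shards).map(i => sc.broadcast(grouped.getOrElse(i, Map.empty[K, V])))
}

// On the executor side, look a key up in the shard chosen by the same hash:
//   val shard = ((key.hashCode % shards) + shards) % shards
//   val value = broadcastShards(shard).value.get(key)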


Sporadic ClassNotFoundException with Kryo

2016-11-18 Thread chrism
Regardless of the different ways we have tried deploying a jar together with
Spark, when running a Spark Streaming job with Kryo as serializer on top of
Mesos, we sporadically get the following error (I have truncated a bit):

/16/11/18 08:39:10 ERROR OneForOneBlockFetcher: Failed while starting block
fetches
java.lang.RuntimeException: org.apache.spark.SparkException: Failed to
register classes with Kryo
  at
org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:129)
  at
org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:274)
...
  at
org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:125)
  at
org.apache.spark.storage.BlockManager$$anonfun$dropFromMemory$3.apply(BlockManager.scala:1265)
  at
org.apache.spark.storage.BlockManager$$anonfun$dropFromMemory$3.apply(BlockManager.scala:1261)
...
Caused by: java.lang.ClassNotFoundException: cics.udr.compound_ran_udr
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)/

where "cics.udr.compound_ran_udr" is a class provided by us in a jar.

We know that the jar containing "cics.udr.compound_ran_udr" is being
deployed and works because it is listed in the "Environment" tab in the GUI,
and calculations using this class succeed.

We have tried the following methods of deploying the jar containing the
class
 * Through --jars in spark-submit
 * Through SparkConf.setJar
 * Through spark.driver.extraClassPath and spark.executor.extraClassPath
 * By having it as the main jar used by spark-submit
with no luck. The logs (see attached) recognize that the jar is being added
to the classloader.

We have tried registering the class using
 * SparkConf.registerKryoClasses.
 * spark.kryo.classesToRegister
with no luck.

We are running on Mesos and the jar has been deployed on every machine on
the local file system in the same location.

I would be very grateful for any help or ideas :)






BiMap BroadCast Variable - Kryo Serialization Issue

2016-11-02 Thread Kalpana Jalawadi
Hi,

I am getting a NullPointerException due to a Kryo serialization issue while
trying to read a BiMap broadcast variable. Attached are the code snippets.
The pointers shared here didn't help - link1
<http://stackoverflow.com/questions/33156095/spark-serialization-issue-with-hashmap>,
link2
<http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist>.
The Spark version used is 1.6.x; this was working with the 1.3.x version.

Any help in this regard is much appreciated.

Exception:

com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
App > Serialization trace:
App > value (com.demo.BiMapWrapper)
App > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1238)
App > at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
App > at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
App > at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
App > at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
App > at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
App > at
com.manthan.aaas.algo.associationmining.impl.Test.lambda$execute$6abf5fd0$1(Test.java:39)
App > at
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
App > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
App > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
App > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
App > at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
App > at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
App > at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
App > at scala.collection.TraversableOnce$class.to
(TraversableOnce.scala:273)
App > at scala.collection.AbstractIterator.to(Iterator.scala:1157)
App > at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
App > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
App > at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
App > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
App > at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
App > at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
App > at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1978)
App > at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1978)
App > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
App > at org.apache.spark.scheduler.Task.run(Task.scala:89)
App > at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
App > at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
App > at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
App > at java.lang.Thread.run(Thread.java:745)
App > Caused by: com.esotericsoftware.kryo.KryoException:
java.lang.NullPointerException
App > Serialization trace:
App > value (com.demo.BiMapWrapper)
App > at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
App > at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
App > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
App > at
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228)
App > at
org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217)
App > at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
App > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1231)
App > ... 29 more
App > Caused by: java.lang.NullPointerException
App > at com.google.common.collect.HashBiMap.seekByKey(HashBiMap.java:180)
App > at com.google.common.collect.HashBiMap.put(HashBiMap.java:230)
App > at com.google.common.collect.HashBiMap.put(HashBiMap.java:218)
App > at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
App > at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
App > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
App > at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
App > ... 35 more
App >
App > 16/11/02 18:39:01 dispatcher-event-loop-2 INFO TaskSetManager:
Starting task 17.1 in stage 1.0 (TID 19, ip-10-0-1-237.ec2.internal,
partition 17,PROCESS_LOCAL, 2076 bytes)
App > 16/11/02 18:39:01 task-result-getter-3 INFO TaskSetManager: Lost task
17.1 in stage
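
A hedged sketch of one way around this (an assumption based on the trace above, where
Kryo's generic MapSerializer calls HashBiMap.put on a half-initialized instance):
register a custom serializer that writes the BiMap as a plain HashMap and rebuilds it
through HashBiMap.create on read.

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import com.google.common.collect.HashBiMap

class HashBiMapSerializer extends Serializer[HashBiMap[String, String]] {
  override def write(kryo: Kryo, out: Output, bimap: HashBiMap[String, String]): Unit = {
    // copy into a plain HashMap so the default MapSerializer can handle it
    kryo.writeObject(out, new java.util.HashMap[String, String](bimap))
  }
  override def read(kryo: Kryo, in: Input,
                    cls: Class[HashBiMap[String, String]]): HashBiMap[String, String] = {
    val plain = kryo.readObject(in, classOf[java.util.HashMap[String, String]])
    HashBiMap.create(plain)   // rebuild through the factory, not field-by-field
  }
}

// registered, for example, from a KryoRegistrator:
//   kryo.register(classOf[HashBiMap[String, String]], new HashBiMapSerializer)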

Kryo on Zeppelin

2016-10-10 Thread Fei Hu
Hi All,

I am running some Spark Scala code on Zeppelin on CDH 5.5.1 (Spark version
1.5.0). I customized the Spark interpreter to use
org.apache.spark.serializer.KryoSerializer as spark.serializer, and in the
dependencies I added Kryo 3.0.3 as follows:
 com.esotericsoftware:kryo:3.0.3


When I write the Scala notebook and run the program, I get the following
errors. But if I compile the same code as jars and use spark-submit to run
it on the cluster, it works well without errors.

WARN [2016-10-10 23:43:40,801] ({task-result-getter-1}
Logging.scala[logWarning]:71) - Lost task 0.0 in stage 3.0 (TID 9,
svr-A3-A-U20): java.io.EOFException
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:196)
at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1175)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


There were also some errors when I run the Zeppelin Tutorial:

Caused by: java.io.IOException: java.lang.NullPointerException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
... 3 more
Caused by: java.lang.NullPointerException
at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:38)
at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:23)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:192)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1$$anonfun$apply$mcV$sp$2.apply(ParallelCollectionRDD.scala:80)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1$$anonfun$apply$mcV$sp$2.apply(ParallelCollectionRDD.scala:80)
at org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:142)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:80)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)

Does anyone know why this happened?

Thanks in advance,
Fei


Re: mutable.LinkedHashMap kryo serialization issues

2016-08-26 Thread Rahul Palamuttam
Hi,

I apologize, I spoke too soon.
Those transient member variables may not be the issue.

To clarify my test case I am creating a LinkedHashMap with two elements in
a map expression on an RDD.
Note that the LinkedHashMaps are being created on the worker JVMs (not the
driver JVM) and THEN collected to the driver JVM.
I am NOT creating LinkedHashMaps on the driver and then parallelizing them
(sending them to worker JVMs).

As Renato said spark requires us to register classes that aren't yet in
Chill.
As far as I know there are three ways to register, all through API
calls on SparkConf.

1. sparkConf().registerKryoClasses(Array(classOf[...], clasOf[...]))
* This is the method of registering classes as described in the Tuning page:
http://spark.apache.org/docs/latest/tuning.html#data-serialization

2. sparkConf().set("spark.kryo.classesToRegister", "cName1, cName2")

3. sparkConf().set("spark.kryo.registrator", "registrator1, registrator2")

In the first two methods, which set the classes to register in Kryo,
what I get are empty mutable.LinkedHashMaps after calling collect on the
RDD.
To my best understanding this should not happen (none of the other
collection classes I have used have this problem).

For the third method I created a registrator for mutable.LinkedHashMap
which can be found here :
https://gist.github.com/rahulpalamuttam/9f3bfa39a160efa80844d3a7a7bd87cd

I set the registrator like so :
sparkConf().set("spark.kryo.registrator",
"org.dia.core.MutableLinkedHashMapRegistrator").
Now, when I do the same test, I get an Array of LinkedHashMaps.
Each LinkedHashMap contains the entries I populated it with in the map task.
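
For reference, a hedged sketch along the lines of the registrator linked above (not
the actual gist contents): serialize the map as its entry sequence and rebuild it on
read, so the transient firstEntry/lastEntry fields never matter.

import scala.collection.mutable

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.serializer.KryoRegistrator

class LinkedHashMapSerializer extends Serializer[mutable.LinkedHashMap[Any, Any]] {
  override def write(kryo: Kryo, out: Output, m: mutable.LinkedHashMap[Any, Any]): Unit = {
    out.writeInt(m.size)
    m.foreach { case (k, v) =>
      kryo.writeClassAndObject(out, k)
      kryo.writeClassAndObject(out, v)
    }
  }
  override def read(kryo: Kryo, in: Input,
                    cls: Class[mutable.LinkedHashMap[Any, Any]]): mutable.LinkedHashMap[Any, Any] = {
    val m = new mutable.LinkedHashMap[Any, Any]()
    val n = in.readInt()
    (0 until n).foreach { _ =>
      val k = kryo.readClassAndObject(in)
      val v = kryo.readClassAndObject(in)
      m.put(k, v)
    }
    m
  }
}

class MutableLinkedHashMapRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit =
    kryo.register(classOf[mutable.LinkedHashMap[Any, Any]], new LinkedHashMapSerializer)
}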

Why do the first two methods result in improper serialization of
mutable.LinkedHashMap?
Should I file a JIRA for it?

Much credit should be given to Martin Grotzke from EsotericSoftware/kryo
who helped me tremendously.

Best,

Rahul Palamuttam




On Fri, Aug 26, 2016 at 10:16 AM, Rahul Palamuttam <rahulpala...@gmail.com>
wrote:

> Thanks Renato.
>
> I forgot to reply all last time. I apologize for the rather confusing
> example.
> All that the snipet code did was
> 1. Make an RDD of LinkedHashMaps with size 2
> 2. On the worker side get the sizes of the HashMaps (via a map(hash =>
> hash.size))
> 3. On the driver call collect on the RDD[Ints] which is the RDD of hashmap
> sizes giving you an Array[Ints]
> 4. On the driver call collect on the RDD[LinkedHashMap] giving you an
> Array[LinkedHashMap]
> 5. Check the size of a hashmap in Array[LinkedHashMap] with any size value
> in Array[Ints] (they're all going to be the same size).
> 6. The sizes differ because the elements of the LinkedHashMap were never
> copied over
>
> Anyway I think I've tracked down the issue and it doesn't seem to be a
> spark or kryo issue.
>
> For those it concerns LinkedHashMap has this serialization issue because
> it has transient members for firstEntry and lastEntry.
> Take a look here : https://github.com/scala/scala/blob/v2.11.8/src/
> library/scala/collection/mutable/LinkedHashMap.scala#L62
>
> Those attributes are not going to be serialized.
> Furthermore, the iterator on LinkedHashMap depends on the firstEntry
> variable
> Since that member is not serialized it is null.
> The iterator requires the firstEntry variable to walk the LinkedHashMap
> https://github.com/scala/scala/blob/v2.11.8/src/library/scala/collection/
> mutable/LinkedHashMap.scala#L94-L100
>
> I wonder why these two variables were made transient.
>
> Best,
> Rahul Palamuttam
>
>
> On Thu, Aug 25, 2016 at 11:13 PM, Renato Marroquín Mogrovejo <
> renatoj.marroq...@gmail.com> wrote:
>
>> Hi Rahul,
>>
>> You have probably already figured this one out, but anyway...
>> You need to register the classes that you'll be using with Kryo because
>> it does not support all Serializable types and requires you to register the
>> classes you’ll use in the program in advance. So when you don't register
>> the class, Kryo doesn't know how to serialize/deserialize it.
>>
>>
>> Best,
>>
>> Renato M.
>>
>> 2016-08-22 17:12 GMT+02:00 Rahul Palamuttam <rahulpala...@gmail.com>:
>>
>>> Hi,
>>>
>>> Just sending this again to see if others have had this issue.
>>>
>>> I recently switched to using kryo serialization and I've been running
>>> into errors
>>> with the mutable.LinkedHashMap class.
>>>
>>> If I don't register the mutable.LinkedHashMap class then I get an
>>> ArrayStoreException seen below.
>>> If I do register the class, then when the LinkedHashMap is collected on
>>> the driver, it does not contain any elements.
>>>
>>> H

Re: mutable.LinkedHashMap kryo serialization issues

2016-08-26 Thread Rahul Palamuttam
Thanks Renato.

I forgot to reply all last time. I apologize for the rather confusing
example.
All that the snipet code did was
1. Make an RDD of LinkedHashMaps with size 2
2. On the worker side get the sizes of the HashMaps (via a map(hash =>
hash.size))
3. On the driver call collect on the RDD[Ints] which is the RDD of hashmap
sizes giving you an Array[Ints]
4. On the driver call collect on the RDD[LinkedHashMap] giving you an
Array[LinkedHashMap]
5. Check the size of a hashmap in Array[LinkedHashMap] with any size value
in Array[Ints] (they're all going to be the same size).
6. The sizes differ because the elements of the LinkedHashMap were never
copied over

Anyway I think I've tracked down the issue and it doesn't seem to be a
spark or kryo issue.

For those it concerns LinkedHashMap has this serialization issue because it
has transient members for firstEntry and lastEntry.
Take a look here :
https://github.com/scala/scala/blob/v2.11.8/src/library/scala/collection/mutable/LinkedHashMap.scala#L62

Those attributes are not going to be serialized.
Furthermore, the iterator on LinkedHashMap depends on the firstEntry
variable
Since that member is not serialized it is null.
The iterator requires the firstEntry variable to walk the LinkedHashMap
https://github.com/scala/scala/blob/v2.11.8/src/library/scala/collection/mutable/LinkedHashMap.scala#L94-L100

I wonder why these two variables were made transient.

Best,
Rahul Palamuttam


On Thu, Aug 25, 2016 at 11:13 PM, Renato Marroquín Mogrovejo <
renatoj.marroq...@gmail.com> wrote:

> Hi Rahul,
>
> You have probably already figured this one out, but anyway...
> You need to register the classes that you'll be using with Kryo because it
> does not support all Serializable types and requires you to register the
> classes you’ll use in the program in advance. So when you don't register
> the class, Kryo doesn't know how to serialize/deserialize it.
>
>
> Best,
>
> Renato M.
>
> 2016-08-22 17:12 GMT+02:00 Rahul Palamuttam <rahulpala...@gmail.com>:
>
>> Hi,
>>
>> Just sending this again to see if others have had this issue.
>>
>> I recently switched to using kryo serialization and I've been running
>> into errors
>> with the mutable.LinkedHashMap class.
>>
>> If I don't register the mutable.LinkedHashMap class then I get an
>> ArrayStoreException seen below.
>> If I do register the class, then when the LinkedHashMap is collected on
>> the driver, it does not contain any elements.
>>
>> Here is the snippet of code I used :
>>
>> val sc = new SparkContext(new SparkConf()
>>   .setMaster("local[*]")
>>   .setAppName("Sample")
>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>   .registerKryoClasses(Array(classOf[mutable.LinkedHashMap[String, 
>> String]])))
>>
>> val collect = sc.parallelize(0 to 10)
>>   .map(p => new mutable.LinkedHashMap[String, String]() ++= Array(("hello", 
>> "bonjour"), ("good", "bueno")))
>>
>> val mapSideSizes = collect.map(p => p.size).collect()(0)
>> val driverSideSizes = collect.collect()(0).size
>>
>> println("The sizes before collect : " + mapSideSizes)
>> println("The sizes after collect : " + driverSideSizes)
>>
>>
>> ** The following only occurs if I did not register the
>> mutable.LinkedHashMap class **
>> 16/08/20 18:10:38 ERROR TaskResultGetter: Exception while getting task
>> result
>> java.lang.ArrayStoreException: scala.collection.mutable.HashMap
>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>> at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311)
>> at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> I hope this is a known issue and/or I'm missing something important in my
>> setup.
>> Appreciate any help or advice!
>>
>> Best,
>>
>> Rahul Palamuttam
>>
>
>


Re: mutable.LinkedHashMap kryo serialization issues

2016-08-26 Thread Renato Marroquín Mogrovejo
Hi Rahul,

You have probably already figured this one out, but anyway...
You need to register the classes that you'll be using with Kryo because it
does not support all Serializable types and requires you to register the
classes you’ll use in the program in advance. So when you don't register
the class, Kryo doesn't know how to serialize/deserialize it.


Best,

Renato M.

2016-08-22 17:12 GMT+02:00 Rahul Palamuttam <rahulpala...@gmail.com>:

> Hi,
>
> Just sending this again to see if others have had this issue.
>
> I recently switched to using kryo serialization and I've been running into
> errors
> with the mutable.LinkedHashMap class.
>
> If I don't register the mutable.LinkedHashMap class then I get an
> ArrayStoreException seen below.
> If I do register the class, then when the LinkedHashMap is collected on
> the driver, it does not contain any elements.
>
> Here is the snippet of code I used :
>
> val sc = new SparkContext(new SparkConf()
>   .setMaster("local[*]")
>   .setAppName("Sample")
>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>   .registerKryoClasses(Array(classOf[mutable.LinkedHashMap[String, String]])))
>
> val collect = sc.parallelize(0 to 10)
>   .map(p => new mutable.LinkedHashMap[String, String]() ++= Array(("hello", 
> "bonjour"), ("good", "bueno")))
>
> val mapSideSizes = collect.map(p => p.size).collect()(0)
> val driverSideSizes = collect.collect()(0).size
>
> println("The sizes before collect : " + mapSideSizes)
> println("The sizes after collect : " + driverSideSizes)
>
>
> ** The following only occurs if I did not register the
> mutable.LinkedHashMap class **
> 16/08/20 18:10:38 ERROR TaskResultGetter: Exception while getting task
> result
> java.lang.ArrayStoreException: scala.collection.mutable.HashMap
> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$
> ObjectArraySerializer.read(DefaultArraySerializers.java:338)
> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$
> ObjectArraySerializer.read(DefaultArraySerializers.java:293)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
> at org.apache.spark.serializer.KryoSerializerInstance.
> deserialize(KryoSerializer.scala:311)
> at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$
> anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$
> anonfun$run$1.apply(TaskResultGetter.scala:51)
> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$
> anonfun$run$1.apply(TaskResultGetter.scala:51)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
> at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(
> TaskResultGetter.scala:50)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> I hope this is a known issue and/or I'm missing something important in my
> setup.
> Appreciate any help or advice!
>
> Best,
>
> Rahul Palamuttam
>


Re: Do we still need to use Kryo serializer in Spark 1.6.2 ?

2016-08-23 Thread prosp4300
The way to use the Kryo serializer is similar to Scala, as shown below; the only 
difference is the lack of the convenience method "conf.registerKryoClasses", but it 
should be easy to write one yourself.

from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.classesToRegister",
         "com.example.YourClassA,com.example.YourClassB")

At 2016-08-23 02:00:41, "Eric Ho" <e...@analyticsmd.com> wrote:

I heard that Kryo will get phased out at some point but not sure which Spark 
release.
I'm using PySpark, does anyone has any docs on how to call / use Kryo 
Serializer in PySpark ?


Thanks.



--



-eric ho



Re: Do we still need to use Kryo serializer in Spark 1.6.2 ?

2016-08-22 Thread Jacek Laskowski
Hi,

I've not heard this. Moreover, I see Kryo supported in Encoders
(SerDes) in Spark 2.0:

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala#L151
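
For context, usage looks roughly like the sketch below (Thing is a hypothetical class; 
assumes Spark 2.0 and a local SparkSession):

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

case class Thing(id: Int, payload: Map[String, String]) // hypothetical example class

val spark = SparkSession.builder().master("local[*]").appName("kryo-encoder").getOrCreate()

// Kryo-backed encoder: the whole object is serialized into a single binary column.
implicit val thingEncoder: Encoder[Thing] = Encoders.kryo[Thing]
val ds = spark.createDataset(Seq(Thing(1, Map("a" -> "b"))))
ds.show()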

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Mon, Aug 22, 2016 at 8:00 PM, Eric Ho <e...@analyticsmd.com> wrote:
> I heard that Kryo will get phased out at some point but not sure which Spark
> release.
> I'm using PySpark, does anyone has any docs on how to call / use Kryo
> Serializer in PySpark ?
>
> Thanks.
>
> --
>
> -eric ho
>




Do we still need to use Kryo serializer in Spark 1.6.2 ?

2016-08-22 Thread Eric Ho
I heard that Kryo will get phased out at some point, but I'm not sure in which
Spark release.
I'm using PySpark; does anyone have any docs on how to call / use the Kryo
serializer in PySpark?

Thanks.

-- 

-eric ho


mutable.LinkedHashMap kryo serialization issues

2016-08-22 Thread Rahul Palamuttam
Hi,

Just sending this again to see if others have had this issue.

I recently switched to using kryo serialization and I've been running into 
errors
with the mutable.LinkedHashMap class.

If I don't register the mutable.LinkedHashMap class then I get an 
ArrayStoreException seen below.
If I do register the class, then when the LinkedHashMap is collected on the 
driver, it does not contain any elements.

Here is the snippet of code I used : 
val sc = new SparkContext(new SparkConf()
  .setMaster("local[*]")
  .setAppName("Sample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[mutable.LinkedHashMap[String, String]])))

val collect = sc.parallelize(0 to 10)
  .map(p => new mutable.LinkedHashMap[String, String]() ++= Array(("hello", 
"bonjour"), ("good", "bueno")))

val mapSideSizes = collect.map(p => p.size).collect()(0)
val driverSideSizes = collect.collect()(0).size

println("The sizes before collect : " + mapSideSizes)
println("The sizes after collect : " + driverSideSizes)

** The following only occurs if I did not register the mutable.LinkedHashMap 
class **
16/08/20 18:10:38 ERROR TaskResultGetter: Exception while getting task result
java.lang.ArrayStoreException: scala.collection.mutable.HashMap
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at 
org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311)
at 
org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I hope this is a known issue and/or I'm missing something important in my setup.
Appreciate any help or advice!

Best,

Rahul Palamuttam

mutable.LinkedHashMap kryo serialization issues

2016-08-20 Thread Rahul Palamuttam
Hi,

I recently switched to using kryo serialization and I've been running into
errors
with the mutable.LinkedHashMap class.

If I don't register the mutable.LinkedHashMap class then I get an
ArrayStoreException seen below.
If I do register the class, then when the LinkedHashMap is collected on the
driver, it does not contain any elements.

Here is the snippet of code I used :

val sc = new SparkContext(new SparkConf()
  .setMaster("local[*]")
  .setAppName("Sample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[mutable.LinkedHashMap[String, String]])))

val collect = sc.parallelize(0 to 10)
  .map(p => new mutable.LinkedHashMap[String, String]() ++=
Array(("hello", "bonjour"), ("good", "bueno")))

val mapSideSizes = collect.map(p => p.size).collect()(0)
val driverSideSizes = collect.collect()(0).size

println("The sizes before collect : " + mapSideSizes)
println("The sizes after collect : " + driverSideSizes)


** The following only occurs if I did not register the
mutable.LinkedHashMap class **
16/08/20 18:10:38 ERROR TaskResultGetter: Exception while getting task
result
java.lang.ArrayStoreException: scala.collection.mutable.HashMap
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at
org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311)
at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I hope this is a known issue and/or I'm missing something important in my
setup.
Appreciate any help or advice!

Best,

Rahul Palamuttam


Spark, Kryo Serialization Issue with ProtoBuf field

2016-07-13 Thread Nkechi Achara
Hi,

I am seeing an error when running my spark job relating to Serialization of
a protobuf field when transforming an RDD.

com.esotericsoftware.kryo.KryoException:
java.lang.UnsupportedOperationException Serialization trace: otherAuthors_
(com.thomsonreuters.kraken.medusa.dbor.proto.Book$DBBooks)

The error seems to be created at this point:

val booksPerTier: Iterable[(TimeTier, RDD[DBBooks])] = allTiers.map { tier =>
  (tier, books.filter(b => isInTier(endOfInterval, tier, b) && !isBookPublished(o))
    .mapPartitions(it => it.map(ord =>
      (ord.getAuthor, ord.getPublisherName, getGenre(ord.getSourceCountry)))))
}

val averagesPerAuthor = booksPerTier.flatMap { case (tier, opt) =>
  opt.map(o => (tier, o._1, PublisherCompanyComparison, o._3)).countByValue()
}

val averagesPerPublisher = booksPerTier.flatMap { case (tier, opt) =>
  opt.map(o => (tier, o._1, PublisherComparison(o._2), o._3)).countByValue()
}

The field is a list, specified in the protobuf as below:

otherAuthors_ = java.util.Collections.emptyList()

As you can see, the code does not actually use that field from the Book
protobuf, although it is still being transmitted over the network.
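
One workaround I am considering (just a sketch; BookRow is a hypothetical helper type, 
and I am assuming the downstream comparisons only need these three values) is to 
project each record into a small case class before any shuffle, so the protobuf object 
itself never has to go through Kryo:

case class BookRow(author: String, publisher: String, genre: String)

val bookRows = books.map(b =>
  BookRow(b.getAuthor, b.getPublisherName, getGenre(b.getSourceCountry)))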

Has anyone got any advice on this?

Thanks,

K


Re: Kryo ClassCastException during Serialization/deserialization in Spark Streaming

2016-06-23 Thread swetha kasireddy
sampleMap is populated from inside a method that is getting called from
updateStateByKey

On Thu, Jun 23, 2016 at 1:13 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Can you illustrate how sampleMap is populated ?
>
> Thanks
>
> On Thu, Jun 23, 2016 at 12:34 PM, SRK <swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> I keep getting the following error in my Spark Streaming every now and
>> then
>> after the  job runs for say around 10 hours. I have those 2 classes
>> registered in kryo as shown below.  sampleMap is a field in SampleSession
>> as shown below. Any suggestion as to how to avoid this would be of great
>> help!!
>>
>> public class SampleSession implements Serializable, Cloneable{
>>private Map<String, Sample> sampleMap;
>> }
>>
>>  sparkConf.registerKryoClasses(Array( classOf[SampleSession],
>> classOf[Sample]))
>>
>>
>>
>> com.esotericsoftware.kryo.KryoException: java.lang.ClassCastException:
>> com.test.Sample cannot be cast to java.lang.String
>> Serialization trace:
>> sampleMap (com.test.SampleSession)
>> at
>>
>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>> at
>>
>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at
>> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> at
>> com.twitter.chill.Tuple6Serializer.write(TupleSerializers.scala:96)
>> at
>> com.twitter.chill.Tuple6Serializer.write(TupleSerializers.scala:93)
>> at
>> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> at
>> com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
>> at
>> com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
>> at
>> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> at
>>
>> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
>> at
>>
>> org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
>> at
>>
>> org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
>> at
>>
>> org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
>> at
>> org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
>> at
>> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
>> at
>> org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
>> at
>> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
>> at
>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Caused by: java.lang.ClassCastException: com.test.Sample cannot be cast to
>> java.lang.String
>> at

Re: Kryo ClassCastException during Serialization/deserialization in Spark Streaming

2016-06-23 Thread Ted Yu
Can you illustrate how sampleMap is populated ?

Thanks

On Thu, Jun 23, 2016 at 12:34 PM, SRK <swethakasire...@gmail.com> wrote:

> Hi,
>
> I keep getting the following error in my Spark Streaming every now and then
> after the  job runs for say around 10 hours. I have those 2 classes
> registered in kryo as shown below.  sampleMap is a field in SampleSession
> as shown below. Any suggestion as to how to avoid this would be of great
> help!!
>
> public class SampleSession implements Serializable, Cloneable{
>private Map<String, Sample> sampleMap;
> }
>
>  sparkConf.registerKryoClasses(Array( classOf[SampleSession],
> classOf[Sample]))
>
>
>
> com.esotericsoftware.kryo.KryoException: java.lang.ClassCastException:
> com.test.Sample cannot be cast to java.lang.String
> Serialization trace:
> sampleMap (com.test.SampleSession)
> at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
> at
>
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at
> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> at
> com.twitter.chill.Tuple6Serializer.write(TupleSerializers.scala:96)
> at
> com.twitter.chill.Tuple6Serializer.write(TupleSerializers.scala:93)
> at
> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> at
> com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
> at
> com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
> at
> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> at
>
> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
> at
>
> org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
> at
>
> org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
> at
>
> org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
> at
> org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
> at
> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
> at
> org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
> at
> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
> at
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.ClassCastException: com.test.Sample cannot be cast to
> java.lang.String
> at
>
> com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)
> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
> at
>
> com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:82)
> at
>
> com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
> at
>
> com.esotericsoftware.kryo.serializers.

Kryo ClassCastException during Serialization/deserialization in Spark Streaming

2016-06-23 Thread SRK
Hi,

I keep getting the following error in my Spark Streaming every now and then
after the  job runs for say around 10 hours. I have those 2 classes
registered in kryo as shown below.  sampleMap is a field in SampleSession 
as shown below. Any suggestion as to how to avoid this would be of great
help!!

public class SampleSession implements Serializable, Cloneable{
   private Map<String, Sample> sampleMap;
}

 sparkConf.registerKryoClasses(Array( classOf[SampleSession],
classOf[Sample]))



com.esotericsoftware.kryo.KryoException: java.lang.ClassCastException:
com.test.Sample cannot be cast to java.lang.String
Serialization trace:
sampleMap (com.test.SampleSession)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at com.twitter.chill.Tuple6Serializer.write(TupleSerializers.scala:96)
at com.twitter.chill.Tuple6Serializer.write(TupleSerializers.scala:93)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
at
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
at
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
at
org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
at 
org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
at 
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.ClassCastException: com.test.Sample cannot be cast to
java.lang.String
at
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:82)
at
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
... 37 more
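
One thing I am considering trying (just a sketch, and it assumes sampleMap is 
instantiated as a java.util.HashMap, which I have not confirmed) is to also register 
the concrete map class:

sparkConf.registerKryoClasses(Array[Class[_]](
  classOf[SampleSession],
  classOf[Sample],
  classOf[java.util.HashMap[_, _]]  // assumed concrete type behind sampleMap
))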






Re: Dataset kryo encoder fails on Collections$UnmodifiableCollection

2016-05-23 Thread Amit Sela
See SPARK-15489 <https://issues.apache.org/jira/browse/SPARK-15489>
I'll try to figure this one out as well, any leads ? "immediate suspects" ?

Thanks,
Amit

On Mon, May 23, 2016 at 10:27 PM Michael Armbrust <mich...@databricks.com>
wrote:

> Can you open a JIRA?
>
> On Sun, May 22, 2016 at 2:50 PM, Amit Sela <amitsel...@gmail.com> wrote:
>
>> I've been using Encoders with Kryo to support encoding of generically
>> typed Java classes, mostly with success, in the following manner:
>>
>> public static  Encoder encoder() {
>>   return Encoders.kryo((Class) Object.class);
>> }
>>
>> But at some point I got a decoding exception "Caused by:
>> java.lang.UnsupportedOperationException
>> at java.util.Collections$UnmodifiableCollection.add..."
>>
>> This seems to be because of Guava's `ImmutableList`.
>>
>> I tried registering `UnmodifiableCollectionsSerializer` and `
>> ImmutableListSerializer` from: https://github.com/magro/kryo-serializers
>> but it didn't help.
>>
>> Ideas ?
>>
>> Thanks,
>> Amit
>>
>
>
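
For reference, by "registering" above I mean wiring those serializers into a Spark 
KryoRegistrator, roughly like the sketch below (package and method names are my reading 
of the kryo-serializers project, so treat them as assumptions), with 
spark.kryo.registrator pointing at that class:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer
import de.javakaffee.kryoserializers.guava.ImmutableListSerializer

class CollectionsRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Each helper installs serializers for the relevant JDK/Guava wrapper classes.
    UnmodifiableCollectionsSerializer.registerSerializers(kryo)
    ImmutableListSerializer.registerSerializers(kryo)
  }
}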


Re: Dataset kryo encoder fails on Collections$UnmodifiableCollection

2016-05-23 Thread Michael Armbrust
Can you open a JIRA?

On Sun, May 22, 2016 at 2:50 PM, Amit Sela <amitsel...@gmail.com> wrote:

> I've been using Encoders with Kryo to support encoding of generically
> typed Java classes, mostly with success, in the following manner:
>
> public static  Encoder encoder() {
>   return Encoders.kryo((Class) Object.class);
> }
>
> But at some point I got a decoding exception "Caused by:
> java.lang.UnsupportedOperationException
> at java.util.Collections$UnmodifiableCollection.add..."
>
> This seems to be because of Guava's `ImmutableList`.
>
> I tried registering `UnmodifiableCollectionsSerializer` and `
> ImmutableListSerializer` from: https://github.com/magro/kryo-serializers
> but it didn't help.
>
> Ideas ?
>
> Thanks,
> Amit
>


Re: kryo

2016-05-12 Thread Ted Yu
This should be related:

https://github.com/JodaOrg/joda-time/issues/307

Do you have more of the stack trace ?

Cheers

On Thu, May 12, 2016 at 12:39 PM, Younes Naguib <
younes.nag...@tritondigital.com> wrote:

> Thanks,
>
> I used that.
>
> Now I seem to have the following problem:
>
> java.lang.NullPointerException
>
> at
> org.joda.time.tz.CachedDateTimeZone.getInfo(CachedDateTimeZone.java:143)
>
> at
> org.joda.time.tz.CachedDateTimeZone.getOffset(CachedDateTimeZone.java:103)
>
> at
> org.joda.time.DateTimeZone.convertUTCToLocal(DateTimeZone.java:925)
>
>
>
>
>
> Any ideas?
>
>
>
> Thanks
>
>
>
>
>
> *From:* Ted Yu [mailto:yuzhih...@gmail.com]
> *Sent:* May-11-16 5:32 PM
> *To:* Younes Naguib
> *Cc:* user@spark.apache.org
> *Subject:* Re: kryo
>
>
>
> Have you seen this thread ?
>
>
>
>
> http://search-hadoop.com/m/q3RTtpO0qI3cp06/JodaDateTimeSerializer+spark=Re+NPE+when+using+Joda+DateTime
>
>
>
> On Wed, May 11, 2016 at 2:18 PM, Younes Naguib <
> younes.nag...@tritondigital.com> wrote:
>
> Hi all,
>
> I'm trying to get to use spark.serializer.
> I set it in the spark-default.conf, but I statred getting issues with
> datetimes.
>
> As I understand, I need to disable it.
> Anyways to keep using kryo?
>
> It's seems I can use JodaDateTimeSerializer for datetimes, just not sure
> how to set it, and register it in the spark-default conf.
>
> Thanks,
>
> *Younes Naguib* <younes.nag...@streamtheworld.com>
>
>
>


RE: kryo

2016-05-12 Thread Younes Naguib
Thanks,
I used that.
Now I seem to have the following problem:
java.lang.NullPointerException
at 
org.joda.time.tz.CachedDateTimeZone.getInfo(CachedDateTimeZone.java:143)
at 
org.joda.time.tz.CachedDateTimeZone.getOffset(CachedDateTimeZone.java:103)
at org.joda.time.DateTimeZone.convertUTCToLocal(DateTimeZone.java:925)


Any ideas?

Thanks


From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: May-11-16 5:32 PM
To: Younes Naguib
Cc: user@spark.apache.org
Subject: Re: kryo

Have you seen this thread ?

http://search-hadoop.com/m/q3RTtpO0qI3cp06/JodaDateTimeSerializer+spark=Re+NPE+when+using+Joda+DateTime

On Wed, May 11, 2016 at 2:18 PM, Younes Naguib 
<younes.nag...@tritondigital.com<mailto:younes.nag...@tritondigital.com>> wrote:
Hi all,

I'm trying to get to use spark.serializer.
I set it in the spark-default.conf, but I statred getting issues with datetimes.

As I understand, I need to disable it.
Anyways to keep using kryo?

It's seems I can use JodaDateTimeSerializer for datetimes, just not sure how to 
set it, and register it in the spark-default conf.

Thanks,
Younes Naguib <mailto:younes.nag...@streamtheworld.com>



Re: kryo

2016-05-11 Thread Ted Yu
Have you seen this thread ?

http://search-hadoop.com/m/q3RTtpO0qI3cp06/JodaDateTimeSerializer+spark=Re+NPE+when+using+Joda+DateTime

On Wed, May 11, 2016 at 2:18 PM, Younes Naguib <
younes.nag...@tritondigital.com> wrote:

> Hi all,
>
> I'm trying to get to use spark.serializer.
> I set it in the spark-default.conf, but I statred getting issues with
> datetimes.
>
> As I understand, I need to disable it.
> Anyways to keep using kryo?
>
> It's seems I can use JodaDateTimeSerializer for datetimes, just not sure
> how to set it, and register it in the spark-default conf.
>
> Thanks,
>
> *Younes Naguib* <younes.nag...@streamtheworld.com>
>


kryo

2016-05-11 Thread Younes Naguib
Hi all,

I'm trying to start using spark.serializer.
I set it in spark-defaults.conf, but I started getting issues with datetimes.

As I understand it, I need to disable it.
Is there any way to keep using Kryo?

It seems I can use JodaDateTimeSerializer for datetimes; I'm just not sure how to 
set it up and register it in spark-defaults.conf.
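
Is a custom KryoRegistrator along these lines the right approach? (Just a sketch; I am 
assuming the JodaDateTimeSerializer from the kryo-serializers project and its 
de.javakaffee.kryoserializers.jodatime package.)

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.joda.time.DateTime
import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer // assumption: from kryo-serializers

class JodaKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[DateTime], new JodaDateTimeSerializer())
  }
}

And then in spark-defaults.conf (the registrator class name below is hypothetical):

spark.serializer        org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator  com.example.JodaKryoRegistrator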

Thanks,
Younes Naguib <mailto:younes.nag...@streamtheworld.com>


Re: How to verify if spark is using kryo serializer for shuffle

2016-05-08 Thread Nirav Patel
Yes my mistake. I am using Spark 1.5.2 not 2.x.

I looked at the running Spark driver JVM process on Linux. It looks like my
settings are not being applied to the driver. We use the Oozie Spark action to
launch Spark, so I will have to investigate that further.

Hopefully Spark has replaced, or will replace, the memory-hungry Java serializer
with a better streaming serializer.

Thanks

On Sun, May 8, 2016 at 9:33 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> See the following:
> [SPARK-7997][CORE] Remove Akka from Spark Core and Streaming
>
> I guess you meant you are using Spark 1.5.1
>
> For the time being, consider increasing spark.driver.memory
>
> Cheers
>
> On Sun, May 8, 2016 at 9:14 AM, Nirav Patel <npa...@xactlycorp.com> wrote:
>
>> Yes, I am using yarn client mode hence I specified am settings too.
>> What you mean akka is moved out of picture? I am using spark 2.5.1
>>
>> Sent from my iPhone
>>
>> On May 8, 2016, at 6:39 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>> Are you using YARN client mode ?
>>
>> See
>> https://spark.apache.org/docs/latest/running-on-yarn.html
>>
>> In cluster mode, spark.yarn.am.memory is not effective.
>>
>> For Spark 2.0, akka is moved out of the picture.
>> FYI
>>
>> On Sat, May 7, 2016 at 8:24 PM, Nirav Patel <npa...@xactlycorp.com>
>> wrote:
>>
>>> I have 20 executors, 6 cores each. Total 5 stages. It fails on 5th one.
>>> All of them have 6474 tasks. 5th task is a count operations and it also
>>> performs aggregateByKey as a part of it lazy evaluation.
>>> I am setting:
>>> spark.driver.memory=10G, spark.yarn.am.memory=2G and
>>> spark.driver.maxResultSize=9G
>>>
>>>
>>> On a side note, could it be something to do with java serialization
>>> library, ByteArrayOutputStream using byte array? Can it be replaced by
>>> some better serializing library?
>>>
>>> https://bugs.openjdk.java.net/browse/JDK-8055949
>>> https://bugs.openjdk.java.net/browse/JDK-8136527
>>>
>>> Thanks
>>>
>>> On Sat, May 7, 2016 at 4:51 PM, Ashish Dubey <ashish@gmail.com>
>>> wrote:
>>>
>>>> Driver maintains the complete metadata of application ( scheduling of
>>>> executor and maintaining the messaging to control the execution )
>>>> This code seems to be failing in that code path only. With that said
>>>> there is Jvm overhead based on num of executors , stages and tasks in your
>>>> app. Do you know your driver heap size and application structure ( num of
>>>> stages and tasks )
>>>>
>>>> Ashish
>>>>
>>>> On Saturday, May 7, 2016, Nirav Patel <npa...@xactlycorp.com> wrote:
>>>>
>>>>> Right but this logs from spark driver and spark driver seems to use
>>>>> Akka.
>>>>>
>>>>> ERROR [sparkDriver-akka.actor.default-dispatcher-17]
>>>>> akka.actor.ActorSystemImpl: Uncaught fatal error from thread
>>>>> [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down
>>>>> ActorSystem [sparkDriver]
>>>>>
>>>>> I saw following logs before above happened.
>>>>>
>>>>> 2016-05-06 09:49:17,813 INFO
>>>>> [sparkDriver-akka.actor.default-dispatcher-17]
>>>>> org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
>>>>> locations for shuffle 1 to hdn6.xactlycorporation.local:44503
>>>>>
>>>>>
>>>>> As far as I know driver is just driving shuffle operation but not
>>>>> actually doing anything within its own system that will cause memory 
>>>>> issue.
>>>>> Can you explain in what circumstances I could see this error in driver
>>>>> logs? I don't do any collect or any other driver operation that would 
>>>>> cause
>>>>> this. It fails when doing aggregateByKey operation but that should happen
>>>>> in executor JVM NOT in driver JVM.
>>>>>
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Sat, May 7, 2016 at 11:58 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> bq.   at akka.serialization.JavaSerializer.toBinary(
>>>>>> Serializer.scala:129)
>>>>>>
>>>>>> It was Akka which uses JavaSerializer
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Sat, May 7

Re: How to verify if spark is using kryo serializer for shuffle

2016-05-08 Thread Ted Yu
See the following:
[SPARK-7997][CORE] Remove Akka from Spark Core and Streaming

I guess you meant you are using Spark 1.5.1

For the time being, consider increasing spark.driver.memory

Cheers

On Sun, May 8, 2016 at 9:14 AM, Nirav Patel <npa...@xactlycorp.com> wrote:

> Yes, I am using yarn client mode hence I specified am settings too.
> What you mean akka is moved out of picture? I am using spark 2.5.1
>
> Sent from my iPhone
>
> On May 8, 2016, at 6:39 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> Are you using YARN client mode ?
>
> See
> https://spark.apache.org/docs/latest/running-on-yarn.html
>
> In cluster mode, spark.yarn.am.memory is not effective.
>
> For Spark 2.0, akka is moved out of the picture.
> FYI
>
> On Sat, May 7, 2016 at 8:24 PM, Nirav Patel <npa...@xactlycorp.com> wrote:
>
>> I have 20 executors, 6 cores each. Total 5 stages. It fails on 5th one.
>> All of them have 6474 tasks. 5th task is a count operations and it also
>> performs aggregateByKey as a part of it lazy evaluation.
>> I am setting:
>> spark.driver.memory=10G, spark.yarn.am.memory=2G and
>> spark.driver.maxResultSize=9G
>>
>>
>> On a side note, could it be something to do with java serialization
>> library, ByteArrayOutputStream using byte array? Can it be replaced by
>> some better serializing library?
>>
>> https://bugs.openjdk.java.net/browse/JDK-8055949
>> https://bugs.openjdk.java.net/browse/JDK-8136527
>>
>> Thanks
>>
>> On Sat, May 7, 2016 at 4:51 PM, Ashish Dubey <ashish@gmail.com>
>> wrote:
>>
>>> Driver maintains the complete metadata of application ( scheduling of
>>> executor and maintaining the messaging to control the execution )
>>> This code seems to be failing in that code path only. With that said
>>> there is Jvm overhead based on num of executors , stages and tasks in your
>>> app. Do you know your driver heap size and application structure ( num of
>>> stages and tasks )
>>>
>>> Ashish
>>>
>>> On Saturday, May 7, 2016, Nirav Patel <npa...@xactlycorp.com> wrote:
>>>
>>>> Right but this logs from spark driver and spark driver seems to use
>>>> Akka.
>>>>
>>>> ERROR [sparkDriver-akka.actor.default-dispatcher-17]
>>>> akka.actor.ActorSystemImpl: Uncaught fatal error from thread
>>>> [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down
>>>> ActorSystem [sparkDriver]
>>>>
>>>> I saw following logs before above happened.
>>>>
>>>> 2016-05-06 09:49:17,813 INFO
>>>> [sparkDriver-akka.actor.default-dispatcher-17]
>>>> org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
>>>> locations for shuffle 1 to hdn6.xactlycorporation.local:44503
>>>>
>>>>
>>>> As far as I know driver is just driving shuffle operation but not
>>>> actually doing anything within its own system that will cause memory issue.
>>>> Can you explain in what circumstances I could see this error in driver
>>>> logs? I don't do any collect or any other driver operation that would cause
>>>> this. It fails when doing aggregateByKey operation but that should happen
>>>> in executor JVM NOT in driver JVM.
>>>>
>>>>
>>>> Thanks
>>>>
>>>> On Sat, May 7, 2016 at 11:58 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> bq.   at akka.serialization.JavaSerializer.toBinary(
>>>>> Serializer.scala:129)
>>>>>
>>>>> It was Akka which uses JavaSerializer
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Sat, May 7, 2016 at 11:13 AM, Nirav Patel <npa...@xactlycorp.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I thought I was using kryo serializer for shuffle.  I could verify it
>>>>>> from spark UI - Environment tab that
>>>>>> spark.serializer org.apache.spark.serializer.KryoSerializer
>>>>>> spark.kryo.registrator
>>>>>> com.myapp.spark.jobs.conf.SparkSerializerRegistrator
>>>>>>
>>>>>>
>>>>>> But when I see following error in Driver logs it looks like spark is
>>>>>> using JavaSerializer
>>>>>>
>>>>>> 2016-05-06 09:49:26,490 ERROR
>>>>>> [sparkDriver-akka.actor.default-dispatcher-17] 
>

Re: How to verify if spark is using kryo serializer for shuffle

2016-05-08 Thread Nirav Patel
Yes, I am using YARN client mode, hence I specified the AM settings too.
What do you mean, Akka is moved out of the picture? I am using Spark 2.5.1.

Sent from my iPhone

> On May 8, 2016, at 6:39 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> Are you using YARN client mode ?
> 
> See
> https://spark.apache.org/docs/latest/running-on-yarn.html
> 
> In cluster mode, spark.yarn.am.memory is not effective.
> 
> For Spark 2.0, akka is moved out of the picture.
> FYI
> 
>> On Sat, May 7, 2016 at 8:24 PM, Nirav Patel <npa...@xactlycorp.com> wrote:
>> I have 20 executors, 6 cores each. Total 5 stages. It fails on 5th one. All 
>> of them have 6474 tasks. 5th task is a count operations and it also performs 
>> aggregateByKey as a part of it lazy evaluation. 
>> I am setting:
>> spark.driver.memory=10G, spark.yarn.am.memory=2G and 
>> spark.driver.maxResultSize=9G 
>> 
>> 
>> On a side note, could it be something to do with java serialization library, 
>> ByteArrayOutputStream using byte array? Can it be replaced by some better 
>> serializing library?
>> 
>> https://bugs.openjdk.java.net/browse/JDK-8055949
>> https://bugs.openjdk.java.net/browse/JDK-8136527
>> 
>> Thanks
>> 
>>> On Sat, May 7, 2016 at 4:51 PM, Ashish Dubey <ashish@gmail.com> wrote:
>>> Driver maintains the complete metadata of application ( scheduling of 
>>> executor and maintaining the messaging to control the execution )
>>> This code seems to be failing in that code path only. With that said there 
>>> is Jvm overhead based on num of executors , stages and tasks in your app. 
>>> Do you know your driver heap size and application structure ( num of stages 
>>> and tasks )
>>> 
>>> Ashish 
>>> 
>>>> On Saturday, May 7, 2016, Nirav Patel <npa...@xactlycorp.com> wrote:
>>>> Right but this logs from spark driver and spark driver seems to use Akka.
>>>> 
>>>> ERROR [sparkDriver-akka.actor.default-dispatcher-17] 
>>>> akka.actor.ActorSystemImpl: Uncaught fatal error from thread 
>>>> [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down 
>>>> ActorSystem [sparkDriver]
>>>> 
>>>> I saw following logs before above happened.
>>>> 
>>>> 2016-05-06 09:49:17,813 INFO 
>>>> [sparkDriver-akka.actor.default-dispatcher-17] 
>>>> org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output 
>>>> locations for shuffle 1 to hdn6.xactlycorporation.local:44503
>>>> 
>>>> 
>>>> 
>>>> As far as I know driver is just driving shuffle operation but not actually 
>>>> doing anything within its own system that will cause memory issue. Can you 
>>>> explain in what circumstances I could see this error in driver logs? I 
>>>> don't do any collect or any other driver operation that would cause this. 
>>>> It fails when doing aggregateByKey operation but that should happen in 
>>>> executor JVM NOT in driver JVM.
>>>> 
>>>> 
>>>> 
>>>> Thanks
>>>> 
>>>> 
>>>>> On Sat, May 7, 2016 at 11:58 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>> bq.   at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
>>>>> 
>>>>> It was Akka which uses JavaSerializer
>>>>> 
>>>>> Cheers
>>>>> 
>>>>>> On Sat, May 7, 2016 at 11:13 AM, Nirav Patel <npa...@xactlycorp.com> 
>>>>>> wrote:
>>>>>> Hi,
>>>>>> 
>>>>>> I thought I was using kryo serializer for shuffle.  I could verify it 
>>>>>> from spark UI - Environment tab that 
>>>>>> spark.serializer org.apache.spark.serializer.KryoSerializer
>>>>>> spark.kryo.registrator   
>>>>>> com.myapp.spark.jobs.conf.SparkSerializerRegistrator
>>>>>> 
>>>>>> 
>>>>>> But when I see following error in Driver logs it looks like spark is 
>>>>>> using JavaSerializer 
>>>>>> 
>>>>>> 2016-05-06 09:49:26,490 ERROR 
>>>>>> [sparkDriver-akka.actor.default-dispatcher-17] 
>>>>>> akka.actor.ActorSystemImpl: Uncaught fatal error from thread 
>>>>>> [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down 
>>>>>> ActorSystem [sparkDriver]
>>>>>> 
>>>>>

Re: How to verify if spark is using kryo serializer for shuffle

2016-05-08 Thread Ted Yu
Are you using YARN client mode ?

See
https://spark.apache.org/docs/latest/running-on-yarn.html

In cluster mode, spark.yarn.am.memory is not effective.

For Spark 2.0, akka is moved out of the picture.
FYI

On Sat, May 7, 2016 at 8:24 PM, Nirav Patel <npa...@xactlycorp.com> wrote:

> I have 20 executors, 6 cores each. Total 5 stages. It fails on 5th one.
> All of them have 6474 tasks. 5th task is a count operations and it also
> performs aggregateByKey as a part of it lazy evaluation.
> I am setting:
> spark.driver.memory=10G, spark.yarn.am.memory=2G and
> spark.driver.maxResultSize=9G
>
>
> On a side note, could it be something to do with java serialization
> library, ByteArrayOutputStream using byte array? Can it be replaced by
> some better serializing library?
>
> https://bugs.openjdk.java.net/browse/JDK-8055949
> https://bugs.openjdk.java.net/browse/JDK-8136527
>
> Thanks
>
> On Sat, May 7, 2016 at 4:51 PM, Ashish Dubey <ashish@gmail.com> wrote:
>
>> Driver maintains the complete metadata of application ( scheduling of
>> executor and maintaining the messaging to control the execution )
>> This code seems to be failing in that code path only. With that said
>> there is Jvm overhead based on num of executors , stages and tasks in your
>> app. Do you know your driver heap size and application structure ( num of
>> stages and tasks )
>>
>> Ashish
>>
>> On Saturday, May 7, 2016, Nirav Patel <npa...@xactlycorp.com> wrote:
>>
>>> Right but this logs from spark driver and spark driver seems to use Akka.
>>>
>>> ERROR [sparkDriver-akka.actor.default-dispatcher-17]
>>> akka.actor.ActorSystemImpl: Uncaught fatal error from thread
>>> [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down
>>> ActorSystem [sparkDriver]
>>>
>>> I saw following logs before above happened.
>>>
>>> 2016-05-06 09:49:17,813 INFO
>>> [sparkDriver-akka.actor.default-dispatcher-17]
>>> org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
>>> locations for shuffle 1 to hdn6.xactlycorporation.local:44503
>>>
>>>
>>> As far as I know driver is just driving shuffle operation but not
>>> actually doing anything within its own system that will cause memory issue.
>>> Can you explain in what circumstances I could see this error in driver
>>> logs? I don't do any collect or any other driver operation that would cause
>>> this. It fails when doing aggregateByKey operation but that should happen
>>> in executor JVM NOT in driver JVM.
>>>
>>>
>>> Thanks
>>>
>>> On Sat, May 7, 2016 at 11:58 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> bq.   at akka.serialization.JavaSerializer.toBinary(
>>>> Serializer.scala:129)
>>>>
>>>> It was Akka which uses JavaSerializer
>>>>
>>>> Cheers
>>>>
>>>> On Sat, May 7, 2016 at 11:13 AM, Nirav Patel <npa...@xactlycorp.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I thought I was using kryo serializer for shuffle.  I could verify it
>>>>> from spark UI - Environment tab that
>>>>> spark.serializer org.apache.spark.serializer.KryoSerializer
>>>>> spark.kryo.registrator
>>>>> com.myapp.spark.jobs.conf.SparkSerializerRegistrator
>>>>>
>>>>>
>>>>> But when I see following error in Driver logs it looks like spark is
>>>>> using JavaSerializer
>>>>>
>>>>> 2016-05-06 09:49:26,490 ERROR
>>>>> [sparkDriver-akka.actor.default-dispatcher-17] akka.actor.ActorSystemImpl:
>>>>> Uncaught fatal error from thread
>>>>> [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
>>>>> ActorSystem [sparkDriver]
>>>>>
>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>
>>>>> at java.util.Arrays.copyOf(Arrays.java:2271)
>>>>>
>>>>> at
>>>>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>>>>>
>>>>> at
>>>>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>>>>
>>>>> at
>>>>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>>>>>
>>>>> at
>>>>> java.io.ObjectOut

Re: How to verify if spark is using kryo serializer for shuffle

2016-05-07 Thread Nirav Patel
I have 20 executors, 6 cores each, and 5 stages in total. It fails on the 5th one.
All of the stages have 6474 tasks. The 5th stage is a count operation, and it also
performs an aggregateByKey as part of its lazy evaluation.
I am setting:
spark.driver.memory=10G, spark.yarn.am.memory=2G and
spark.driver.maxResultSize=9G


On a side note, could it be something to do with the Java serialization
library's ByteArrayOutputStream using a byte array? Can it be replaced by a
better serialization library?

https://bugs.openjdk.java.net/browse/JDK-8055949
https://bugs.openjdk.java.net/browse/JDK-8136527

Thanks

On Sat, May 7, 2016 at 4:51 PM, Ashish Dubey <ashish@gmail.com> wrote:

> Driver maintains the complete metadata of application ( scheduling of
> executor and maintaining the messaging to control the execution )
> This code seems to be failing in that code path only. With that said there
> is Jvm overhead based on num of executors , stages and tasks in your app.
> Do you know your driver heap size and application structure ( num of stages
> and tasks )
>
> Ashish
>
> On Saturday, May 7, 2016, Nirav Patel <npa...@xactlycorp.com> wrote:
>
>> Right but this logs from spark driver and spark driver seems to use Akka.
>>
>> ERROR [sparkDriver-akka.actor.default-dispatcher-17]
>> akka.actor.ActorSystemImpl: Uncaught fatal error from thread
>> [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down
>> ActorSystem [sparkDriver]
>>
>> I saw following logs before above happened.
>>
>> 2016-05-06 09:49:17,813 INFO
>> [sparkDriver-akka.actor.default-dispatcher-17]
>> org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
>> locations for shuffle 1 to hdn6.xactlycorporation.local:44503
>>
>>
>> As far as I know driver is just driving shuffle operation but not
>> actually doing anything within its own system that will cause memory issue.
>> Can you explain in what circumstances I could see this error in driver
>> logs? I don't do any collect or any other driver operation that would cause
>> this. It fails when doing aggregateByKey operation but that should happen
>> in executor JVM NOT in driver JVM.
>>
>>
>> Thanks
>>
>> On Sat, May 7, 2016 at 11:58 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> bq.   at akka.serialization.JavaSerializer.toBinary(
>>> Serializer.scala:129)
>>>
>>> It was Akka which uses JavaSerializer
>>>
>>> Cheers
>>>
>>> On Sat, May 7, 2016 at 11:13 AM, Nirav Patel <npa...@xactlycorp.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I thought I was using kryo serializer for shuffle.  I could verify it
>>>> from spark UI - Environment tab that
>>>> spark.serializer org.apache.spark.serializer.KryoSerializer
>>>> spark.kryo.registrator
>>>> com.myapp.spark.jobs.conf.SparkSerializerRegistrator
>>>>
>>>>
>>>> But when I see following error in Driver logs it looks like spark is
>>>> using JavaSerializer
>>>>
>>>> 2016-05-06 09:49:26,490 ERROR
>>>> [sparkDriver-akka.actor.default-dispatcher-17] akka.actor.ActorSystemImpl:
>>>> Uncaught fatal error from thread
>>>> [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
>>>> ActorSystem [sparkDriver]
>>>>
>>>> java.lang.OutOfMemoryError: Java heap space
>>>>
>>>> at java.util.Arrays.copyOf(Arrays.java:2271)
>>>>
>>>> at
>>>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>>>>
>>>> at
>>>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>>>
>>>> at
>>>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>>>>
>>>> at
>>>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>>>>
>>>> at
>>>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>>>>
>>>> at
>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>>>>
>>>> at
>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>>
>>>> at
>>>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
>>>>
>>>> at
>>>> akka.serialization.JavaSeriali

Re: How to verify if spark is using kryo serializer for shuffle

2016-05-07 Thread Ashish Dubey
The driver maintains the complete metadata of the application (scheduling of
executors and the messaging that controls the execution), and this code seems to be
failing in that code path only. With that said, there is JVM overhead on the driver
based on the number of executors, stages, and tasks in your app. Do you know your
driver heap size and application structure (number of stages and tasks)?

Ashish
On Saturday, May 7, 2016, Nirav Patel <npa...@xactlycorp.com> wrote:

> Right but this logs from spark driver and spark driver seems to use Akka.
>
> ERROR [sparkDriver-akka.actor.default-dispatcher-17]
> akka.actor.ActorSystemImpl: Uncaught fatal error from thread
> [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down
> ActorSystem [sparkDriver]
>
> I saw following logs before above happened.
>
> 2016-05-06 09:49:17,813 INFO
> [sparkDriver-akka.actor.default-dispatcher-17]
> org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
> locations for shuffle 1 to hdn6.xactlycorporation.local:44503
>
>
> As far as I know driver is just driving shuffle operation but not actually
> doing anything within its own system that will cause memory issue. Can you
> explain in what circumstances I could see this error in driver logs? I
> don't do any collect or any other driver operation that would cause this.
> It fails when doing aggregateByKey operation but that should happen in
> executor JVM NOT in driver JVM.
>
>
> Thanks
>
> On Sat, May 7, 2016 at 11:58 AM, Ted Yu <yuzhih...@gmail.com
> <javascript:_e(%7B%7D,'cvml','yuzhih...@gmail.com');>> wrote:
>
>> bq.   at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
>>
>> It was Akka which uses JavaSerializer
>>
>> Cheers
>>
>> On Sat, May 7, 2016 at 11:13 AM, Nirav Patel <npa...@xactlycorp.com
>> <javascript:_e(%7B%7D,'cvml','npa...@xactlycorp.com');>> wrote:
>>
>>> Hi,
>>>
>>> I thought I was using kryo serializer for shuffle.  I could verify it
>>> from spark UI - Environment tab that
>>> spark.serializer org.apache.spark.serializer.KryoSerializer
>>> spark.kryo.registrator
>>> com.myapp.spark.jobs.conf.SparkSerializerRegistrator
>>>
>>>
>>> But when I see following error in Driver logs it looks like spark is
>>> using JavaSerializer
>>>
>>> 2016-05-06 09:49:26,490 ERROR
>>> [sparkDriver-akka.actor.default-dispatcher-17] akka.actor.ActorSystemImpl:
>>> Uncaught fatal error from thread
>>> [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
>>> ActorSystem [sparkDriver]
>>>
>>> java.lang.OutOfMemoryError: Java heap space
>>>
>>> at java.util.Arrays.copyOf(Arrays.java:2271)
>>>
>>> at
>>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>>>
>>> at
>>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>>
>>> at
>>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>>>
>>> at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>>>
>>> at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>>>
>>> at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>>>
>>> at
>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>
>>> at
>>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
>>>
>>> at
>>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>>>
>>> at
>>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>>>
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>
>>> at
>>> akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
>>>
>>> at
>>> akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
>>>
>>> at
>>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
>>>
>>> at
>>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
>>>
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>
>>> at

Re: How to verify if spark is using kryo serializer for shuffle

2016-05-07 Thread Nirav Patel
Right, but these logs are from the Spark driver, and the driver seems to use Akka.

ERROR [sparkDriver-akka.actor.default-dispatcher-17]
akka.actor.ActorSystemImpl: Uncaught fatal error from thread
[sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down
ActorSystem [sparkDriver]

I saw the following logs before the above happened.

2016-05-06 09:49:17,813 INFO [sparkDriver-akka.actor.default-dispatcher-17]
org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
locations for shuffle 1 to hdn6.xactlycorporation.local:44503


As far as I know, the driver just coordinates the shuffle operation and is not
actually doing anything within its own JVM that would cause a memory issue. Can you
explain under what circumstances I could see this error in the driver logs? I
don't do a collect or any other driver-side operation that would cause this.
It fails when doing the aggregateByKey operation, but that should happen in the
executor JVM, NOT in the driver JVM.


Thanks

On Sat, May 7, 2016 at 11:58 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> bq.   at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
>
> It was Akka which uses JavaSerializer
>
> Cheers
>
> On Sat, May 7, 2016 at 11:13 AM, Nirav Patel <npa...@xactlycorp.com>
> wrote:
>
>> Hi,
>>
>> I thought I was using kryo serializer for shuffle.  I could verify it
>> from spark UI - Environment tab that
>> spark.serializer org.apache.spark.serializer.KryoSerializer
>> spark.kryo.registrator
>> com.myapp.spark.jobs.conf.SparkSerializerRegistrator
>>
>>
>> But when I see following error in Driver logs it looks like spark is
>> using JavaSerializer
>>
>> 2016-05-06 09:49:26,490 ERROR
>> [sparkDriver-akka.actor.default-dispatcher-17] akka.actor.ActorSystemImpl:
>> Uncaught fatal error from thread
>> [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
>> ActorSystem [sparkDriver]
>>
>> java.lang.OutOfMemoryError: Java heap space
>>
>> at java.util.Arrays.copyOf(Arrays.java:2271)
>>
>> at
>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>>
>> at
>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>
>> at
>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>>
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>>
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>>
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>>
>> at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>
>> at
>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
>>
>> at
>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>>
>> at
>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>>
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>
>> at
>> akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
>>
>> at
>> akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
>>
>> at
>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
>>
>> at
>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
>>
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>
>> at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842)
>>
>> at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743)
>>
>> at
>> akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718)
>>
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>
>> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
>>
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>
>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>
>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>
>> at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>
>> at
>> 

Re: How to verify if spark is using kryo serializer for shuffle

2016-05-07 Thread Ted Yu
bq.   at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)

It was Akka which uses JavaSerializer

Cheers

On Sat, May 7, 2016 at 11:13 AM, Nirav Patel <npa...@xactlycorp.com> wrote:

> Hi,
>
> I thought I was using kryo serializer for shuffle.  I could verify it from
> spark UI - Environment tab that
> spark.serializer org.apache.spark.serializer.KryoSerializer
> spark.kryo.registrator
> com.myapp.spark.jobs.conf.SparkSerializerRegistrator
>
>
> But when I see following error in Driver logs it looks like spark is using
> JavaSerializer
>
> 2016-05-06 09:49:26,490 ERROR
> [sparkDriver-akka.actor.default-dispatcher-17] akka.actor.ActorSystemImpl:
> Uncaught fatal error from thread
> [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
> ActorSystem [sparkDriver]
>
> java.lang.OutOfMemoryError: Java heap space
>
> at java.util.Arrays.copyOf(Arrays.java:2271)
>
> at
> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>
> at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>
> at
> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>
> at
> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
>
> at
> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>
> at
> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>
> at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
>
> at
> akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
>
> at
> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
>
> at
> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
>
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>
> at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842)
>
> at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743)
>
> at
> akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718)
>
> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>
> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
> What I am missing here?
>
> Thanks
>
>
>
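
A minimal sketch of the driver-side knobs this failure mode usually calls for
(assuming Spark 1.x configuration keys; spark.akka.frameSize only exists on
the Akka-based RPC layer, and the values are illustrative rather than taken
from the thread):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Kryo covers shuffle and cached data; Akka control messages (such as the
  // MapOutputTracker statuses being serialized in the stack trace above)
  // still go through JavaSerializer regardless of this setting.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator",
    "com.myapp.spark.jobs.conf.SparkSerializerRegistrator")
  // Larger Akka frames for big map-output status messages (Spark 1.x only).
  .set("spark.akka.frameSize", "128")

// The driver heap itself has to be set before the driver JVM starts,
// e.g. spark-submit --driver-memory 4g ... (4g is an illustrative value).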


How to verify if spark is using kryo serializer for shuffle

2016-05-07 Thread Nirav Patel
Hi,

I thought I was using the Kryo serializer for shuffle. I can verify from the
Spark UI Environment tab that
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator com.myapp.spark.jobs.conf.SparkSerializerRegistrator


But when I see the following error in the driver logs, it looks like Spark is
using JavaSerializer

2016-05-06 09:49:26,490 ERROR
[sparkDriver-akka.actor.default-dispatcher-17] akka.actor.ActorSystemImpl:
Uncaught fatal error from thread
[sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
ActorSystem [sparkDriver]

java.lang.OutOfMemoryError: Java heap space

at java.util.Arrays.copyOf(Arrays.java:2271)

at
java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)

at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)

at
java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)

at
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)

at
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)

at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)

at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)

at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)

at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)

at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)

at
akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)

at
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)

at
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842)

at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743)

at
akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718)

at akka.actor.Actor$class.aroundReceive(Actor.scala:467)

at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)

at akka.actor.ActorCell.invoke(ActorCell.scala:487)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)

at akka.dispatch.Mailbox.run(Mailbox.scala:220)

at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)

at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



What am I missing here?

Thanks

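
For completeness, a quick runtime check (a sketch, not specific to this job)
that the Kryo settings really took effect; even then, closures and Akka/RPC
control messages are still Java-serialized, which is why JavaSerializer can
show up in driver stack traces:

// From the running application or spark-shell:
println(sc.getConf.get("spark.serializer",
  "org.apache.spark.serializer.JavaSerializer"))   // default if unset
println(sc.getConf.get("spark.kryo.registrator", "<none>"))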


Re: Kryo serialization mismatch in spark sql windowing function

2016-04-06 Thread Soam Acharya
ar,/opt/hive/lib/hive-accumulo-handler-1.2.0.jar,/opt/hive/lib/ant-launcher-1.9.1.jar,/opt/hive/lib/hive-jdbc-1.2.0.jar,/opt/hive/lib/commons-compress-1.4.1.jar,/opt/hive/lib/commons-logging-1.1.3.jar,/opt/hive/lib/hive-serde-1.2.0.jar,/opt/hive/lib/zookeeper-3.4.6.jar,/opt/hive/lib/accumulo-start-1.6.0.jar,/opt/hive/lib/hive-contrib-1.2.0.jar,/opt/hive/lib/log4j-1.2.16.jar,/opt/hive/lib/commons-compiler-2.7.6.jar,/opt/hive/lib/ST4-4.0.4.jar,/opt/hive/lib/calcite-avatica-1.2.0-incubating.jar,/opt/hive/lib/httpclient-4.4.jar,/opt/hive/lib/commons-codec-1.4.jar,/opt/hive/lib/commons-io-2.4.jar,/opt/hive/lib/commons-digester-1.8.jar,/opt/hive/lib/regexp-1.3.jar,/opt/hive/lib/ivy-2.4.0.jar,/opt/hive/lib/eigenbase-properties-1.1.5.jar,/opt/hive/lib/paranamer-2.3.jar,/opt/hive/lib/mail-1.4.1.jar,/opt/hive/lib/asm-commons-3.1.jar,/opt/hive/lib/commons-lang-2.6.jar,/opt/hive/lib/hive-jdbc-1.2.0-standalone.jar,/opt/hive/lib/hive-shims-common-1.2.0.jar,/opt/hive/lib/hamcrest-core-1.1.jar,/opt/hive/lib/super-csv-2.2.0.jar,
  spark.history.ui.port -> 18080
  spark.fileserver.port -> 45090
  spark.history.retainedApplications -> 999
  spark.ui.port -> 45100
  spark.shuffle.consolidateFiles -> true
  spark.executor.extraJavaOptions -> -XX:+PrintReferenceGC -verbose:gc
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy
-Djava.library.path=/opt/hadoop/lib/native/
  spark.history.fs.logDirectory -> hdfs:///logs/spark-history
  spark.eventLog.dir -> hdfs:///logs/spark-history/alti_soam/
  spark.executor.extraClassPath ->
spark-hive_2.10-1.6.1.jar:spark-hive-thriftserver_2.10-1.6.1.jar
  spark.driver.port -> 45055
  spark.port.maxRetries -> 999
  spark.executor.port -> 45250
  spark.driver.extraClassPath -> ...



On Wed, Apr 6, 2016 at 6:59 PM, Josh Rosen <joshro...@databricks.com> wrote:

>
> Spark is compiled against a custom fork of Hive 1.2.1 which added shading
> of Protobuf and removed shading of Kryo. What I think that what's happening
> here is that stock Hive 1.2.1 is taking precedence so the Kryo instance
> that it's returning is an instance of shaded/relocated Hive version rather
> than the unshaded, stock Kryo that Spark is expecting here.
>
> I just so happen to have a patch which reintroduces the shading of Kryo
> (motivated by other factors): https://github.com/apache/spark/pull/12215;
> there's a chance that a backport of this patch might fix this problem.
>
> However, I'm a bit curious about how your classpath is set up and why
> stock 1.2.1's shaded Kryo is being used here.
>
> /cc +Marcelo Vanzin <van...@cloudera.com> and +Steve Loughran
> <ste...@hortonworks.com>, who may know more.
>
> On Wed, Apr 6, 2016 at 6:08 PM Soam Acharya <s...@altiscale.com> wrote:
>
>> Hi folks,
>>
>> I have a build of Spark 1.6.1 on which spark sql seems to be functional
>> outside of windowing functions. For example, I can create a simple external
>> table via Hive:
>>
>> CREATE EXTERNAL TABLE PSTable (pid int, tty string, time string, cmd
>> string)
>> ROW FORMAT DELIMITED
>> FIELDS TERMINATED BY ','
>> LINES TERMINATED BY '\n'
>> STORED AS TEXTFILE
>> LOCATION '/user/test/ps';
>>
>> Ensure that the table is pointing to some valid data, set up spark sql to
>> point to the Hive metastore (we're running Hive 1.2.1) and run a basic test:
>>
>> spark-sql> select * from PSTable;
>> 7239pts/0   00:24:31java
>> 9993pts/9   00:00:00ps
>> 9994pts/9   00:00:00tail
>> 9995pts/9   00:00:00sed
>> 9996pts/9   00:00:00sed
>>
>> But when I try to run a windowing function which I know runs onHive, I
>> get:
>>
>> spark-sql> select a.pid ,a.time, a.cmd, min(a.time) over (partition by
>> a.cmd order by a.time ) from PSTable a;
>> org.apache.spark.SparkException: Task not serializable
>> at
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
>> at
>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>> at
>> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
>> :
>> :
>> Caused by: java.lang.ClassCastException:
>> org.apache.hive.com.esotericsoftware.kryo.Kryo cannot be cast to
>> com.esotericsoftware.kryo.Kryo
>> at
>> org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.serializePlan(HiveShim.scala:178)
>> at
>> org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.writeExternal(HiveShim.scala:191)
>> at
>> java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458)
>> at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>
>> Any thoughts or ideas would be appreciated!
>>
>> Regards,
>>
>> Soam
>>
>


Re: Kryo serialization mismatch in spark sql windowing function

2016-04-06 Thread Josh Rosen
Spark is compiled against a custom fork of Hive 1.2.1 which added shading
of Protobuf and removed shading of Kryo. What I think is happening here is
that stock Hive 1.2.1 is taking precedence, so the Kryo instance that it's
returning is an instance of the shaded/relocated Hive version rather than
the unshaded, stock Kryo that Spark is expecting here.

I just so happen to have a patch which reintroduces the shading of Kryo
(motivated by other factors): https://github.com/apache/spark/pull/12215;
there's a chance that a backport of this patch might fix this problem.

However, I'm a bit curious about how your classpath is set up and why stock
1.2.1's shaded Kryo is being used here.

/cc +Marcelo Vanzin <van...@cloudera.com> and +Steve Loughran
<ste...@hortonworks.com>, who may know more.

On Wed, Apr 6, 2016 at 6:08 PM Soam Acharya <s...@altiscale.com> wrote:

> Hi folks,
>
> I have a build of Spark 1.6.1 on which spark sql seems to be functional
> outside of windowing functions. For example, I can create a simple external
> table via Hive:
>
> CREATE EXTERNAL TABLE PSTable (pid int, tty string, time string, cmd
> string)
> ROW FORMAT DELIMITED
> FIELDS TERMINATED BY ','
> LINES TERMINATED BY '\n'
> STORED AS TEXTFILE
> LOCATION '/user/test/ps';
>
> Ensure that the table is pointing to some valid data, set up spark sql to
> point to the Hive metastore (we're running Hive 1.2.1) and run a basic test:
>
> spark-sql> select * from PSTable;
> 7239pts/0   00:24:31java
> 9993pts/9   00:00:00ps
> 9994pts/9   00:00:00tail
> 9995pts/9   00:00:00sed
> 9996pts/9   00:00:00sed
>
> But when I try to run a windowing function which I know runs onHive, I get:
>
> spark-sql> select a.pid ,a.time, a.cmd, min(a.time) over (partition by
> a.cmd order by a.time ) from PSTable a;
> org.apache.spark.SparkException: Task not serializable
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
> at
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
> at
> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
> :
> :
> Caused by: java.lang.ClassCastException:
> org.apache.hive.com.esotericsoftware.kryo.Kryo cannot be cast to
> com.esotericsoftware.kryo.Kryo
> at
> org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.serializePlan(HiveShim.scala:178)
> at
> org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.writeExternal(HiveShim.scala:191)
> at
> java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>
> Any thoughts or ideas would be appreciated!
>
> Regards,
>
> Soam
>
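
A small diagnostic sketch (not from the thread) for confirming which Kryo is
actually being loaded at runtime; the relocated class name is taken from the
ClassCastException above:

// Where does the stock Kryo that Spark links against come from?
println(classOf[com.esotericsoftware.kryo.Kryo]
  .getProtectionDomain.getCodeSource.getLocation)

// And Hive's relocated copy, if it is on the classpath at all:
println(Class.forName("org.apache.hive.com.esotericsoftware.kryo.Kryo")
  .getProtectionDomain.getCodeSource.getLocation)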


Kryo serialization mismatch in spark sql windowing function

2016-04-06 Thread Soam Acharya
Hi folks,

I have a build of Spark 1.6.1 on which spark sql seems to be functional
outside of windowing functions. For example, I can create a simple external
table via Hive:

CREATE EXTERNAL TABLE PSTable (pid int, tty string, time string, cmd string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/user/test/ps';

Ensure that the table is pointing to some valid data, set up spark sql to
point to the Hive metastore (we're running Hive 1.2.1) and run a basic test:

spark-sql> select * from PSTable;
7239    pts/0   00:24:31    java
9993    pts/9   00:00:00    ps
9994    pts/9   00:00:00    tail
9995    pts/9   00:00:00    sed
9996    pts/9   00:00:00    sed

But when I try to run a windowing function which I know runs on Hive, I get:

spark-sql> select a.pid ,a.time, a.cmd, min(a.time) over (partition by
a.cmd order by a.time ) from PSTable a;
org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
:
:
Caused by: java.lang.ClassCastException:
org.apache.hive.com.esotericsoftware.kryo.Kryo cannot be cast to
com.esotericsoftware.kryo.Kryo
at
org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.serializePlan(HiveShim.scala:178)
at
org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.writeExternal(HiveShim.scala:191)
at
java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)

Any thoughts or ideas would be appreciated!

Regards,

Soam


DataFrames - Kryo registration issue

2016-03-10 Thread Raghava Mutharaju
Hello All,

If Kryo serialization is enabled, doesn't Spark take care of registration
of built-in classes, i.e., are we not supposed to register just the custom
classes?

When using DataFrames, this does not seem to be the case. I had to register
the following classes

conf.registerKryoClasses(Array(
  classOf[org.apache.spark.sql.types.StructType],
  classOf[org.apache.spark.sql.types.StructField],
  classOf[Array[org.apache.spark.sql.types.StructField]],
  classOf[org.apache.spark.sql.types.LongType$],
  classOf[org.apache.spark.sql.types.Metadata],
  classOf[scala.collection.immutable.Map$EmptyMap$],
  classOf[org.apache.spark.sql.catalyst.InternalRow],
  classOf[Array[org.apache.spark.sql.catalyst.InternalRow]],
  classOf[org.apache.spark.sql.catalyst.expressions.UnsafeRow],
  classOf[Array[org.apache.spark.sql.catalyst.expressions.UnsafeRow]],
  Class.forName("org.apache.spark.sql.execution.joins.UnsafeHashedRelation"),
  Class.forName("java.util.HashMap"),
  classOf[scala.reflect.ClassTag$$anon$1],
  Class.forName("java.lang.Class"),
  Class.forName("org.apache.spark.sql.execution.columnar.CachedBatch")))

I got the following exception

com.esotericsoftware.kryo.KryoException:
java.lang.IllegalArgumentException: Class is not registered: byte[][]

But byte is not a class. So I couldn't register it -- compiler complains
that byte is not a class. How can I register byte[][] in Scala?

Does this point to some other issue?

In some other posts, I noticed use of kryo.register(). In this case, how do
we pass the kryo object to SparkContext?

Thanks in advance.

Regards,
Raghava.
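
On the byte[][] part of the question: primitive array classes can be
registered either through Scala's Array types or through their JVM binary
names (a sketch; the registrator class name is illustrative):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class ArraysRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Array[Array[Byte]]])   // byte[][]
    // ...or, equivalently, by the JVM binary name:
    // kryo.register(Class.forName("[[B"))
    kryo.register(classOf[Array[Byte]])          // byte[]
  }
}

// conf.set("spark.kryo.registrator", classOf[ArraysRegistrator].getName)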


Re: Spark Streaming 1.6 mapWithState not working well with Kryo Serialization

2016-03-02 Thread Shixiong(Ryan) Zhu
See https://issues.apache.org/jira/browse/SPARK-12591

After applying the patch, it should work. However, if you want to enable
"registrationRequired", you still need to register
"org.apache.spark.streaming.util.OpenHashMapBasedStateMap",
"org.apache.spark.streaming.util.EmptyStateMap" and
"org.apache.spark.streaming.rdd.MapWithStateRDDRecord" by yourself because
these classes are defined in the Streaming project and we don't want to use
them in Spark core.


On Wed, Mar 2, 2016 at 1:41 PM, Aris <arisofala...@gmail.com> wrote:

> Hello Spark folks and especially TD,
>
> I am using the Spark Streaming 1.6 mapWithState API, and I am trying to
> enforce Kryo Serialization with
>
> SparkConf.set("spark.kryo.registrationRequired", "true")
>
> However, this appears to be impossible! I registered all the classes that
> are my own, but I problem with a
> class org.apache.spark.streaming.rdd.MapWithStateRDDRecord, which is set as
> private[streaming] .
>
>
> The error:
>
> java.lang.IllegalArgumentException: Class is not registered:
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord
> Note: To register this class use:
> kryo.register(org.apache.spark.streaming.rdd.MapWithStateRDDRecord.class);
> at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
> at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
>
> Since this class is private with spark streaming itself, I cannot actually
> register it with Kryo, and I cannot do registrationRequired in order to
> make sure *everything* has been serialized with Kryo.
>
> Is this a bug? Can I somehow solve this?
>
> Aris
>
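
Since those classes are private[streaming], they cannot be referenced with
classOf from user code, but they can still be registered by name. A sketch
based on the class names listed above:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class StreamingStateRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // private[streaming] classes: load them reflectively instead of classOf[...]
    kryo.register(Class.forName(
      "org.apache.spark.streaming.util.OpenHashMapBasedStateMap"))
    kryo.register(Class.forName(
      "org.apache.spark.streaming.util.EmptyStateMap"))
    kryo.register(Class.forName(
      "org.apache.spark.streaming.rdd.MapWithStateRDDRecord"))
  }
}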


Spark Streaming 1.6 mapWithState not working well with Kryo Serialization

2016-03-02 Thread Aris
Hello Spark folks and especially TD,

I am using the Spark Streaming 1.6 mapWithState API, and I am trying to
enforce Kryo Serialization with

SparkConf.set("spark.kryo.registrationRequired", "true")

However, this appears to be impossible! I registered all the classes that
are my own, but I have a problem with the class
org.apache.spark.streaming.rdd.MapWithStateRDDRecord, which is declared as
private[streaming].


The error:

java.lang.IllegalArgumentException: Class is not registered:
org.apache.spark.streaming.rdd.MapWithStateRDDRecord
Note: To register this class use:
kryo.register(org.apache.spark.streaming.rdd.MapWithStateRDDRecord.class);
at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
at
com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)

Since this class is private within Spark Streaming itself, I cannot actually
register it with Kryo, and so I cannot use registrationRequired to make sure
*everything* is serialized with Kryo.

Is this a bug? Can I somehow solve this?

Aris


Re: Kryo serializer Exception during serialization: java.io.IOException: java.lang.IllegalArgumentException:

2016-01-08 Thread Shixiong(Ryan) Zhu
Could you disable `spark.kryo.registrationRequired`? Some classes may not
be registered but they work well with Kryo's default serializer.

On Fri, Jan 8, 2016 at 8:58 AM, Ted Yu  wrote:

> bq. try adding scala.collection.mutable.WrappedArray
>
> But the hint said registering 
> scala.collection.mutable.WrappedArray$ofRef.class
> , right ?
>
> On Fri, Jan 8, 2016 at 8:52 AM, jiml  wrote:
>
>> (point of post is to see if anyone has ideas about errors at end of post)
>>
>> In addition, the real way to test if it's working is to force
>> serialization:
>>
>> In Java:
>>
>> Create array of all your classes:
>> // for kyro serializer it wants to register all classes that need to be
>> serialized
>> Class[] kryoClassArray = new Class[]{DropResult.class,
>> DropEvaluation.class,
>> PrintHetSharing.class};
>>
>> in the builder for your SparkConf (or in conf/spark-defaults.sh)
>> .set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer")
>> //require registration of all classes with Kyro
>> .set("spark.kryo.registrationRequired", "true")
>> // don't forget to register ALL classes or will get error
>> .registerKryoClasses(kryoClassArray);
>>
>> Then you will start to get neat errors like the one I am working on:
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due
>> to stage failure: Failed to serialize task 0, not attempting to retry it.
>> Exception during serialization: java.io.IOException:
>> java.lang.IllegalArgumentException: Class is not registered:
>> scala.collection.mutable.WrappedArray$ofRef
>> Note: To register this class use:
>> kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
>>
>> I did try adding scala.collection.mutable.WrappedArray to the Class array
>> up
>> top but no luck. Thanks
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/kryos-serializer-tp16454p25921.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Kryo serializer Exception during serialization: java.io.IOException: java.lang.IllegalArgumentException:

2016-01-08 Thread jiml
(point of post is to see if anyone has ideas about errors at end of post)

In addition, the real way to test if it's working is to force serialization:

In Java:

Create an array of all your classes:

// the Kryo serializer wants to register all classes that need to be
// serialized
Class[] kryoClassArray = new Class[]{DropResult.class, DropEvaluation.class,
    PrintHetSharing.class};

in the builder for your SparkConf (or in conf/spark-defaults.conf)

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// require registration of all classes with Kryo
.set("spark.kryo.registrationRequired", "true")
// don't forget to register ALL classes or you will get an error
.registerKryoClasses(kryoClassArray);

Then you will start to get neat errors like the one I am working on:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Failed to serialize task 0, not attempting to retry it.
Exception during serialization: java.io.IOException:
java.lang.IllegalArgumentException: Class is not registered:
scala.collection.mutable.WrappedArray$ofRef
Note: To register this class use:
kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);

I did try adding scala.collection.mutable.WrappedArray to the Class array up
top but no luck. Thanks





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/kryos-serializer-tp16454p25921.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kryo serializer Exception during serialization: java.io.IOException: java.lang.IllegalArgumentException:

2016-01-08 Thread Ted Yu
bq. try adding scala.collection.mutable.WrappedArray

But the hint said registering scala.collection.mutable.WrappedArray$ofRef.class
, right ?

On Fri, Jan 8, 2016 at 8:52 AM, jiml  wrote:

> (point of post is to see if anyone has ideas about errors at end of post)
>
> In addition, the real way to test if it's working is to force
> serialization:
>
> In Java:
>
> Create array of all your classes:
> // for kyro serializer it wants to register all classes that need to be
> serialized
> Class[] kryoClassArray = new Class[]{DropResult.class,
> DropEvaluation.class,
> PrintHetSharing.class};
>
> in the builder for your SparkConf (or in conf/spark-defaults.sh)
> .set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
> //require registration of all classes with Kyro
> .set("spark.kryo.registrationRequired", "true")
> // don't forget to register ALL classes or will get error
> .registerKryoClasses(kryoClassArray);
>
> Then you will start to get neat errors like the one I am working on:
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
> to stage failure: Failed to serialize task 0, not attempting to retry it.
> Exception during serialization: java.io.IOException:
> java.lang.IllegalArgumentException: Class is not registered:
> scala.collection.mutable.WrappedArray$ofRef
> Note: To register this class use:
> kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
>
> I did try adding scala.collection.mutable.WrappedArray to the Class array
> up
> top but no luck. Thanks
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/kryos-serializer-tp16454p25921.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
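
On the hint in that error message: the class to register is the inner ofRef
class, not WrappedArray itself. In Scala it can be referenced directly; from
Java it is only reachable by its binary name (a sketch):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class WrappedArrayRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // This is the class the Kryo hint refers to:
    kryo.register(classOf[scala.collection.mutable.WrappedArray.ofRef[_]])
    // From Java, the same class by its binary name:
    // kryo.register(Class.forName("scala.collection.mutable.WrappedArray$ofRef"));
  }
}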


Kryo serialization fails when using SparkSQL and HiveContext

2015-12-14 Thread Linh M. Tran
Hi everyone,
I'm using HiveContext and SparkSQL to query a Hive table and doing a join
operation on it.
After changing the default serializer to Kryo with
spark.kryo.registrationRequired = true, the Spark application failed with
the following error:

java.lang.IllegalArgumentException: Class is not registered:
org.apache.spark.sql.catalyst.expressions.GenericRow
Note: To register this class use:
kryo.register(org.apache.spark.sql.catalyst.expressions.GenericRow.class);
at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
at
com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565)
at
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36)
at
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:124)
at
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:204)
at
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I'm using Spark 1.3.1 (HDP 2.3.0) and submitting Spark application to Yarn
in cluster mode.
Any help is appreciated.
-- 
Linh M. Tran


Re: Kryo serialization fails when using SparkSQL and HiveContext

2015-12-14 Thread Michael Armbrust
You'll need to either turn off registration
(spark.kryo.registrationRequired) or create a custom registrator
(spark.kryo.registrator).

http://spark.apache.org/docs/latest/configuration.html#compression-and-serialization

On Mon, Dec 14, 2015 at 2:17 AM, Linh M. Tran <linh.mtran...@gmail.com>
wrote:

> Hi everyone,
> I'm using HiveContext and SparkSQL to query a Hive table and doing join
> operation on it.
> After changing the default serializer to Kryo with
> spark.kryo.registrationRequired = true, the Spark application failed with
> the following error:
>
> java.lang.IllegalArgumentException: Class is not registered:
> org.apache.spark.sql.catalyst.expressions.GenericRow
> Note: To register this class use:
> kryo.register(org.apache.spark.sql.catalyst.expressions.GenericRow.class);
> at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
> at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
> at
> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565)
> at
> com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36)
> at
> com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
> at
> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> at
> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:124)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:204)
> at
> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
> at
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> I'm using Spark 1.3.1 (HDP 2.3.0) and submitting Spark application to Yarn
> in cluster mode.
> Any help is appreciated.
> --
> Linh M. Tran
>
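
A sketch of the two options mentioned above (everything except the GenericRow
class name, which comes from the error, is illustrative):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Option 2: a registrator covering the internal class from the error.
class SqlRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(Class.forName(
      "org.apache.spark.sql.catalyst.expressions.GenericRow"))
    // ...plus whatever registrationRequired complains about next.
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Option 1: simply stop requiring explicit registration...
  .set("spark.kryo.registrationRequired", "false")
  // ...or keep it and point Spark at the registrator above:
  // .set("spark.kryo.registrationRequired", "true")
  // .set("spark.kryo.registrator", classOf[SqlRegistrator].getName)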


Re: Kryo Serialization in Spark

2015-12-10 Thread manasdebashiskar
Are you sure you are using Kryo serialization?
You are getting a Java serialization error.
Are you setting up your SparkContext with Kryo serialization enabled?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-Serialization-in-Spark-tp25628p25678.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



hbase Put object kryo serialisation error

2015-12-09 Thread Shushant Arora
Hi

I have a JavaPairRDD<ImmutableBytesWritable, Put> pairrdd.
When I do rdd.persist(StorageLevel.MEMORY_AND_DISK()),

it throws an exception:

com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 100

Serialization trace:

familyMap (org.apache.hadoop.hbase.client.Put)

I have registered Put and TreeMap in the Spark Kryo serializer.

What is the cause of this error?


Full stack trace is :


Exception in thread "main" com.esotericsoftware.kryo.KryoException:
Encountered unregistered class ID: 100
Serialization trace:
familyMap (org.apache.hadoop.hbase.client.Put)
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:626)
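
"Encountered unregistered class ID" during deserialization usually means the
reading side does not have the same registrations as the writing side, often
because not every class reachable from Put (its familyMap, cells, lists, and
so on) was registered consistently on the driver and the executors. A sketch
of a registrator covering the classes that commonly show up in that trace;
the exact list is an assumption, not confirmed by the thread:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class HBaseRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[org.apache.hadoop.hbase.client.Put])
    kryo.register(classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable])
    kryo.register(classOf[org.apache.hadoop.hbase.KeyValue])
    kryo.register(classOf[java.util.TreeMap[_, _]])
    kryo.register(classOf[java.util.ArrayList[_]])
    kryo.register(classOf[Array[Byte]])
  }
}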


Kryo Serialization in Spark

2015-12-07 Thread prasad223
Hi All,

I'm unable to use the Kryo serializer in my Spark program.
I'm loading a graph from an edgelist file using GraphLoader and performing a
BFS using the Pregel API.
But I get the below-mentioned error while it is running.
Can anybody tell me the right way to serialize a class in Spark and
which functions need to be implemented?


class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[GraphBFS])
    kryo.register(classOf[Config])
    kryo.register(classOf[Iterator[(Long, Double)]])
  }
}


class GraphBFS {

  def vprog(id: VertexId, attr: Double, msg: Double): Double =
    math.min(attr, msg)

  def sendMessage(triplet: EdgeTriplet[Double, Int]): Iterator[(VertexId, Double)] = {
    var iter: Iterator[(VertexId, Double)] = Iterator.empty
    val isSrcMarked = triplet.srcAttr != Double.PositiveInfinity
    val isDstMarked = triplet.dstAttr != Double.PositiveInfinity
    if (!(isSrcMarked && isDstMarked)) {
      if (isSrcMarked) {
        iter = Iterator((triplet.dstId, triplet.srcAttr + 1))
      } else {
        iter = Iterator((triplet.srcId, triplet.dstAttr + 1))
      }
    }
    iter
  }

  def reduceMessage(a: Double, b: Double) = math.min(a, b)

  def main() {
    ..
    val bfs = initialGraph.pregel(initialMessage, maxIterations,
      activeEdgeDirection)(vprog, sendMessage, reduceMessage)
    .

  }
}



15/12/07 21:52:49 INFO BlockManager: Removing RDD 8
15/12/07 21:52:49 INFO BlockManager: Removing RDD 2
Exception in thread "main" org.apache.spark.SparkException: Task not
serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:683)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:682)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:682)
at
org.apache.spark.graphx.impl.VertexRDDImpl.mapVertexPartitions(VertexRDDImpl.scala:96)
at
org.apache.spark.graphx.impl.GraphImpl.mapVertices(GraphImpl.scala:132)
at org.apache.spark.graphx.Pregel$.apply(Pregel.scala:122)
at org.apache.spark.graphx.GraphOps.pregel(GraphOps.scala:362)
at GraphBFS.main(GraphBFS.scala:241)
at run$.main(GraphBFS.scala:268)
at run.main(GraphBFS.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: GraphBFS
Serialization stack:
- object not serializable (class: GraphBFS, value:
GraphBFS@575c3e9b)
- field (class: GraphBFS$$anonfun$17, name: $outer, type: class
GraphBFS)
- object (class GraphBFS$$anonfun$17, )
- field (class: org.apache.spark.graphx.Pregel$$anonfun$1, name:
vprog$1, type: interface scala.Function3)
- object (class org.apache.spark.graphx.Pregel$$anonfun$1,
)
- field (class: org.apache.spark.graphx.impl.GraphImpl$$anonfun$5,
name: f$1, type: interface scala.Function2)
- object (class org.apache.spark.graphx.impl.GraphImpl$$anonfun$5,
)
- field (class:
org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$1, name: f$1, type:
interface scala.Function1)
- object (class
org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$1, )

Thanks,
Prasad




-
Thanks,
Prasad
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-Serialization-in-Spark-tp25628.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
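
On the stack trace above: the failure is in the closure cleaner, not in Kryo.
vprog and sendMessage are methods on GraphBFS, so the closures handed to
pregel capture the (non-serializable) enclosing instance, and task closures
always go through the Java closure serializer regardless of the spark.kryo
settings. A minimal sketch of the usual fix:

import org.apache.spark.graphx._

// Either make the enclosing class serializable so the captured `this` can ship:
class GraphBFS extends Serializable {
  def vprog(id: VertexId, attr: Double, msg: Double): Double = math.min(attr, msg)
  def reduceMessage(a: Double, b: Double): Double = math.min(a, b)
  // ... sendMessage and main as in the original post ...
}

// ...or keep GraphBFS as-is and hand pregel standalone functions that do not
// reference `this`, e.g. local vals inside main:
//   val vprog = (id: VertexId, attr: Double, msg: Double) => math.min(attr, msg)
//   val reduceMessage = (a: Double, b: Double) => math.min(a, b)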

ClassCastException in Kryo Serialization

2015-12-05 Thread SRK
Hi,

I seem to be getting a ClassCastException in Kryo serialization. Following
is the error. The Child1 class is held in a map in the parent class. Child1
has a HashSet testObjects of type Object1. I get an error when it tries to
deserialize Object1. Any idea as to why this is happening?

com.esotericsoftware.kryo.KryoException: java.lang.ClassCastException:
Object1 cannot be cast to java.lang.String
Serialization trace:
testObjects (Child1)
map (parent)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ClassCastException-in-Kryo-Serialization-tp25575.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Why is Kryo not the default serializer?

2015-11-10 Thread ozawa_h
The array issue was also discussed in the Apache Hive forum. It seems like
this problem can be resolved by using Kryo 3.x. Will upgrading to Kryo 3.x
allow Kryo to become the default SerDe?
https://issues.apache.org/jira/browse/HIVE-12174

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Why is Kryo not the default serializer?

2015-11-09 Thread Hitoshi Ozawa
If Kryo usage is recommended, why is Java serialization the default
serializer instead of Kryo? Is there some limitation to using Kryo? I've
read through the documentation, but it just seems Kryo is a better choice
and should be made the default.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-Kryo-not-the-default-serializer-tp25338.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Why is Kryo not the default serializer?

2015-11-09 Thread Sabarish Sasidharan
I have seen some failures in our workloads with Kryo; one I remember is a
scenario with very large arrays. We could not get Kryo to work despite
trying the different configuration properties. Switching to the Java serde
was what worked.

Regards
Sab

On Tue, Nov 10, 2015 at 11:43 AM, Hitoshi Ozawa <ozaw...@worksap.co.jp>
wrote:

> If Kryo usage is recommended, why is Java serialization the default
> serializer instead of Kryo? Is there some limitation to using Kryo? I've
> read through the documentation but it just seem Kryo  is a better choice
> and
> should be made a default.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-Kryo-not-the-default-serializer-tp25338.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Kryo makes String data invalid

2015-10-26 Thread Saif.A.Ellafi
Hi all,

I have a Parquet file, which I am loading in a shell. When I launch the shell
with --driver-java-options="-Dspark.serializer=...kryo", a couple of fields
look like:

03-?? ??-?? ??-???
when calling > data.first

I will confirm briefly, but I am utterly sure it happens only on StringType
fields.

Why could this be happening? Perhaps when creating the Parquet file from
Spark, Kryo wasn't set up?

If I disable Kryo, the data looks good.

Any ideas?
Saif



Enabling kryo serialization slows down machine learning app.

2015-10-06 Thread fede.sc
Hi,
my team is setting up a machine-learning framework based on Spark's MLlib
that currently uses logistic regression. I enabled Kryo serialization and
enforced class registration, so I know that all the serialized classes are
registered. However, the running times when Kryo serialization is enabled
are consistently longer. This is true both when running locally on smaller
samples (1.6 minutes vs 1.3m) and also when running with a larger sample on
AWS with two worker nodes (2h30 vs 1h50).

Using the monitoring tools suggests that Task Deserialization Times are
similar (although perhaps
slightly longer for Kryo), but Task Durations and even Scheduler Delays
increase significantly.

There is also a significant difference in memory usage: for Kryo the number
of stored RDDs is
higher (much more so on the local sample: 40 vs. 4).

Does anyone have an idea of what can be going on, or where should I focus to
find out?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Enabling-kryo-serialization-slows-down-machine-learning-app-tp24947.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-18 Thread Vipul Rai
Hi Nick/Igor,

Any solution for this?
Even I am having the same issue, and copying the jar to each executor is not
feasible if we use a lot of jars.

Thanks,
Vipul


Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-08 Thread Igor Berman
As a starting point, attach your stack trace...
PS: look for duplicates in your classpath; maybe you include another jar
with the same class.

On 8 September 2015 at 06:38, Nicholas R. Peterson <nrpeter...@gmail.com>
wrote:

> I'm trying to run a Spark 1.4.1 job on my CDH5.4 cluster, through Yarn.
> Serialization is set to use Kryo.
>
> I have a large object which I send to the executors as a Broadcast. The
> object seems to serialize just fine. When it attempts to deserialize,
> though, Kryo throws a ClassNotFoundException... for a class that I include
> in the fat jar that I spark-submit.
>
> What could be causing this classpath issue with Kryo on the executors?
> Where should I even start looking to try to diagnose the problem? I
> appreciate any help you can provide.
>
> Thank you!
>
> -- Nick
>


Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-08 Thread Nicholas R. Peterson
Thanks, Igor; I've got it running again right now, and can attach the stack
trace when it finishes.

In the mean time, I've noticed something interesting: in the Spark UI, the
application jar that I submit is not being included on the classpath.  It
has been successfully uploaded to the nodes -- in the nodemanager directory
for the application, I see __app__.jar and __spark__.jar.  The directory
itself is on the classpath, and __spark__.jar and __hadoop_conf__ are as
well.  When I do everything the same but switch the master to local[*], the
jar I submit IS added to the classpath.

This seems like a likely culprit.  What could cause this, and how can I fix
it?

Best,
Nick

On Tue, Sep 8, 2015 at 1:14 AM Igor Berman <igor.ber...@gmail.com> wrote:

> as a starting point, attach your stacktrace...
> ps: look for duplicates in your classpath, maybe you include another jar
> with same class
>
> On 8 September 2015 at 06:38, Nicholas R. Peterson <nrpeter...@gmail.com>
> wrote:
>
>> I'm trying to run a Spark 1.4.1 job on my CDH5.4 cluster, through Yarn.
>> Serialization is set to use Kryo.
>>
>> I have a large object which I send to the executors as a Broadcast. The
>> object seems to serialize just fine. When it attempts to deserialize,
>> though, Kryo throws a ClassNotFoundException... for a class that I include
>> in the fat jar that I spark-submit.
>>
>> What could be causing this classpath issue with Kryo on the executors?
>> Where should I even start looking to try to diagnose the problem? I
>> appreciate any help you can provide.
>>
>> Thank you!
>>
>> -- Nick
>>
>
>


Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-08 Thread Nick Peterson
Yes, the jar contains the class:

$ jar -tf lumiata-evaluation-assembly-1.0.jar | grep 2028/Document/Document
com/i2028/Document/Document$1.class
com/i2028/Document/Document.class

What else can I do?  Is there any way to get more information about the
classes available to the particular classloader kryo is using?

On Tue, Sep 8, 2015 at 6:34 AM Igor Berman <igor.ber...@gmail.com> wrote:

> java.lang.ClassNotFoundException: com.i2028.Document.Document
>
> 1. so have you checked that jar that you create(fat jar) contains this class?
>
> 2. might be there is some stale cache issue...not sure though
>
>
> On 8 September 2015 at 16:12, Nicholas R. Peterson <nrpeter...@gmail.com>
> wrote:
>
>> Here is the stack trace:  (Sorry for the duplicate, Igor -- I forgot to 
>> include the list.)
>>
>>
>> 15/09/08 05:56:43 WARN scheduler.TaskSetManager: Lost task 183.0 in stage 
>> 41.0 (TID 193386, ds-compute2.lumiata.com): java.io.IOException: 
>> com.esotericsoftware.kryo.KryoException: Error constructing instance of 
>> class: com.lumiata.patientanalysis.utils.CachedGraph
>>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
>>  at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>>  at 
>> com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:44)
>>  at 
>> com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:43)
>>  at 
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>>  at 
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>>  at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>  at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>  at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>  at java.lang.Thread.run(Thread.java:745)
>> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing 
>> instance of class: com.lumiata.patientanalysis.utils.CachedGraph
>>  at 
>> com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
>>  at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
>>  at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
>>  at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>  at 
>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:182)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
>>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
>>  ... 24 more
>> Caused by: java.lang.reflect.InvocationTargetException
>>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>  at 
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>  at 
>> sun.reflect.DelegatingCo

Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-08 Thread Nicholas R. Peterson
)
at 
com.lumiata.patientanalysis.utils.CachedGraph.<init>(CachedGraph.java:178)
... 38 more
Caused by: java.lang.ClassNotFoundException: com.i2028.Document.Document
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
... 47 more



> On Tue, Sep 8, 2015 at 6:01 AM Igor Berman <igor.ber...@gmail.com> wrote:
>
>> I wouldn't build on this. local mode & yarn are different so that jars
>> you use in spark submit are handled differently
>>
>> On 8 September 2015 at 15:43, Nicholas R. Peterson <nrpeter...@gmail.com>
>> wrote:
>>
>>> Thans, Igor; I've got it running again right now, and can attach the
>>> stack trace when it finishes.
>>>
>>> In the mean time, I've noticed something interesting: in the Spark UI,
>>> the application jar that I submit is not being included on the classpath.
>>> It has been successfully uploaded to the nodes -- in the nodemanager
>>> directory for the application, I see __app__.jar and __spark__.jar.  The
>>> directory itself is on the classpath, and __spark__.jar and __hadoop_conf__
>>> are as well.  When I do everything the same but switch the master to
>>> local[*], the jar I submit IS added to the classpath.
>>>
>>> This seems like a likely culprit.  What could cause this, and how can I
>>> fix it?
>>>
>>> Best,
>>> Nick
>>>
>>> On Tue, Sep 8, 2015 at 1:14 AM Igor Berman <igor.ber...@gmail.com>
>>> wrote:
>>>
>>>> as a starting point, attach your stacktrace...
>>>> ps: look for duplicates in your classpath, maybe you include another
>>>> jar with same class
>>>>
>>>> On 8 September 2015 at 06:38, Nicholas R. Peterson <
>>>> nrpeter...@gmail.com> wrote:
>>>>
>>>>> I'm trying to run a Spark 1.4.1 job on my CDH5.4 cluster, through
>>>>> Yarn. Serialization is set to use Kryo.
>>>>>
>>>>> I have a large object which I send to the executors as a Broadcast.
>>>>> The object seems to serialize just fine. When it attempts to deserialize,
>>>>> though, Kryo throws a ClassNotFoundException... for a class that I include
>>>>> in the fat jar that I spark-submit.
>>>>>
>>>>> What could be causing this classpath issue with Kryo on the executors?
>>>>> Where should I even start looking to try to diagnose the problem? I
>>>>> appreciate any help you can provide.
>>>>>
>>>>> Thank you!
>>>>>
>>>>> -- Nick
>>>>>
>>>>
>>>>
>>


Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-08 Thread Igor Berman
olver.java:115)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
>   at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
>   at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:626)
>   at 
> com.lumiata.patientanalysis.utils.CachedGraph.loadCacheFromSerializedData(CachedGraph.java:221)
>   at 
> com.lumiata.patientanalysis.utils.CachedGraph.<init>(CachedGraph.java:182)
>   at 
> com.lumiata.patientanalysis.utils.CachedGraph.<init>(CachedGraph.java:178)
>   ... 38 more
> Caused by: java.lang.ClassNotFoundException: com.i2028.Document.Document
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:348)
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>   ... 47 more
>
>
>
>> On Tue, Sep 8, 2015 at 6:01 AM Igor Berman <igor.ber...@gmail.com> wrote:
>>
>>> I wouldn't build on this. local mode & yarn are different so that jars
>>> you use in spark submit are handled differently
>>>
>>> On 8 September 2015 at 15:43, Nicholas R. Peterson <nrpeter...@gmail.com
>>> > wrote:
>>>
>>>> Thans, Igor; I've got it running again right now, and can attach the
>>>> stack trace when it finishes.
>>>>
>>>> In the mean time, I've noticed something interesting: in the Spark UI,
>>>> the application jar that I submit is not being included on the classpath.
>>>> It has been successfully uploaded to the nodes -- in the nodemanager
>>>> directory for the application, I see __app__.jar and __spark__.jar.  The
>>>> directory itself is on the classpath, and __spark__.jar and __hadoop_conf__
>>>> are as well.  When I do everything the same but switch the master to
>>>> local[*], the jar I submit IS added to the classpath.
>>>>
>>>> This seems like a likely culprit.  What could cause this, and how can I
>>>> fix it?
>>>>
>>>> Best,
>>>> Nick
>>>>
>>>> On Tue, Sep 8, 2015 at 1:14 AM Igor Berman <igor.ber...@gmail.com>
>>>> wrote:
>>>>
>>>>> as a starting point, attach your stacktrace...
>>>>> ps: look for duplicates in your classpath, maybe you include another
>>>>> jar with same class
>>>>>
>>>>> On 8 September 2015 at 06:38, Nicholas R. Peterson <
>>>>> nrpeter...@gmail.com> wrote:
>>>>>
>>>>>> I'm trying to run a Spark 1.4.1 job on my CDH5.4 cluster, through
>>>>>> Yarn. Serialization is set to use Kryo.
>>>>>>
>>>>>> I have a large object which I send to the executors as a Broadcast.
>>>>>> The object seems to serialize just fine. When it attempts to deserialize,
>>>>>> though, Kryo throws a ClassNotFoundException... for a class that I 
>>>>>> include
>>>>>> in the fat jar that I spark-submit.
>>>>>>
>>>>>> What could be causing this classpath issue with Kryo on the
>>>>>> executors? Where should I even start looking to try to diagnose the
>>>>>> problem? I appreciate any help you can provide.
>>>>>>
>>>>>> Thank you!
>>>>>>
>>>>>> -- Nick
>>>>>>
>>>>>
>>>>>
>>>
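
One way to narrow this kind of problem down from inside the job is to ask
each executor whether the class resolves on its context classloader (a
diagnostic sketch; the class name is the one from the stack trace). If it
resolves here but Kryo still fails, the question becomes which classloader
the Kryo instance was constructed with, rather than whether the jar reached
the node:

// Run a tiny job that tries to resolve the class on each executor.
val report = sc.parallelize(1 to sc.defaultParallelism, sc.defaultParallelism)
  .map { _ =>
    val cl = Thread.currentThread().getContextClassLoader
    val status =
      try { Class.forName("com.i2028.Document.Document", false, cl); "OK" }
      catch { case _: ClassNotFoundException => "MISSING" }
    s"${java.net.InetAddress.getLocalHost.getHostName}: $status (loader = $cl)"
  }
  .collect()
  .distinct

report.foreach(println)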

