Broadcast value

2015-06-12 Thread Yasemin Kaya
Hi,

I am reading a broadcast value from a file, and I want to use it to create
Rating objects (ALS). But I am getting null. Here is my code
<https://gist.github.com/yaseminn/d6afd0263f6db6ea4ec5>:

Lines 17 and 18 (in the gist) are fine, but line 19 returns null, so line 21
gives me an error, and I don't understand why. Do you have any idea?
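
For reference, a minimal sketch of the usual pattern: broadcast a lookup
table, then use it inside the map that builds Ratings. This is illustrative
only; the file names, formats, and the -1 fallback are assumptions, not taken
from the gist.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.Rating

val sc = new SparkContext(new SparkConf().setAppName("als-broadcast"))

// Load a lookup table on the driver and broadcast it to the executors.
val idMap: Map[String, Int] = scala.io.Source.fromFile("ids.txt").getLines()
  .map { l => val Array(k, v) = l.split(","); (k, v.toInt) }
  .toMap
val idMapBC = sc.broadcast(idMap)

val ratings = sc.textFile("ratings.csv").map { line =>
  val Array(user, product, rating) = line.split(",")
  // A missing key is a common source of nulls and errors here: prefer
  // getOrElse (or .get plus explicit handling) over a bare apply.
  Rating(idMapBC.value.getOrElse(user, -1), product.toInt, rating.toDouble)
}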


Best,
yasemin



-- 
hiç ender hiç


question about Broadcast value NullPointerException

2016-08-23 Thread Chong Zhang
Hello,

I'm using Spark Streaming to process Kafka messages, and I want to use a
properties file as the input and broadcast the properties:

import java.io.FileInputStream
import java.util.Properties
import org.apache.spark.streaming.Seconds

val props = new Properties()
props.load(new FileInputStream(args(0)))
val sc = initSparkContext()
val propsBC = sc.broadcast(props)
println(s"propFileBC 1: " + propsBC.value)

val lines = createKStream(sc)
val parsedLines = lines.map { l =>
  println(s"propFileBC 2: " + propsBC.value)
  process(l, propsBC.value)
}.filter(...)

val goodLines = lines.window(Seconds(2), Seconds(2)) // window takes Durations
goodLines.print()


If I run it with spark-submit and master local[2], it works fine. But if I
use --master spark://master:7077 (2 nodes), the first propsBC.value is
printed, but the second print inside the map function causes a null pointer
exception:

Caused by: java.lang.NullPointerException
        at test.spark.Main$$anonfun$1.apply(Main.scala:79)
        at test.spark.Main$$anonfun$1.apply(Main.scala:78)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:284)
        at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)

Appreciate any help,  thanks!
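
One defensive variation sometimes suggested for serialization-sensitive
broadcasts (a hedged sketch, not a confirmed fix for this exact trace) is to
broadcast an immutable Scala Map built from the Properties instead of the
mutable java.util.Properties object itself:

import java.io.FileInputStream
import java.util.Properties
import scala.collection.JavaConverters._

val props = new Properties()
props.load(new FileInputStream(args(0)))

// An immutable Scala Map serializes predictably and cannot be mutated after
// it is captured in a closure.
val propsMap: Map[String, String] = props.asScala.toMap
val propsBC = sc.broadcast(propsMap)

// inside the DStream closures: propsBC.value("some.key")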


Broadcast value return empty after turn to org.apache.spark.serializer.KryoSerializer

2015-04-06 Thread Shuai Zheng
Hi All,

 

I have tested my code without problems on EMR YARN (Spark 1.3.0) with the
default (Java) serializer.

But when I switch to org.apache.spark.serializer.KryoSerializer, the
broadcast value doesn't give me the right result (it actually returns an
empty custom class for the inner object).

 

Basically, I broadcast a builder object which carries an array of
PropertiesUtils objects. The code should not have any logical issue, because
it works with the default Java serializer. But when I turn to
org.apache.spark.serializer.KryoSerializer, it looks like the array doesn't
initialize: propertiesList has the right size, but every element in the array
is just an empty PropertiesUtils.

 

Am I missing anything when I use KryoSerializer? I just added the two lines
below. Do I need to implement some special code to enable KryoSerializer? I
have searched everywhere but can't find any place that mentions it.

 

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

sparkConf.registerKryoClasses(new Class[]{ModelSessionBuilder.class,
        Constants.class, PropertiesUtils.class, ModelSession.class});

 

public class ModelSessionBuilder implements Serializable {

    private static final long serialVersionUID = -8139500301736028670L;

    ...

    private PropertiesUtils[] propertiesList;
}

 

public class PropertiesUtils extends Properties {

    private static final long serialVersionUID = -3684043338580885551L;

    public PropertiesUtils(Properties prop) {
        super(prop);
    }

    public PropertiesUtils() {
    }
}



 

Regards,

 

Shuai



Spark Streaming: Refreshing broadcast value after each batch

2016-07-12 Thread Daniel Haviv
Hi,
I have a streaming application which uses a broadcast variable that I
populate from a database. Every once in a while (or even every batch) I would
like to update/replace the broadcast variable with the latest data from the
database.

The only way I found online to do this is this "hackish" way
(http://stackoverflow.com/questions/28573816/periodic-broadcast-in-apache-spark-streaming),
and I'm not sure it even gets re-executed per batch:

val broadcastFactory = new TorrentBroadcastFactory()
broadcastFactory.unbroadcast(BroadcastId, true, true)
// append some ids to initIds
val broadcastContent =
  broadcastFactory.newBroadcast[Set[String]](initIds, false, BroadcastId)
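
For context, the pattern usually suggested in answers to that question is a
driver-side re-broadcast inside transform(). A hedged sketch, where
loadIdsFromDb() is a hypothetical function that queries the database:

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object BroadcastHolder {
  @volatile private var instance: Broadcast[Set[String]] = _

  def refresh(sc: SparkContext, loadIds: () => Set[String]): Broadcast[Set[String]] = {
    // Drop the old broadcast blocks, then broadcast the fresh data.
    if (instance != null) instance.unpersist(blocking = false)
    instance = sc.broadcast(loadIds())
    instance
  }
}

// transform's body runs on the driver once per batch, so the broadcast is
// refreshed before each batch's tasks are scheduled:
// lines.transform { rdd =>
//   val idsBC = BroadcastHolder.refresh(rdd.sparkContext, () => loadIdsFromDb())
//   rdd.filter(l => idsBC.value.contains(l))
// }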


Is there a proper way to do that?

Thank you,
Daniel


RE: [BUG]Broadcast value return empty after turn to org.apache.spark.serializer.KryoSerializer

2015-04-07 Thread Shuai Zheng
I have found the issue, but I think it is a bug.

 

If I change my class to:

 

public class ModelSessionBuilder implements Serializable {

    private static final long serialVersionUID = -8139500301736028670L;

    ...

    private Properties[] propertiesList;
}

 

the broadcast value has no issue. But in my original form, where I broadcast
it as an array of my custom subclass of Properties, after the broadcast the
propertiesList array is an array of empty PropertiesUtils objects (empty, not
null). I am not sure why this happens (the code runs without any problem with
the default Java serializer). So I think this is a bug, but I am not sure
whether it is a bug in Spark or a bug in Kryo.
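
If the subclass has to stay, one generic fallback is to tell Kryo to use
plain Java serialization for just that class. A hedged sketch, assuming a
custom registrator is acceptable (the registrator class name is hypothetical):

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.spark.serializer.KryoRegistrator

class FallbackRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Everything else still goes through Kryo; only PropertiesUtils falls
    // back to Java serialization.
    kryo.register(classOf[PropertiesUtils], new JavaSerializer())
  }
}

// sparkConf.set("spark.kryo.registrator", "my.pkg.FallbackRegistrator")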

 

Regards,

 

Shuai


 




Re: [BUG]Broadcast value return empty after turn to org.apache.spark.serializer.KryoSerializer

2015-04-14 Thread Imran Rashid
Hi Shuai,

I don't think this is a bug in Kryo; it's just a subtlety in how Kryo works.
I *think* it would also work if you changed your PropertiesUtils class to
either (a) remove the no-arg constructor or (b) instead of extending
Properties, make it a contained member variable. I wish I had a succinct
explanation, but I think it really gets into the nitty-gritty details of how
these serializers work (and this is just a hunch of mine anyway; I'm not 100%
sure). Would be great if you could confirm either way.
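
For what it's worth, a minimal sketch of option (b), composition instead of
inheritance. This is hedged and untested against this exact case:

import java.util.Properties

// Hold the Properties in a field instead of extending it, so the serializer
// deals with a plain serializable field rather than Hashtable internals.
class PropertiesUtils(val props: Properties) extends Serializable {
  def this() = this(new Properties())
  def getProperty(key: String): String = props.getProperty(key)
}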

thanks,
Imran



set spark.storage.memoryFraction to 0 when no cached RDD and memory area for broadcast value?

2015-04-07 Thread Shuai Zheng
Hi All,

 

I am a bit confused about spark.storage.memoryFraction. This is used to set
the memory area for RDD storage; does that mean only cached and persisted
RDDs? So if my program has no cached RDDs at all (that is, I have no .cache()
or .persist() calls on any RDD), can I set spark.storage.memoryFraction to a
very small number or even zero?

 

I am writing a program which consumes a lot of memory (broadcast values,
runtime, etc.), but I have no cached RDDs. Should I just turn
spark.storage.memoryFraction down to 0, and will that help me improve
performance?
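
A hedged sketch for the Spark 1.x legacy memory manager (the value is
illustrative). Note that broadcast blocks are also read through the block
manager's storage region (see the trace below), so a small non-zero value is
likely safer than 0:

import org.apache.spark.SparkConf

// Shrink the storage region when nothing is cached, leaving more heap for
// execution and user code. The default is 0.6 in Spark 1.x.
val conf = new SparkConf()
  .set("spark.storage.memoryFraction", "0.1")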

 

And I have another issue with the broadcast: when I try to get a broadcast
value, it throws an out-of-memory error. Which memory region should I give
more to (if I can't increase my overall memory size)?

 

java.lang.OutOfMemoryError: Java heap space
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.read(DefaultArraySerializers.java:218)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.read(DefaultArraySerializers.java:200)
        at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
        at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:136)
        at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:549)
        at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:431)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)

 

 

Regards,

 

Shuai



Re: set spark.storage.memoryFraction to 0 when no cached RDD and memory area for broadcast value?

2015-04-13 Thread Akhil Das
You could try leaving all the configuration values at their defaults and
running your application to see if you still hit the heap issue. If so, try
adding swap space to the machines, which will definitely help. Another way
would be to set the heap size manually (export _JAVA_OPTIONS="-Xmx5g").
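
A hedged alternative to raw _JAVA_OPTIONS is to size the JVMs with Spark's
own settings (the values are illustrative):

import org.apache.spark.SparkConf

// spark.driver.memory must be set before the driver JVM starts, so pass it
// on the command line rather than in code:
//   spark-submit --driver-memory 5g --executor-memory 5g ...
val conf = new SparkConf().set("spark.executor.memory", "5g")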

Thanks
Best Regards



Re: set spark.storage.memoryFraction to 0 when no cached RDD and memory area for broadcast value?

2015-04-14 Thread twinkle sachdeva
Hi,

In one of the applications we have built, which did no caching at all, we set
spark.storage.memoryFraction very low, and yes, that gave us performance
benefits.

Regarding the broadcast issue, you should also look at the data you are
trying to broadcast, as sometimes creating that data structure on the
executors themselves, as a singleton, helps.
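
A hedged sketch of that executor-side-singleton idea: build the large
structure lazily in a JVM-wide object instead of broadcasting it from the
driver. loadBigTable() and the file path are hypothetical.

object BigTableHolder {
  lazy val bigTable: Map[String, Double] = loadBigTable()

  private def loadBigTable(): Map[String, Double] =
    scala.io.Source.fromFile("/data/table.csv").getLines()
      .map { l => val Array(k, v) = l.split(","); (k, v.toDouble) }
      .toMap
}

// Each executor JVM initializes the lazy val once, on the first task that
// touches it, so nothing is shipped from the driver:
// rdd.map(key => BigTableHolder.bigTable.getOrElse(key, 0.0))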

Thanks,

