Re: Broadcasting a set in PySpark

2014-07-18 Thread Josh Rosen
You have to use `myBroadcastVariable.value` to access the broadcasted
value; see
https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
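
For example (a sketch based on your snippet quoted below, reusing your Uid_male_setbc broadcast), the membership test needs to go through `.value` so it runs against the underlying set rather than the Broadcast wrapper:

uid_iid_iscore_tuple_GenderFlag = uid_iid_iscore.map(
    lambda x: (x[0], zip(x[1], x[2]), x[0] in Uid_male_setbc.value))

That is also what `TypeError: argument of type 'Broadcast' is not iterable` is telling you: `in` was applied to the wrapper object itself, which is not a collection.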


On Fri, Jul 18, 2014 at 2:56 PM, Vedant Dhandhania 
ved...@retentionscience.com wrote:

 Hi All,

 I am trying to broadcast a set in a PySpark script.

 I create the set like this:

 Uid_male_set = set(maleUsers.map(lambda x:x[1]).collect())


 Then execute this line:


 uid_iid_iscore_tuple_GenderFlag = uid_iid_iscore.map(lambda
 x:(x[0],zip(x[1],x[2]),x[0] in Uid_male_set))


  An error occurred while calling o104.collectPartitions.

 : org.apache.spark.SparkException: Job aborted due to stage failure:
 Serialized task 1131:0 was 23503247 bytes which exceeds
 spark.akka.frameSize (10485760 bytes). Consider using broadcast variables
 for large values.



 So I tried broadcasting it:

 Uid_male_setbc = sc.broadcast(Uid_male_set)


  >>> Uid_male_setbc

 <pyspark.broadcast.Broadcast object at 0x1ba2ed0>


 Then I execute this line:


 uid_iid_iscore_tuple_GenderFlag = uid_iid_iscore.map(lambda
 x:(x[0],zip(x[1],x[2]),x[0] in Uid_male_setbc))

   File "<stdin>", line 1, in <lambda>

 TypeError: argument of type 'Broadcast' is not iterable

  [duplicate 1]


 So I am stuck either way: the script runs fine locally on a smaller
 dataset, but throws this error on the full data. Could anyone point out
 how to correct this, or where I am going wrong?

 Thanks


 *Vedant Dhandhania*

 *Retention Science*

 call: 805.574.0873

 visit: Site http://www.retentionscience.com/ | like: Facebook
 http://www.facebook.com/RetentionScience | follow: Twitter
 http://twitter.com/RetentionSci



Re: Broadcasting a set in PySpark

2014-07-18 Thread Vedant Dhandhania
Hi Josh,
I did make that change; however, I now get this error:

568.492: [GC [PSYoungGen: 1412948K->207017K(1465088K)]
4494287K->3471149K(4960384K), 0.1280200 secs] [Times: user=0.23 sys=0.63,
real=0.13 secs]

568.642: [Full GC
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line 708, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line 699, in sum
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line 619, in reduce
    vals = self.mapPartitions(func).collect()
  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line 583, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line 94, in __exit__
    self._context._jsc.setCallSite(None)
  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 535, in __call__
  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 361, in send_command
  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 317, in _get_connection
  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 324, in _create_connection
  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 431, in start
py4j.protocol.Py4JNetworkError: An error occurred while trying to connect
to the Java server
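
Could this be the driver JVM running low on memory? The Full GC lines right
before the failure make me wonder whether collecting and then broadcasting
the large set is exhausting the driver heap and killing the Py4J connection.
If so, would launching with more driver memory, e.g.

spark-submit --driver-memory 4g my_script.py

(where my_script.py stands in for my actual job script) be the right fix,
or am I misreading the GC output?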


*Vedant Dhandhania*

*Retention Science*

call: 805.574.0873

visit: Site http://www.retentionscience.com/ | like: Facebook
http://www.facebook.com/RetentionScience | follow: Twitter
http://twitter.com/RetentionSci

