Spark Twitter streaming

2016-03-07 Thread Soni spark
Hallo friends,

I need some urgent help.

I am using Spark Streaming to collect tweets from Twitter and load the
data into HDFS. I want to find out the source of each tweet, i.e. whether it
was posted from the web, mobile web, Facebook, etc. Could you please help me
with the logic?

Thanks
Soniya
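
A possible starting point, assuming the tweets are stored with their
`source` field intact: Twitter reports the posting client in each tweet's
`source` field as an HTML anchor string (twitter4j exposes it through
Status.getSource()). The sketch below strips the markup and buckets the
client name. The bucket names and keyword rules are hypothetical and would
need tuning against real data.

```python
import re

# Matches the visible text of an HTML anchor such as
# <a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>
_ANCHOR_TEXT = re.compile(r"<a\b[^>]*>(.*?)</a>", re.IGNORECASE | re.DOTALL)

# Hypothetical keyword -> bucket rules, checked in order.
_RULES = [
    ("iphone", "mobile app"),
    ("ipad", "mobile app"),
    ("android", "mobile app"),
    ("mobile web", "mobile web"),
    ("web", "web"),
    ("facebook", "facebook"),
]


def client_name(source):
    """Return the client name from a tweet's `source` value, without markup."""
    if not source:
        return ""
    match = _ANCHOR_TEXT.search(source)
    return (match.group(1) if match else source).strip()


def classify_source(source):
    """Map a raw `source` value to a coarse bucket (assumed categories)."""
    name = client_name(source).lower()
    for keyword, bucket in _RULES:
        if keyword in name:
            return bucket
    return "other"
```

In a streaming job, classify_source could be applied to each tweet's source
value in a map step before writing to HDFS.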


Spark Partitioner vs Spark Shuffle Manager

2016-03-07 Thread Prabhu Joseph
Hi All,

   What is the difference between the Spark Partitioner and the Spark Shuffle
Manager? The Partitioner is a hash partitioner by default, and the shuffle
manager is sort-based by default; the other options are hash and Tungsten sort.

Thanks,
Prabhu Joseph
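
For what it's worth, the two settings act at different levels. The
partitioner decides which output partition each key belongs to. The shuffle
manager (spark.shuffle.manager) decides how map-side output is written and
then fetched by the reducers; it does not change which partition a record
lands in. Below is a rough sketch of what a hash partitioner computes. It is
plain Python, not Spark code, and it uses zlib.crc32 as a stand-in for the
key's hash code so the result is stable between runs; Spark's
HashPartitioner uses the key's hashCode with a non-negative modulo.

```python
import zlib
from collections import defaultdict


def hash_partition(key, num_partitions):
    """Pick a partition index in [0, num_partitions) from a stable hash of the key."""
    digest = zlib.crc32(str(key).encode("utf-8"))  # stand-in for hashCode
    return digest % num_partitions  # crc32 is unsigned, so this is non-negative


def partition_records(records, num_partitions):
    """Group (key, value) pairs by the partition their key hashes to."""
    partitions = defaultdict(list)
    for key, value in records:
        partitions[hash_partition(key, num_partitions)].append((key, value))
    return dict(partitions)
```

All records with the same key end up in the same partition, whichever
shuffle manager moves the bytes.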


Re: How to compile Spark with private build of Hadoop

2016-03-07 Thread fightf...@163.com
I think you can set up your own Maven repository and deploy your modified
Hadoop jars there under your modified version number. Then you can add your
repository to Spark's pom.xml and build with
mvn -Dhadoop.version=



fightf...@163.com
 
From: Lu, Yingqi
Date: 2016-03-08 15:09
To: user@spark.apache.org
Subject: How to compile Spark with private build of Hadoop
Hi All,
 
I am new to Spark and I have a question about compiling Spark. I modified 
the trunk version of the Hadoop source code. How can I compile Spark (standalone mode) 
with my modified version of Hadoop (HDFS, Hadoop-common, etc.)?
 
Thanks a lot for your help!
 
Thanks,
Lucy
 
 
 
 


How to compile Spark with private build of Hadoop

2016-03-07 Thread Lu, Yingqi
Hi All,

I am new to Spark and I have a question about compiling Spark. I modified 
the trunk version of the Hadoop source code. How can I compile Spark (standalone mode) 
with my modified version of Hadoop (HDFS, Hadoop-common, etc.)?

Thanks a lot for your help!

Thanks,
Lucy






overwriting a spark output using pyspark

2016-03-07 Thread Devesh Raj Singh
I am trying to overwrite a spark dataframe using the following option but I
am not successful

spark_df.write.format('com.databricks.spark.csv').option("header",
"true",mode='overwrite').save(self.output_file_path)

the mode=overwrite command is not successful
-- 
Warm regards,
Devesh.
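
One likely cause: in the snippet above, mode='overwrite' is passed as a
keyword argument to option(), which is not where the writer expects the save
mode. DataFrameWriter has a separate mode() method (and save() also accepts
a mode argument). A minimal sketch, assuming spark_df is an existing
DataFrame and the spark-csv package is available; the helper name
write_csv_overwrite is made up for illustration:

```python
def write_csv_overwrite(df, output_path):
    """Write `df` as CSV with a header row, replacing any existing output."""
    (df.write
       .format("com.databricks.spark.csv")
       .option("header", "true")   # option() takes one key and one value
       .mode("overwrite")          # the save mode is set on the writer itself
       .save(output_path))
```

It would be called as write_csv_overwrite(spark_df, self.output_file_path).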


Re: OOM exception during Broadcast

2016-03-07 Thread Arash
The driver memory is set at 40G and OOM seems to be happening on the
executors. I might try a different broadcast block size (vs 4m) as Takeshi
suggested to see if it makes a difference.

On Mon, Mar 7, 2016 at 6:54 PM, Tristan Nixon  wrote:

> Yeah, the Spark engine is pretty clever and it's best not to prematurely
> optimize. It would be interesting to profile your join vs. the collect on
> the smaller dataset. I suspect that the join is faster (even before you
> broadcast it back out).
>
> I’m also curious about the broadcast OOM - did you try expanding the
> driver memory?
>
> On Mar 7, 2016, at 8:28 PM, Arash  wrote:
>
> So I just implemented the logic through a standard join (without collect
> and broadcast) and it's working great.
>
> The idea behind trying the broadcast was that since the other side of join
> is a much larger dataset, the process might be faster through collect and
> broadcast, since it avoids the shuffle of the bigger dataset.
>
> I think the join is working much better in this case so I'll probably just
> use that, though I'm still a bit curious as to why the error is happening.
>
> On Mon, Mar 7, 2016 at 5:55 PM, Tristan Nixon 
> wrote:
>
>> I’m not sure I understand - if it was already distributed over the
>> cluster in an RDD, why would you want to collect and then re-send it as a
>> broadcast variable? Why not simply use the RDD that is already distributed
>> on the worker nodes?
>>
>> On Mar 7, 2016, at 7:44 PM, Arash  wrote:
>>
>> Hi Tristan,
>>
>> This is not static, I actually collect it from an RDD to the driver.
>>
>> On Mon, Mar 7, 2016 at 5:42 PM, Tristan Nixon 
>> wrote:
>>
>>> Hi Arash,
>>>
>>> is this static data?  Have you considered including it in your jars and
>>> de-serializing it from jar on each worker node?
>>> It’s not pretty, but it’s a workaround for serialization troubles.
>>>
>>> On Mar 7, 2016, at 5:29 PM, Arash  wrote:
>>>
>>> Hello all,
>>>
>>> I'm trying to broadcast a variable of size ~1G to a cluster of 20 nodes
>>> but haven't been able to make it work so far.
>>>
>>> It looks like the executors start to run out of memory during
>>> deserialization. This behavior only shows itself when the number of
>>> partitions is above a few 10s, the broadcast does work for 10 or 20
>>> partitions.
>>>
>>> I'm using the following setup to observe the problem:
>>>
>>> val tuples: Array[((String, String), (String, String))]  // ~ 10M
>>> tuples
>>> val tuplesBc = sc.broadcast(tuples)
>>> val numsRdd = sc.parallelize(1 to 5000, 100)
>>> numsRdd.map(n => tuplesBc.value.head).count()
>>>
>>> If I set the number of partitions for numsRDD to 20, the count goes
>>> through successfully, but at 100, I'll start to get errors such as:
>>>
>>> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage
>>> 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap
>>> space
>>> at
>>> java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
>>> at
>>> java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at
>>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>> at
>>> scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at
>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>>> at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>> at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>> at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at
>>> 

Re: Spark streaming from Kafka best fit

2016-03-07 Thread pratik khadloya
Would using mapPartitions instead of map help here?

~Pratik
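
To make the difference concrete: map calls the function once per record,
while mapPartitions calls it once per partition with an iterator over that
partition's records, so any per-call setup is paid once per partition rather
than once per record. Whether that helps here depends on how expensive the
setup in the parsing step is. A plain-Python imitation of the two call
patterns (no Spark involved; make_parser and the partition lists are
invented for illustration):

```python
def make_parser(counter):
    """Stand-in for expensive setup, e.g. compiling patterns or opening a client."""
    counter["setups"] += 1
    return lambda line: line.strip().lower()


def map_style(partitions):
    """Setup runs for every record, as it would inside a function passed to map()."""
    counter = {"setups": 0}
    out = [[make_parser(counter)(line) for line in part] for part in partitions]
    return out, counter["setups"]


def map_partitions_style(partitions):
    """Setup runs once per partition, as it would inside mapPartitions()."""
    counter = {"setups": 0}

    def per_partition(records):
        parse = make_parser(counter)  # paid once for the whole iterator
        for line in records:
            yield parse(line)

    out = [list(per_partition(iter(part))) for part in partitions]
    return out, counter["setups"]
```

Both functions return the same parsed records; only the number of setup
calls differs.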

On Tue, Mar 1, 2016 at 10:07 AM Cody Koeninger  wrote:

> You don't need an equal number of executor cores to partitions.  An
> executor can and will work on multiple partitions within a batch, one after
> the other.  The real issue is whether you are able to keep your processing
> time under your batch time, so that delay doesn't increase.
>
> On Tue, Mar 1, 2016 at 11:59 AM, Jatin Kumar 
> wrote:
>
>> Thanks Cody!
>>
>> I understand what you said and if I am correct it will be using 224
>> executor cores just for fetching + stage-1 processing of 224 partitions. I
>> will obviously need more cores for processing further stages and fetching
>> next batch.
>>
>> I will start with higher number of executor cores and see how it goes.
>>
>> --
>> Thanks
>> Jatin Kumar | Rocket Scientist
>> +91-7696741743 m
>>
>> On Tue, Mar 1, 2016 at 9:07 PM, Cody Koeninger 
>> wrote:
>>
>>> > "How do I keep a balance of executors which receive data from Kafka
>>> and which process data"
>>>
>>> I think you're misunderstanding how the direct stream works.  The
>>> executor which receives data is also the executor which processes data,
>>> there aren't separate receivers.  If it's a single stage worth of work
>>> (e.g. straight map / filter), the processing of a given partition is going
>>> to be done by the executor that read it from kafka.  If you do something
>>> involving a shuffle (e.g. reduceByKey), other executors will do additional
>>> processing.  The question of which executor works on which tasks is up to
>>> the scheduler (and getPreferredLocations, which only matters if you're
>>> running spark on the same nodes as kafka)
>>>
>>> On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
>>> jku...@rocketfuelinc.com.invalid> wrote:
>>>
 Hello all,

 I see that there are as of today 3 ways one can read from Kafka in
 spark streaming:
 1. KafkaUtils.createStream()
 2. KafkaUtils.createDirectStream()
 3. Kafka-spark-consumer

 My spark streaming application has to read from 1 kafka topic with
 around 224 partitions, consuming data at around 150MB/s (~90,000
 messages/sec) which reduces to around 3MB/s (~1400 messages/sec) after
 filtering. After filtering I need to maintain top 1 URL counts. I don't
 really care about exactly once semantics as I am interested in rough
 estimate.

 Code:

 sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
 sparkConf.setAppName("KafkaReader")
 val ssc = StreamingContext.getOrCreate(kCheckPointDir, 
 createStreamingContext)

 val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
 val kafkaParams = Map[String, String](
   "metadata.broker.list" -> "kafka.server.ip:9092",
   "group.id" -> consumer_group
 )

 val lineStreams = (1 to N).map { _ =>
   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
 }

 ssc.union(
   lineStreams.map(stream => {
     stream.map(ParseStringToLogRecord)
       .filter(record => isGoodRecord(record))
       .map(record => record.url)
   })
 ).window(Seconds(120), Seconds(120))  // 2 minute window
   // 30 minute moving window; 28 partitions will probably help with parallelism
   .countByValueAndWindow(Seconds(1800), Seconds(120), 28)
   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHHOLD)
   .mapPartitions(iter => {
     // Sort descending by count so the slice keeps the largest counts per partition
     iter.toArray.sortWith((r1, r2) => r1._2.compare(r2._2) > 0).slice(0, 1000).iterator
   }, true)
   .foreachRDD((latestRDD, rddTime) => {
     printTopFromRDD(rddTime,
       latestRDD.map(record => (record._2, record._1)).sortByKey(false).take(1000))
   })

 ssc.start()
 ssc.awaitTermination()

 Questions:

 a) I used #2 but I found that I couldn't control how many executors
 will be actually fetching from Kafka. How do I keep a balance of executors
 which receive data from Kafka and which process data? Do they keep changing
 for every batch?

 b) Now I am trying to use #1, creating multiple DStreams, filtering them
 and then doing a union. I don't understand why the number of events
 processed per 120-second batch changes so drastically. Please find attached
 the events/sec graph while running with 1 receiver. How can I debug this?

 c) What will be the most suitable method to integrate with Kafka from
 above 3? Any recommendations for getting maximum performance, running the
 streaming application reliably in production 

Re: Does anyone implement org.apache.spark.serializer.Serializer in their own code?

2016-03-07 Thread Ted Yu
Josh:
SerializerInstance and SerializationStream would also become private[spark],
right?

Thanks

On Mon, Mar 7, 2016 at 6:57 PM, Josh Rosen  wrote:

> Does anyone implement Spark's serializer interface
> (org.apache.spark.serializer.Serializer) in your own third-party code? If
> so, please let me know because I'd like to change this interface from a
> DeveloperAPI to private[spark] in Spark 2.0 in order to do some cleanup and
> refactoring. I think that the only reason it was a DeveloperAPI was Shark,
> but I'd like to confirm this by asking the community.
>
> Thanks,
> Josh
>


Re: Does anyone implement org.apache.spark.serializer.Serializer in their own code?

2016-03-07 Thread Koert Kuipers
we are not, but it seems reasonable to me that a user has the ability to
implement their own serializer.

can you refactor and break compatibility, but not make it private?

On Mon, Mar 7, 2016 at 9:57 PM, Josh Rosen  wrote:

> Does anyone implement Spark's serializer interface
> (org.apache.spark.serializer.Serializer) in your own third-party code? If
> so, please let me know because I'd like to change this interface from a
> DeveloperAPI to private[spark] in Spark 2.0 in order to do some cleanup and
> refactoring. I think that the only reason it was a DeveloperAPI was Shark,
> but I'd like to confirm this by asking the community.
>
> Thanks,
> Josh
>


Does anyone implement org.apache.spark.serializer.Serializer in their own code?

2016-03-07 Thread Josh Rosen
Does anyone implement Spark's serializer interface
(org.apache.spark.serializer.Serializer) in your own third-party code? If
so, please let me know because I'd like to change this interface from a
DeveloperAPI to private[spark] in Spark 2.0 in order to do some cleanup and
refactoring. I think that the only reason it was a DeveloperAPI was Shark,
but I'd like to confirm this by asking the community.

Thanks,
Josh


Re: OOM exception during Broadcast

2016-03-07 Thread Tristan Nixon
Yeah, the Spark engine is pretty clever and it's best not to prematurely 
optimize. It would be interesting to profile your join vs. the collect on the 
smaller dataset. I suspect that the join is faster (even before you broadcast 
it back out).

I’m also curious about the broadcast OOM - did you try expanding the driver 
memory?

> On Mar 7, 2016, at 8:28 PM, Arash  wrote:
> 
> So I just implemented the logic through a standard join (without collect and 
> broadcast) and it's working great.
> 
> The idea behind trying the broadcast was that since the other side of join is 
> a much larger dataset, the process might be faster through collect and 
> broadcast, since it avoids the shuffle of the bigger dataset. 
> 
> I think the join is working much better in this case so I'll probably just 
> use that, though I'm still a bit curious as to why the error is happening.
> 
> On Mon, Mar 7, 2016 at 5:55 PM, Tristan Nixon  > wrote:
> I’m not sure I understand - if it was already distributed over the cluster in 
> an RDD, why would you want to collect and then re-send it as a broadcast 
> variable? Why not simply use the RDD that is already distributed on the 
> worker nodes?
> 
>> On Mar 7, 2016, at 7:44 PM, Arash > > wrote:
>> 
>> Hi Tristan, 
>> 
>> This is not static, I actually collect it from an RDD to the driver. 
>> 
>> On Mon, Mar 7, 2016 at 5:42 PM, Tristan Nixon > > wrote:
>> Hi Arash,
>> 
>> is this static data?  Have you considered including it in your jars and 
>> de-serializing it from jar on each worker node?
>> It’s not pretty, but it’s a workaround for serialization troubles.
>> 
>>> On Mar 7, 2016, at 5:29 PM, Arash >> > wrote:
>>> 
>>> Hello all,
>>> 
>>> I'm trying to broadcast a variable of size ~1G to a cluster of 20 nodes but 
>>> haven't been able to make it work so far.
>>> 
>>> It looks like the executors start to run out of memory during 
>>> deserialization. This behavior only shows itself when the number of 
>>> partitions is above a few 10s, the broadcast does work for 10 or 20 
>>> partitions. 
>>> 
>>> I'm using the following setup to observe the problem:
>>> 
>>> val tuples: Array[((String, String), (String, String))]  // ~ 10M tuples
>>> val tuplesBc = sc.broadcast(tuples)
>>> val numsRdd = sc.parallelize(1 to 5000, 100)
>>> numsRdd.map(n => tuplesBc.value.head).count()
>>> 
>>> If I set the number of partitions for numsRDD to 20, the count goes through 
>>> successfully, but at 100, I'll start to get errors such as:
>>> 
>>> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage 
>>> 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap 
>>> space
>>> at 
>>> java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
>>> at 
>>> java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
>>> at 
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
>>> at 
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>> at 
>>> scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at 
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at 
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at 
>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>>> at 
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>>> at 
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at 
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at 
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>> at 
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> at 
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at 
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at 
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>> at 
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> at 
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at 
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at 
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>> at 
>>> 

Re: OOM exception during Broadcast

2016-03-07 Thread Arash
So I just implemented the logic through a standard join (without collect
and broadcast) and it's working great.

The idea behind trying the broadcast was that since the other side of join
is a much larger dataset, the process might be faster through collect and
broadcast, since it avoids the shuffle of the bigger dataset.

I think the join is working much better in this case so I'll probably just
use that, though I'm still a bit curious as to why the error is happening.
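
For reference, the collect-and-broadcast approach amounts to a map-side
(broadcast) join: the small side becomes an in-memory lookup table on every
executor, and the large side is scanned in place without being shuffled. The
catch is that each executor has to hold and deserialize a full copy of the
small side, which is where a ~1G table can hurt. A plain-Python sketch of the
idea, with made-up data and names:

```python
from collections import defaultdict


def build_lookup(small_pairs):
    """Index the small side by key; this is what would be broadcast."""
    lookup = defaultdict(list)
    for key, value in small_pairs:
        lookup[key].append(value)
    return dict(lookup)


def map_side_join(large_partition, lookup):
    """Inner-join one partition of the large side against the lookup table."""
    for key, left in large_partition:
        for right in lookup.get(key, ()):
            yield key, (left, right)
```

In Spark the lookup would be wrapped in a broadcast variable and
map_side_join applied to each partition of the large RDD; a standard join
shuffles both sides instead but never needs the whole small side in one
place.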

On Mon, Mar 7, 2016 at 5:55 PM, Tristan Nixon  wrote:

> I’m not sure I understand - if it was already distributed over the cluster
> in an RDD, why would you want to collect and then re-send it as a broadcast
> variable? Why not simply use the RDD that is already distributed on the
> worker nodes?
>
> On Mar 7, 2016, at 7:44 PM, Arash  wrote:
>
> Hi Tristan,
>
> This is not static, I actually collect it from an RDD to the driver.
>
> On Mon, Mar 7, 2016 at 5:42 PM, Tristan Nixon 
> wrote:
>
>> Hi Arash,
>>
>> is this static data?  Have you considered including it in your jars and
>> de-serializing it from jar on each worker node?
>> It’s not pretty, but it’s a workaround for serialization troubles.
>>
>> On Mar 7, 2016, at 5:29 PM, Arash  wrote:
>>
>> Hello all,
>>
>> I'm trying to broadcast a variable of size ~1G to a cluster of 20 nodes
>> but haven't been able to make it work so far.
>>
>> It looks like the executors start to run out of memory during
>> deserialization. This behavior only shows itself when the number of
>> partitions is above a few 10s, the broadcast does work for 10 or 20
>> partitions.
>>
>> I'm using the following setup to observe the problem:
>>
>> val tuples: Array[((String, String), (String, String))]  // ~ 10M
>> tuples
>> val tuplesBc = sc.broadcast(tuples)
>> val numsRdd = sc.parallelize(1 to 5000, 100)
>> numsRdd.map(n => tuplesBc.value.head).count()
>>
>> If I set the number of partitions for numsRDD to 20, the count goes
>> through successfully, but at 100, I'll start to get errors such as:
>>
>> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage
>> 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap
>> space
>> at
>> java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
>> at
>> java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> at
>> scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at

Re: OOM exception during Broadcast

2016-03-07 Thread Tristan Nixon
I’m not sure I understand - if it was already distributed over the cluster in 
an RDD, why would you want to collect and then re-send it as a broadcast 
variable? Why not simply use the RDD that is already distributed on the worker 
nodes?

> On Mar 7, 2016, at 7:44 PM, Arash  wrote:
> 
> Hi Tristan, 
> 
> This is not static, I actually collect it from an RDD to the driver. 
> 
> On Mon, Mar 7, 2016 at 5:42 PM, Tristan Nixon  > wrote:
> Hi Arash,
> 
> is this static data?  Have you considered including it in your jars and 
> de-serializing it from jar on each worker node?
> It’s not pretty, but it’s a workaround for serialization troubles.
> 
>> On Mar 7, 2016, at 5:29 PM, Arash > > wrote:
>> 
>> Hello all,
>> 
>> I'm trying to broadcast a variable of size ~1G to a cluster of 20 nodes but 
>> haven't been able to make it work so far.
>> 
>> It looks like the executors start to run out of memory during 
>> deserialization. This behavior only shows itself when the number of 
>> partitions is above a few 10s, the broadcast does work for 10 or 20 
>> partitions. 
>> 
>> I'm using the following setup to observe the problem:
>> 
>> val tuples: Array[((String, String), (String, String))]  // ~ 10M tuples
>> val tuplesBc = sc.broadcast(tuples)
>> val numsRdd = sc.parallelize(1 to 5000, 100)
>> numsRdd.map(n => tuplesBc.value.head).count()
>> 
>> If I set the number of partitions for numsRDD to 20, the count goes through 
>> successfully, but at 100, I'll start to get errors such as:
>> 
>> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage 1.0 
>> (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap space
>> at 
>> java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
>> at 
>> java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
>> at 
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> at 
>> scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at 
>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>> at 
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>> at 
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at 
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at 
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at 
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at 
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at 
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at 
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at 
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at 
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at 
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at 
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at 
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at 
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at 
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at 
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> 
>> 
>> I'm using spark 1.5.2. Cluster nodes are amazon r3.2xlarge. The spark 
>> property maximizeResourceAllocation is set to true (executor.memory = 48G 
>> according to spark ui environment). We're also using kryo serialization and 
>> Yarn is the resource manager.
>> 
>> Any ideas as to what might be going wrong and how to debug this?
>> 
>> Thanks,
>> Arash
>> 
> 
> 



Re: OOM exception during Broadcast

2016-03-07 Thread Takeshi Yamamuro
Oh, how about increasing the broadcast block size
via spark.broadcast.blockSize?
The default is `4m`, which I think is too small for ~1GB.
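
As a rough back-of-the-envelope check, assuming the serialized broadcast is
about 1 GiB: the broadcast is split into pieces of spark.broadcast.blockSize,
so the block size determines how many pieces each executor has to fetch and
reassemble. The helper below is only arithmetic, not Spark API, and the 64m
alternative is an arbitrary example value:

```python
import math

_UNITS = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3}


def parse_size(text):
    """Convert a size string such as '4m' or '1g' to bytes."""
    text = text.strip().lower()
    if text[-1] in _UNITS:
        return int(text[:-1]) * _UNITS[text[-1]]
    return int(text)


def num_blocks(total_size, block_size):
    """Number of pieces needed to carry total_size at the given block size."""
    return math.ceil(parse_size(total_size) / parse_size(block_size))


print(num_blocks("1g", "4m"))   # pieces at the default block size
print(num_blocks("1g", "64m"))  # pieces at a larger, example block size
```

The setting itself can be passed at submit time, for example with
--conf spark.broadcast.blockSize=64m.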

On Tue, Mar 8, 2016 at 10:44 AM, Arash  wrote:

> Hi Tristan,
>
> This is not static, I actually collect it from an RDD to the driver.
>
> On Mon, Mar 7, 2016 at 5:42 PM, Tristan Nixon 
> wrote:
>
>> Hi Arash,
>>
>> is this static data?  Have you considered including it in your jars and
>> de-serializing it from jar on each worker node?
>> It’s not pretty, but it’s a workaround for serialization troubles.
>>
>> On Mar 7, 2016, at 5:29 PM, Arash  wrote:
>>
>> Hello all,
>>
>> I'm trying to broadcast a variable of size ~1G to a cluster of 20 nodes
>> but haven't been able to make it work so far.
>>
>> It looks like the executors start to run out of memory during
>> deserialization. This behavior only shows itself when the number of
>> partitions is above a few 10s, the broadcast does work for 10 or 20
>> partitions.
>>
>> I'm using the following setup to observe the problem:
>>
>> val tuples: Array[((String, String), (String, String))]  // ~ 10M
>> tuples
>> val tuplesBc = sc.broadcast(tuples)
>> val numsRdd = sc.parallelize(1 to 5000, 100)
>> numsRdd.map(n => tuplesBc.value.head).count()
>>
>> If I set the number of partitions for numsRDD to 20, the count goes
>> through successfully, but at 100, I'll start to get errors such as:
>>
>> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage
>> 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap
>> space
>> at
>> java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
>> at
>> java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> at
>> scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>
>>
>> I'm using spark 1.5.2. Cluster nodes are amazon r3.2xlarge. The spark
>> property maximizeResourceAllocation is set to true (executor.memory = 48G
>> according to spark ui environment). We're also using kryo serialization and
>> Yarn is the resource manager.
>>
>> Any ideas as to what might be going wrong and how to debug this?
>>
>> Thanks,
>> Arash
>>
>>
>>
>


-- 
---
Takeshi Yamamuro


Re: OOM exception during Broadcast

2016-03-07 Thread Arash
Hi Tristan,

This is not static, I actually collect it from an RDD to the driver.

On Mon, Mar 7, 2016 at 5:42 PM, Tristan Nixon  wrote:

> Hi Arash,
>
> is this static data?  Have you considered including it in your jars and
> de-serializing it from jar on each worker node?
> It’s not pretty, but it’s a workaround for serialization troubles.
>
> On Mar 7, 2016, at 5:29 PM, Arash  wrote:
>
> Hello all,
>
> I'm trying to broadcast a variable of size ~1G to a cluster of 20 nodes
> but haven't been able to make it work so far.
>
> It looks like the executors start to run out of memory during
> deserialization. This behavior only shows itself when the number of
> partitions is above a few 10s, the broadcast does work for 10 or 20
> partitions.
>
> I'm using the following setup to observe the problem:
>
> val tuples: Array[((String, String), (String, String))]  // ~ 10M
> tuples
> val tuplesBc = sc.broadcast(tuples)
> val numsRdd = sc.parallelize(1 to 5000, 100)
> numsRdd.map(n => tuplesBc.value.head).count()
>
> If I set the number of partitions for numsRDD to 20, the count goes
> through successfully, but at 100, I'll start to get errors such as:
>
> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage
> 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap
> space
> at
> java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
> at
> java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>
>
> I'm using spark 1.5.2. Cluster nodes are amazon r3.2xlarge. The spark
> property maximizeResourceAllocation is set to true (executor.memory = 48G
> according to spark ui environment). We're also using kryo serialization and
> Yarn is the resource manager.
>
> Any ideas as what might be going wrong and how to debug this?
>
> Thanks,
> Arash
>
>
>


Re: Adding hive context gives error

2016-03-07 Thread Suniti Singh
Thanks Mich and Kabeer for the quick replies.

@Kabeer - I removed the spark-sql dependency and all the errors are
gone. But I am surprised to see this behaviour. Why is the spark-sql
library a problem when including the Hive context?

Regards,
Suniti

On Mon, Mar 7, 2016 at 4:34 PM, Kabeer Ahmed 
wrote:

> I use SBT and I have never included spark-sql. The simple 2 lines in SBT
> are as below:
>
>
> libraryDependencies ++= Seq(  "org.apache.spark" %% "spark-core" % "1.5.0",  
> "org.apache.spark" %% "spark-hive" % "1.5.0")
>
>
>
> However, I do note that you are using Spark-sql include and the Spark
> version you use is 1.6.0. Can you please try with 1.5.0 to see if it works?
> I havent yet tried Spark 1.6.0.
>
>
> On 08/03/16 00:15, Suniti Singh wrote:
>
> Hi All,
>
> I am trying to create a hive context in a scala prog as follows in eclipse:
> Note --  i have added the maven dependency for spark -core , hive , and
> sql.
>
> import org.apache.spark.SparkConf
>
> import org.apache.spark.SparkContext
>
> import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
>
> object DataExp {
>
>def main(args: Array[String]) = {
>
>   val conf = new SparkConf().setAppName("DataExp").setMaster("local")
>
>   val sc = new SparkContext(conf)
>
>  * val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)*
>
>
>  }
>
> }
>
> I get the the following *errors*: @ line of hiveContext above in the prog
>
> 1 --- Error in Scala compiler: bad symbolic reference. A signature in
> HiveContext.class refers to term ui in package
> org.apache.spark.sql.execution which is not available. It may be completely
> missing from the current classpath, or the version on the classpath might
> be incompatible with the version used when compiling HiveContext.class.
> spark Unknown Scala Problem
> 2 --- SBT builder crashed while compiling. The error message is 'bad
> symbolic reference. A signature in HiveContext.class refers to term ui in
> package org.apache.spark.sql.execution which is not available. It may be
> completely missing from the current classpath, or the version on the
> classpath might be incompatible with the version used when compiling
> HiveContext.class.'. Check Error Log for details. spark Unknown Scala
> Problem
>
> 3 --- while compiling:
> /Users/sunitisingh/sparktest/spark/src/main/scala/com/sparktest/spark/DataExp.scala
> during phase: erasure  library version: version 2.10.6
> compiler version: version 2.10.6   reconstructed args: -javabootclasspath
> /Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.
> jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/System/Library/Java/Extensions/AppleScriptEngine.jar:/System/Library/Java/Extensions/dns_sd.jar:/System/Library/Java/Extensions/j3daudio.jar:/System/Library/Java/Extensions/j3dcore.jar:/System/Library/Java/Extensions/j3dutils.jar:/System/Library/Java/Extensions/jai_codec.jar:/System/Library/Java/Extensions/jai_core.jar:/System/Library/Java/Extensions/mlibwrapper_jai.jar:/System/Library/Java/Extensions/MRJToolkit.jar:/System/Library/Java/Extensions/vecmath.jar
> -classpath
> 

Re: Adding hive context gives error

2016-03-07 Thread Kabeer Ahmed
I use SBT and I have never included spark-sql. The simple 2 lines in SBT are as 
below:




libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.5.0",
  "org.apache.spark" %% "spark-hive" % "1.5.0"
)



However, I do note that you include spark-sql and that the Spark version 
you use is 1.6.0. Can you please try with 1.5.0 to see if it works? I haven't 
yet tried Spark 1.6.0.
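
For reference, the "bad symbolic reference" message points at a class that is
either missing from the classpath or present in an incompatible version. A
minimal build.sbt sketch that keeps every Spark module on one version is shown
below. It assumes Spark 1.6.0 and Scala 2.10.6 (the versions visible in the
error dump), and the name sparkVersion is only illustrative. spark-hive depends
on spark-sql, so spark-sql arrives transitively without its own entry.

```scala
// build.sbt (sketch): one version value shared by every Spark module.
// Assumption: 1.6.0 and 2.10.6 are taken from the error dump in this thread.
val sparkVersion = "1.6.0"

scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  // spark-hive pulls in the matching spark-sql transitively
  "org.apache.spark" %% "spark-hive" % sparkVersion
)
```

If spark-sql is listed explicitly as well, giving it the same version value
avoids mixing artifacts from different Spark releases.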


On 08/03/16 00:15, Suniti Singh wrote:
Hi All,

I am trying to create a hive context in a scala prog as follows in eclipse:
Note --  i have added the maven dependency for spark -core , hive , and sql.


import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

object DataExp {

   def main(args: Array[String]) = {

  val conf = new SparkConf().setAppName("DataExp").setMaster("local")

  val sc = new SparkContext(conf)

  val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

 }

}

I get the the following errors: @ line of hiveContext above in the prog

1 --- Error in Scala compiler: bad symbolic reference. A signature in 
HiveContext.class refers to term ui in package org.apache.spark.sql.execution 
which is not available. It may be completely missing from the current 
classpath, or the version on the classpath might be incompatible with the 
version used when compiling HiveContext.class. spark Unknown Scala Problem

2 --- SBT builder crashed while compiling. The error message is 'bad symbolic 
reference. A signature in HiveContext.class refers to term ui in package 
org.apache.spark.sql.execution which is not available. It may be completely 
missing from the current classpath, or the version on the classpath might be 
incompatible with the version used when compiling HiveContext.class.'. Check 
Error Log for details. spark Unknown Scala Problem

3 --- while compiling: 
/Users/sunitisingh/sparktest/spark/src/main/scala/com/sparktest/spark/DataExp.scala
 during phase: erasure  library version: version 2.10.6 
compiler version: version 2.10.6   reconstructed args: -javabootclasspath 
/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.
 
jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/System/Library/Java/Extensions/AppleScriptEngine.jar:/System/Library/Java/Extensions/dns_sd.jar:/System/Library/Java/Extensions/j3daudio.jar:/System/Library/Java/Extensions/j3dcore.jar:/System/Library/Java/Extensions/j3dutils.jar:/System/Library/Java/Extensions/jai_codec.jar:/System/Library/Java/Extensions/jai_core.jar:/System/Library/Java/Extensions/mlibwrapper_jai.jar:/System/Library/Java/Extensions/MRJToolkit.jar:/System/Library/Java/Extensions/vecmath.jar
 -classpath 
/Users/sunitisingh/sparktest/spark/target/classes:/Users/sunitisingh/sparktest/spark/target/test-classes:/Users/sunitisingh/.m2/repository/org/apache/spark/spark-core_2.10/1.6.0/spark-core_2.10-1.6.0.jar:/Users/sunitisingh/.m2/repository/org/apache/avro/avro-mapred/1.7.7/avro-mapred-1.7.7-hadoop2.jar:/Users/sunitisingh/.m2/repository/org/apache/avro/avro-ipc/1.7.7/avro-ipc-1.7.7.jar:/Users/sunitisingh/.m2/repository/org/apache/avro/avro-ipc/1.7.7/avro-ipc-1.7.7-tests.jar:/Users/sunitisingh/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar:/Users/sunitisingh/.m2/repository/com/twitter/chill_2.10/0.5.0/chill_2.10-0.5.0.jar:/Users/sunitisingh/.m2/repository/com/esotericsoftware/kryo/kryo/2.21/kryo-2.21.jar:/Users/sunitisingh/.m2/repository/com/esotericsoftware/reflectasm/reflectasm/1.07/reflectasm-1.07-shaded.jar:/Users/sunitisingh/.m2/repository/com/esotericsoftware/minlog/minlog/1.2/minlog-1.2.jar:/Users/sunitisingh/.m2/repository/org/objenesis/obj
 

Re: Adding hive context gives error

2016-03-07 Thread Mich Talebzadeh
I tend to use SBT to build Spark programs.

This works for me

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
//
object ImportCSV {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().
      setAppName("ImportCSV").
      setMaster("local[12]").
      set("spark.driver.allowMultipleContexts", "true").
      set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)
    // Create sqlContext based on HiveContext
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    // Same class, created via its fully qualified name
    val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
  }
}


HTH


Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 8 March 2016 at 00:15, Suniti Singh  wrote:

> Hi All,
>
> I am trying to create a hive context in a scala prog as follows in eclipse:
> Note --  i have added the maven dependency for spark -core , hive , and
> sql.
>
> import org.apache.spark.SparkConf
>
> import org.apache.spark.SparkContext
>
> import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
>
> object DataExp {
>
>def main(args: Array[String]) = {
>
>   val conf = new SparkConf().setAppName("DataExp").setMaster("local")
>
>   val sc = new SparkContext(conf)
>
>  * val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)*
>
>
>  }
>
> }
>
> I get the the following *errors*: @ line of hiveContext above in the prog
>
> 1 --- Error in Scala compiler: bad symbolic reference. A signature in
> HiveContext.class refers to term ui in package
> org.apache.spark.sql.execution which is not available. It may be completely
> missing from the current classpath, or the version on the classpath might
> be incompatible with the version used when compiling HiveContext.class.
> spark Unknown Scala Problem
> 2 --- SBT builder crashed while compiling. The error message is 'bad
> symbolic reference. A signature in HiveContext.class refers to term ui in
> package org.apache.spark.sql.execution which is not available. It may be
> completely missing from the current classpath, or the version on the
> classpath might be incompatible with the version used when compiling
> HiveContext.class.'. Check Error Log for details. spark Unknown Scala
> Problem
>
> 3 --- while compiling:
> /Users/sunitisingh/sparktest/spark/src/main/scala/com/sparktest/spark/DataExp.scala
> during phase: erasure  library version: version 2.10.6
> compiler version: version 2.10.6   reconstructed args: -javabootclasspath
> /Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/System/Library/Java/Extensions/AppleScriptEngine.jar:/System/Library/Java/Extensions/dns_sd.jar:/System/Library/Java/Extensions/j3daudio.jar:/System/Library/Java/Extensions/j3dcore.jar:/System/Library/Java/Extensions/j3dutils.jar:/System/Library/Java/Extensions/jai_codec.jar:/System/Library/Java/Extensions/jai_core.jar:/System/Library/Java/Extensions/mlibwrapper_jai.jar:/System/Library/Java/Extensions/MRJToolkit.jar:/System/Library/Java/Extensions/vecmath.jar
> -classpath
> 

Re: OOM exception during Broadcast

2016-03-07 Thread Arash
Hi Ankur,

For this specific test, I'm only running the few lines of code that are
pasted. Nothing else is cached in the cluster.

Thanks,
Arash
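
One way to narrow this down is to measure how many bytes a sample of the tuples
takes under plain Java serialization, since the stack trace in this thread goes
through java.io.ObjectInputStream. The following is a minimal sketch in plain
Scala, not code from the thread: the object name, the helper names and the
sample data are made up for illustration, and scaling a sample up to ~10M
tuples gives only a rough estimate.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializedSizeSketch {
  type Pair = (String, String)

  // Number of bytes produced by plain Java serialization for one object.
  def javaSerializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.size()
  }

  // Made-up sample data with the same shape as the tuples in the thread.
  def sample(n: Int): Array[(Pair, Pair)] =
    Array.tabulate(n)(i => ((s"key$i", s"a$i"), (s"val$i", s"b$i")))

  def main(args: Array[String]): Unit = {
    val n = 10000
    val size = javaSerializedSize(sample(n))
    println(s"$n tuples -> $size bytes (~${size / n} bytes per tuple)")
  }
}
```

The serialized size is not the same as the heap needed to deserialize the
object, so the number is a lower bound for comparison rather than a memory
budget.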

On Mon, Mar 7, 2016 at 4:07 PM, Ankur Srivastava  wrote:

> Hi,
>
> We have a use case where we broadcast ~4GB of data on m3.2xlarge nodes,
> so your object size is not an issue. Also, based on your explanation, this
> does not look like a broadcast issue, as it works when the number of
> partitions is small.
>
> Are you caching any other data? Broadcast variables use the cache
> memory.
>
> Thanks
> Ankur
>
> On Mon, Mar 7, 2016 at 3:34 PM, Jeff Zhang  wrote:
>
>> Any reason why do you broadcast such large variable ? It doesn't make
>> sense to me
>>
>> On Tue, Mar 8, 2016 at 7:29 AM, Arash  wrote:
>>
>>> Hello all,
>>>
>>> I'm trying to broadcast a variable of size ~1G to a cluster of 20 nodes
>>> but haven't been able to make it work so far.
>>>
>>> It looks like the executors start to run out of memory during
>>> deserialization. This behavior only shows itself when the number of
>>> partitions is above a few 10s, the broadcast does work for 10 or 20
>>> partitions.
>>>
>>> I'm using the following setup to observe the problem:
>>>
>>> val tuples: Array[((String, String), (String, String))]  // ~ 10M
>>> tuples
>>> val tuplesBc = sc.broadcast(tuples)
>>> val numsRdd = sc.parallelize(1 to 5000, 100)
>>> numsRdd.map(n => tuplesBc.value.head).count()
>>>
>>> If I set the number of partitions for numsRDD to 20, the count goes
>>> through successfully, but at 100, I'll start to get errors such as:
>>>
>>> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage
>>> 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap
>>> space
>>> at
>>> java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
>>> at
>>> java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at
>>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>> at
>>> scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at
>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>>> at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>> at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>> at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>> at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>> at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>> at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>
>>>
>>> I'm using spark 1.5.2. Cluster nodes are amazon r3.2xlarge. The spark
>>> property maximizeResourceAllocation is set to true (executor.memory = 48G
>>> according to spark ui environment). We're also using kryo serialization and
>>> Yarn is the resource manager.
>>>
>>> Any ideas as what might be going wrong and 

Adding hive context gives error

2016-03-07 Thread Suniti Singh
Hi All,

I am trying to create a Hive context in a Scala program in Eclipse, as follows.
Note: I have added the Maven dependencies for spark-core, spark-hive, and
spark-sql.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

object DataExp {
  def main(args: Array[String]) = {
    val conf = new SparkConf().setAppName("DataExp").setMaster("local")
    val sc = new SparkContext(conf)
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
  }
}

I get the following errors at the hiveContext line in the program above:

1 --- Error in Scala compiler: bad symbolic reference. A signature in
HiveContext.class refers to term ui in package
org.apache.spark.sql.execution which is not available. It may be completely
missing from the current classpath, or the version on the classpath might
be incompatible with the version used when compiling HiveContext.class.
spark Unknown Scala Problem
2 --- SBT builder crashed while compiling. The error message is 'bad
symbolic reference. A signature in HiveContext.class refers to term ui in
package org.apache.spark.sql.execution which is not available. It may be
completely missing from the current classpath, or the version on the
classpath might be incompatible with the version used when compiling
HiveContext.class.'. Check Error Log for details. spark Unknown Scala
Problem

3 --- while compiling:
/Users/sunitisingh/sparktest/spark/src/main/scala/com/sparktest/spark/DataExp.scala
during phase: erasure  library version: version 2.10.6
compiler version: version 2.10.6   reconstructed args: -javabootclasspath
/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/System/Library/Java/Extensions/AppleScriptEngine.jar:/System/Library/Java/Extensions/dns_sd.jar:/System/Library/Java/Extensions/j3daudio.jar:/System/Library/Java/Extensions/j3dcore.jar:/System/Library/Java/Extensions/j3dutils.jar:/System/Library/Java/Extensions/jai_codec.jar:/System/Library/Java/Extensions/jai_core.jar:/System/Library/Java/Extensions/mlibwrapper_jai.jar:/System/Library/Java/Extensions/MRJToolkit.jar:/System/Library/Java/Extensions/vecmath.jar
-classpath

Re: OOM exception during Broadcast

2016-03-07 Thread Ankur Srivastava
Hi,

We have a use case where we broadcast ~4GB of data on m3.2xlarge nodes,
so your object size is not an issue. Also, based on your explanation, this
does not look like a broadcast issue, as it works when the number of
partitions is small.

Are you caching any other data? Broadcast variables use the cache
memory.

Thanks
Ankur

On Mon, Mar 7, 2016 at 3:34 PM, Jeff Zhang  wrote:

> Any reason why do you broadcast such large variable ? It doesn't make
> sense to me
>
> On Tue, Mar 8, 2016 at 7:29 AM, Arash  wrote:
>
>> Hello all,
>>
>> I'm trying to broadcast a variable of size ~1G to a cluster of 20 nodes
>> but haven't been able to make it work so far.
>>
>> It looks like the executors start to run out of memory during
>> deserialization. This behavior only shows itself when the number of
>> partitions is above a few 10s, the broadcast does work for 10 or 20
>> partitions.
>>
>> I'm using the following setup to observe the problem:
>>
>> val tuples: Array[((String, String), (String, String))]  // ~ 10M
>> tuples
>> val tuplesBc = sc.broadcast(tuples)
>> val numsRdd = sc.parallelize(1 to 5000, 100)
>> numsRdd.map(n => tuplesBc.value.head).count()
>>
>> If I set the number of partitions for numsRDD to 20, the count goes
>> through successfully, but at 100, I'll start to get errors such as:
>>
>> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage
>> 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap
>> space
>> at
>> java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
>> at
>> java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> at
>> scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>
>>
>> I'm using spark 1.5.2. Cluster nodes are amazon r3.2xlarge. The spark
>> property maximizeResourceAllocation is set to true (executor.memory = 48G
>> according to spark ui environment). We're also using kryo serialization and
>> Yarn is the resource manager.
>>
>> Any ideas as what might be going wrong and how to debug this?
>>
>> Thanks,
>> Arash
>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Job Jar files located in s3, driver never starts the job

2016-03-07 Thread Scott Reynolds
Following the documentation on spark-submit,
http://spark.apache.org/docs/latest/submitting-applications.html#launching-applications-with-spark-submit


   - application-jar: Path to a bundled jar including your application and
   all dependencies. The URL must be globally visible inside of your cluster,
   for instance, an hdfs:// path or a file:// path that is present on all
   nodes.


I submitted a job with the application-jar specified as
s3a://path/to/jar/file/in/s3.jar and the driver didn't do anything. No logs
and no cores / memory taken. I had plenty of both.

I was able to add the hadoop-aws and aws-sdk jars to the master's and the
workers' classpaths, so they are running with the libraries.

Can someone help me understand how a driver is run on a Spark worker? Can
someone help me understand how to get the proper Hadoop libraries onto the
path of the driver so that it is able to download and execute a jar file in
S3?


Re: OOM exception during Broadcast

2016-03-07 Thread Arash
Well, I'm trying to avoid a big shuffle/join. From what I could find
online, my understanding was that a 1G broadcast should be doable. Is that
not accurate?

On Mon, Mar 7, 2016 at 3:34 PM, Jeff Zhang  wrote:

> Is there any reason why you broadcast such a large variable? It doesn't
> make sense to me.
>
> On Tue, Mar 8, 2016 at 7:29 AM, Arash  wrote:
>
>> Hello all,
>>
>> I'm trying to broadcast a variable of size ~1G to a cluster of 20 nodes
>> but haven't been able to make it work so far.
>>
>> It looks like the executors start to run out of memory during
>> deserialization. This behavior only shows itself when the number of
>> partitions is above a few 10s, the broadcast does work for 10 or 20
>> partitions.
>>
>> I'm using the following setup to observe the problem:
>>
>> val tuples: Array[((String, String), (String, String))]  // ~ 10M
>> tuples
>> val tuplesBc = sc.broadcast(tuples)
>> val numsRdd = sc.parallelize(1 to 5000, 100)
>> numsRdd.map(n => tuplesBc.value.head).count()
>>
>> If I set the number of partitions for numsRDD to 20, the count goes
>> through successfully, but at 100, I'll start to get errors such as:
>>
>> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage
>> 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap
>> space
>> at
>> java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
>> at
>> java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> at
>> scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>
>>
>> I'm using spark 1.5.2. Cluster nodes are amazon r3.2xlarge. The spark
>> property maximizeResourceAllocation is set to true (executor.memory = 48G
>> according to spark ui environment). We're also using kryo serialization and
>> Yarn is the resource manager.
>>
>> Any ideas as what might be going wrong and how to debug this?
>>
>> Thanks,
>> Arash
>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Saving Spark generated table into underlying Hive table using Functional programming

2016-03-07 Thread Mich Talebzadeh
Ok I solved the problem.

When one uses spark-shell, it starts with a HiveContext, so things work. The
caveat is that any Spark temp table created
with registerTempTable("TABLE") has to be queried with sqlContext.sql, that
is, through the context that registered it; otherwise that table is NOT
visible to a separately created HiveContext.sql.

To make this work with projects built with SBT, sqlContext has to be
created from HiveContext and NOT the other way round, as shown below:

  def main(args: Array[String]) {
    val conf = new SparkConf().
      setAppName("ImportCSV").
      setMaster("local[12]").
      set("spark.driver.allowMultipleContexts", "true").
      set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)
    // Create sqlContext based on HiveContext
    val sqlContext = new HiveContext(sc)

Data in the temp table can be queried and saved as follows:

// tmp is the Spark temporary table here
val results = sqlContext.sql("SELECT * FROM tmp")
// The path for the Hive table, which must exist
val output = "hdfs://rhes564:9000/user/hive/warehouse/test.db/t3"
results.write.format("orc").save(output)

HTH




Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 7 March 2016 at 21:27, Mich Talebzadeh  wrote:

> This is the code
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.functions._
> //
> object ImportCSV {
>   def main(args: Array[String]) {
>   val conf = new SparkConf().
>    setAppName("ImportCSV").
>    setMaster("local[12]").
>    set("spark.driver.allowMultipleContexts", "true").
>    set("spark.hadoop.validateOutputSpecs", "false")
>   val sc = new SparkContext(conf)
>   val sqlContext= new org.apache.spark.sql.SQLContext(sc)
>   import sqlContext.implicits._
>   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> val df =
> HiveContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header", "true").load("/data/stg/table2")
> val a = df.filter(col("Total") > "").map(x =>
> (x.getString(0),x.getString(1), x.getString(2).substring(1).replace(",",
> "").toDouble, x.getString(3).substring(1).replace(",", "").toDouble,
> x.getString(4).substring(1).replace(",", "").toDouble))
> a.toDF.registerTempTable("tmp")
> HiveContext.sql("use test")
> HiveContext.sql("DROP TABLE IF EXISTS t3")
> var sqltext : String = ""
> sqltext = """
> CREATE TABLE t3 (
>  INVOICENUMBER  INT
> ,PAYMENTDATE  timestamp
> ,NET  DECIMAL(20,2)
> ,VAT  DECIMAL(20,2)
> ,TOTAL  DECIMAL(20,2)
> )
> COMMENT 'from csv file from excel sheet'
> STORED AS ORC
> TBLPROPERTIES ( "orc.compress"="ZLIB" )
> """
> HiveContext.sql(sqltext)
> HiveContext.sql("INSERT INTO TABLE t3 SELECT * FROM tmp")
> HiveContext.sql("SELECT * FROM t3 ORDER BY 1").collect.foreach(println)
> }
> }
>
> I am getting this error when running the above code with spark-submit
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException: no such
> table tmp; line 1 pos 35
> at
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:260)
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:268)
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 7 March 2016 at 21:00, Holden Karau  wrote:
>
>> So what about if you just start with a hive context, and create your DF
>> using the HiveContext?
>>
>>
>> On Monday, March 7, 2016, Mich Talebzadeh 
>> wrote:
>>
>>> Hi,
>>>
>>> I have done this in spark-shell and in Hive itself, so it works.
>>>
>>> I am exploring whether I can do it programmatically. The problem I
>>> encountered was that I tried to register the DF as a temporary table, and
>>> when trying to insert from the temporary table into the Hive table, I
>>> was getting the following error
>>>
>>> sqltext = "INSERT INTO TABLE t3 SELECT * FROM tmp"
>>>
>>> sqlContext.sql(sqltext)
>>>
>>> Tables created with SQLContext must be TEMPORARY. Use a HiveContext
>>> instead.
>>>
>>> When I switched to HiveContext, it could not see the temporary table
>>>
>>> So I decided to save the Spark table as follows:
>>>
>>> val a = 

Re: OOM exception during Broadcast

2016-03-07 Thread Jeff Zhang
Is there any reason why you broadcast such a large variable? It doesn't make
sense to me.

On Tue, Mar 8, 2016 at 7:29 AM, Arash  wrote:

> Hello all,
>
> I'm trying to broadcast a variable of size ~1G to a cluster of 20 nodes
> but haven't been able to make it work so far.
>
> It looks like the executors start to run out of memory during
> deserialization. This behavior only shows itself when the number of
> partitions is above a few 10s, the broadcast does work for 10 or 20
> partitions.
>
> I'm using the following setup to observe the problem:
>
> val tuples: Array[((String, String), (String, String))]  // ~ 10M
> tuples
> val tuplesBc = sc.broadcast(tuples)
> val numsRdd = sc.parallelize(1 to 5000, 100)
> numsRdd.map(n => tuplesBc.value.head).count()
>
> If I set the number of partitions for numsRDD to 20, the count goes
> through successfully, but at 100, I'll start to get errors such as:
>
> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage
> 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap
> space
> at
> java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
> at
> java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>
>
> I'm using spark 1.5.2. Cluster nodes are amazon r3.2xlarge. The spark
> property maximizeResourceAllocation is set to true (executor.memory = 48G
> according to spark ui environment). We're also using kryo serialization and
> Yarn is the resource manager.
>
> Any ideas as what might be going wrong and how to debug this?
>
> Thanks,
> Arash
>
>


-- 
Best Regards

Jeff Zhang


OOM exception during Broadcast

2016-03-07 Thread Arash
Hello all,

I'm trying to broadcast a variable of size ~1G to a cluster of 20 nodes but
haven't been able to make it work so far.

It looks like the executors start to run out of memory during
deserialization. This behavior only shows itself when the number of
partitions is above a few tens; the broadcast does work for 10 or 20
partitions.

I'm using the following setup to observe the problem:

val tuples: Array[((String, String), (String, String))]  // ~ 10M tuples
val tuplesBc = sc.broadcast(tuples)
val numsRdd = sc.parallelize(1 to 5000, 100)
numsRdd.map(n => tuplesBc.value.head).count()

If I set the number of partitions for numsRDD to 20, the count goes through
successfully, but at 100, I'll start to get errors such as:

16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage
1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap
space
at
java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
at
java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)


I'm using spark 1.5.2. Cluster nodes are amazon r3.2xlarge. The spark
property maximizeResourceAllocation is set to true (executor.memory = 48G
according to spark ui environment). We're also using kryo serialization and
Yarn is the resource manager.

Any ideas as to what might be going wrong and how to debug this?

Thanks,
Arash
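
As a rough aid for reasoning about the report above, the following Scala
sketch estimates how much heap an Array[((String, String), (String, String))]
might occupy once deserialized. It is only a back-of-the-envelope model, not a
diagnosis of the OOM. All sizes are assumptions: a 64-bit HotSpot JVM, 8-byte
object alignment, and a String backed by a char[] at 2 bytes per character.
The names JvmLayout and BroadcastFootprint and the average string length are
hypothetical; the original post does not state string lengths.

```scala
// Assumed per-object costs for a 64-bit HotSpot JVM (hypothetical model).
final case class JvmLayout(headerBytes: Long, refBytes: Long)

object BroadcastFootprint {
  // Typical figures with compressed references (heaps below roughly 32G).
  val CompressedOops = JvmLayout(headerBytes = 12, refBytes = 4)
  // Typical figures without compressed references (larger heaps).
  val PlainOops = JvmLayout(headerBytes = 16, refBytes = 8)

  // Round an object size up to the assumed 8-byte alignment.
  def align(bytes: Long): Long = (bytes + 7) / 8 * 8

  // A Tuple2 holds an object header plus two references.
  def tuple2Bytes(l: JvmLayout): Long = align(l.headerBytes + 2 * l.refBytes)

  // A String object (header, char[] reference, int hash) plus its char[]
  // (header, int length, 2 bytes per character).
  def stringBytes(chars: Int, l: JvmLayout): Long =
    align(l.headerBytes + l.refBytes + 4) + align(l.headerBytes + 4 + 2L * chars)

  // One array element: the array slot, three Tuple2 objects, four Strings.
  def elementBytes(chars: Int, l: JvmLayout): Long =
    l.refBytes + 3 * tuple2Bytes(l) + 4 * stringBytes(chars, l)

  def totalBytes(elements: Long, chars: Int, l: JvmLayout): Long =
    elements * elementBytes(chars, l)
}
```

Under these assumptions, 10M tuples of 10-character strings come to roughly
3.3e9 bytes with compressed references and roughly 3.9e9 bytes without, so
the in-memory size may be several times a ~1G serialized size.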


Re: Spark ML - Scaling logistic regression for many features

2016-03-07 Thread Michał Zieliński
We're using SparseVector columns in a DataFrame, so they are definitely
supported. But maybe for LR some implicit magic is happening inside.

On 7 March 2016 at 23:04, Devin Jones  wrote:

> I could be wrong, but it's possible that toDF populates a DataFrame, which
> I understand does not support SparseVectors at the moment.
>
> If you use the MLlib logistic regression implementation (not ml) you can
> pass the RDD[LabeledPoint] data type directly to the learner.
>
>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
>
> Only downside is that you can't use the pipeline framework from spark ml.
>
> Cheers,
> Devin
>
>
>
> On Mon, Mar 7, 2016 at 4:54 PM, Daniel Siegmann <
> daniel.siegm...@teamaol.com> wrote:
>
>> Yes, it is a SparseVector. Most rows only have a few features, and all
>> the rows together only have tens of thousands of features, but the vector
>> size is ~ 20 million because that is the largest feature.
>>
>> On Mon, Mar 7, 2016 at 4:31 PM, Devin Jones 
>> wrote:
>>
>>> Hi,
>>>
>>> Which data structure are you using to train the model? If you haven't
>>> tried yet, you should consider the SparseVector
>>>
>>>
>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.SparseVector
>>>
>>>
>>> On Mon, Mar 7, 2016 at 4:03 PM, Daniel Siegmann <
>>> daniel.siegm...@teamaol.com> wrote:
>>>
>>>> I recently tried to train a model using
>>>> org.apache.spark.ml.classification.LogisticRegression on a data set
>>>> where the feature vector size was around ~20 million. It did *not* go
>>>> well. It took around 10 hours to train on a substantial cluster.
>>>> Additionally, it pulled a lot of data back to the driver - I eventually set
>>>> --conf
>>>> spark.driver.memory=128g --conf spark.driver.maxResultSize=112g when
>>>> submitting.
>>>>
>>>> Attempting the same application on the same cluster with the feature
>>>> vector size reduced to 100k took only ~ 9 minutes. Clearly there is an
>>>> issue with scaling to large numbers of features. I'm not doing anything
>>>> fancy in my app, here's the relevant code:
>>>>
>>>> val lr = new LogisticRegression().setRegParam(1)
>>>> val model = lr.fit(trainingSet.toDF())
>>>>
>>>> In comparison, a coworker trained a logistic regression model on her
>>>> *laptop* using the Java library liblinear in just a few minutes.
>>>> That's with the ~20 million-sized feature vectors. This suggests to me
>>>> there is some issue with Spark ML's implementation of logistic regression
>>>> which is limiting its scalability.
>>>>
>>>> Note that my feature vectors are *very* sparse. The maximum feature is
>>>> around 20 million, but I think there are only 10's of thousands of
>>>> features.
>>>>
>>>> Has anyone run into this? Any idea where the bottleneck is or how this
>>>> problem might be solved?
>>>>
>>>> One solution of course is to implement some dimensionality reduction.
>>>> I'd really like to avoid this, as it's just another thing to deal with -
>>>> not so hard to put it into the trainer, but then anything doing scoring
>>>> will need the same logic. Unless Spark ML supports this out of the box? An
>>>> easy way to save / load a model along with the dimensionality reduction
>>>> logic so when transform is called on the model it will handle the
>>>> dimensionality reduction transparently?
>>>>
>>>> Any advice would be appreciated.
>>>>
>>>> ~Daniel Siegmann

>>>
>>>
>>
>


streaming will I loose data if spark.streaming.backpressure.enabled=true

2016-03-07 Thread Andy Davidson

http://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications

It gives a brief discussion of max rate and back pressure.

It's not clear to me what will happen. I use an unreliable receiver. Let's say
my app is running and processing time is less than the window length. Happy happy.

After a while there is a burst of data and processing time now exceeds the
window length. What happens? Am I going to lose data? (I have implemented
checkpoints.)

Okay, time goes by, the data rate returns to normal, and processing time is
< window length again. Does the system catch up? Or is data lost?
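
The burst scenario described here can be explored with a toy queueing model.
The Scala sketch below is not Spark's scheduler and says nothing about
whether received blocks survive (that depends on receiver reliability and
storage, which the model ignores). It only assumes that batches become ready
at fixed intervals and are processed one at a time, and it reports how long
each batch waits before processing starts. The name BatchBacklog and the
sample numbers are hypothetical.

```scala
object BatchBacklog {
  // processingTimes(i) is the time needed to process batch i, in the same
  // unit as batchInterval. Batch i becomes ready at (i + 1) * batchInterval.
  // Returns, per batch, how long it waited before its processing started.
  def schedulingDelays(batchInterval: Double,
                       processingTimes: List[Double]): List[Double] = {
    var freeAt = 0.0 // time at which the single processing slot becomes free
    processingTimes.zipWithIndex.map { case (processing, i) =>
      val readyAt = (i + 1) * batchInterval
      val startAt = math.max(readyAt, freeAt)
      freeAt = startAt + processing
      startAt - readyAt
    }
  }
}
```

For example, BatchBacklog.schedulingDelays(10.0, List(5.0, 5.0, 15.0, 15.0,
5.0, 5.0, 5.0)) models two slow batches in the middle: the delay builds up
while processing time exceeds the interval and drains once it drops below the
interval again.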

Kind regards

Andy





Re: streaming app performance when would increasing execution size or adding more cores

2016-03-07 Thread Igor Berman
Maybe you are experiencing a problem with FileOutputCommitter vs
DirectCommitter while working with s3? Do you have hdfs so you can try it to
verify?

Committing in s3 will copy all partitions one by one to your final destination
bucket from _temporary, so this stage might become a bottleneck (so reducing
the number of partitions might "solve" it).
There was a thread on this mailing list regarding this problem a few weeks ago.



On 7 March 2016 at 23:53, Andy Davidson 
wrote:

> We just deployed our first streaming apps. The next step is getting them
> to run reliably.
>
> We have spent a lot of time reading the various prog guides and looking at
> the standalone cluster manager app performance web pages.
>
> Looking at the streaming tab and the stages tab has been the most helpful
> in tuning our app. However, we do not understand how memory and # cores
> affect throughput and performance. Usually
> adding memory is the cheapest way to improve performance.
>
> When we have a single receiver, we call spark-submit --total-executor-cores
> 2. Changing the value does not seem to change throughput. Our bottleneck
> was s3 write time, saveAsTextFile(). Reducing the number of partitions
> dramatically reduces s3 write times.
>
> Adding memory also does not improve performance.
>
> *I would think that adding more cores would allow more concurrent tasks
> to run. That is to say, reducing the number of partitions would slow things down.*
>
> What are best practices?
>
> Kind regards
>
> Andy
>
>
>
>
>
>
>


how to implement and deploy robust streaming apps

2016-03-07 Thread Andy Davidson
One of the challenges we need to prepare for with streaming apps is bursty
data. Typically we need to estimate our worst case data load and make sure
we have enough capacity


It is not obvious what the best practices are with Spark Streaming.

* We have implemented checkpointing as described in the prog guide
* We use the standalone cluster manager and spark-submit
* We use the mgmt console to kill drivers when needed
* We plan to configure write ahead logs and set
  spark.streaming.backpressure.enabled to true
* Our application runs a single unreliable receiver
  * We run multiple implementations configured to partition the input
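
The settings mentioned in the list above could be collected in one place, as
in the following Scala sketch. It is a configuration fragment under stated
assumptions, not a recommended setup: the object name StreamingConfSketch and
the maxRate value of 1000 are placeholders, and the write ahead log also
needs a checkpoint directory set on the StreamingContext.

```scala
import org.apache.spark.SparkConf

object StreamingConfSketch {
  def build(appName: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      // Let Spark adapt the ingestion rate to observed batch processing
      // times (available from Spark 1.5).
      .set("spark.streaming.backpressure.enabled", "true")
      // Upper bound on records per second per receiver; placeholder value.
      .set("spark.streaming.receiver.maxRate", "1000")
      // Write received data to a write ahead log under the checkpoint
      // directory before it is processed.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
}
```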

As long as our processing time is < our windowing time, everything is fine.

In the streaming systems I have worked on in the past, we scaled out by using
load balancers and proxy farms to create buffering capacity. It's not clear
how to scale out Spark.

In our limited testing it seems like we have a single app configured to
receive a predefined portion of the data. Once it is started we cannot add
additional resources. Adding cores and memory does not seem to increase our
capacity.


Kind regards

Andy






Re: Spark ML - Scaling logistic regression for many features

2016-03-07 Thread Devin Jones
I could be wrong, but it's possible that toDF populates a DataFrame, which I
understand does not support SparseVectors at the moment.

If you use the MLlib logistic regression implementation (not ml) you can
pass the RDD[LabeledPoint] data type directly to the learner.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

Only downside is that you can't use the pipeline framework from spark ml.

Cheers,
Devin



On Mon, Mar 7, 2016 at 4:54 PM, Daniel Siegmann  wrote:

> Yes, it is a SparseVector. Most rows only have a few features, and all
> the rows together only have tens of thousands of features, but the vector
> size is ~ 20 million because that is the largest feature.
>
> On Mon, Mar 7, 2016 at 4:31 PM, Devin Jones 
> wrote:
>
>> Hi,
>>
>> Which data structure are you using to train the model? If you haven't
>> tried yet, you should consider the SparseVector
>>
>>
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.SparseVector
>>
>>
>> On Mon, Mar 7, 2016 at 4:03 PM, Daniel Siegmann <
>> daniel.siegm...@teamaol.com> wrote:
>>
>>> I recently tried to train a model using
>>> org.apache.spark.ml.classification.LogisticRegression on a data set
>>> where the feature vector size was around ~20 million. It did *not* go
>>> well. It took around 10 hours to train on a substantial cluster.
>>> Additionally, it pulled a lot of data back to the driver - I eventually set
>>> --conf
>>> spark.driver.memory=128g --conf spark.driver.maxResultSize=112g when
>>> submitting.
>>>
>>> Attempting the same application on the same cluster with the feature
>>> vector size reduced to 100k took only ~ 9 minutes. Clearly there is an
>>> issue with scaling to large numbers of features. I'm not doing anything
>>> fancy in my app, here's the relevant code:
>>>
>>> val lr = new LogisticRegression().setRegParam(1)
>>> val model = lr.fit(trainingSet.toDF())
>>>
>>> In comparison, a coworker trained a logistic regression model on her
>>> *laptop* using the Java library liblinear in just a few minutes. That's
>>> with the ~20 million-sized feature vectors. This suggests to me there is
>>> some issue with Spark ML's implementation of logistic regression which is
>>> limiting its scalability.
>>>
>>> Note that my feature vectors are *very* sparse. The maximum feature is
>>> around 20 million, but I think there are only 10's of thousands of features.
>>>
>>> Has anyone run into this? Any idea where the bottleneck is or how this
>>> problem might be solved?
>>>
>>> One solution of course is to implement some dimensionality reduction.
>>> I'd really like to avoid this, as it's just another thing to deal with -
>>> not so hard to put it into the trainer, but then anything doing scoring
>>> will need the same logic. Unless Spark ML supports this out of the box? An
>>> easy way to save / load a model along with the dimensionality reduction
>>> logic so when transform is called on the model it will handle the
>>> dimensionality reduction transparently?
>>>
>>> Any advice would be appreciated.
>>>
>>> ~Daniel Siegmann
>>>
>>
>>
>


streaming app performance when would increasing execution size or adding more cores

2016-03-07 Thread Andy Davidson
We just deployed our first streaming apps. The next step is getting them to
run reliably.

We have spent a lot of time reading the various prog guides and looking at the
standalone cluster manager app performance web pages.

Looking at the streaming tab and the stages tab has been the most helpful
in tuning our app. However, we do not understand how memory and # cores
affect throughput and performance. Usually adding
memory is the cheapest way to improve performance.

When we have a single receiver, we call spark-submit --total-executor-cores 2.
Changing the value does not seem to change throughput. Our bottleneck was
s3 write time, saveAsTextFile(). Reducing the number of partitions
dramatically reduces s3 write times.

Adding memory also does not improve performance.

I would think that adding more cores would allow more concurrent tasks to run.
That is to say, reducing the number of partitions would slow things down.
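
The intuition about cores and partitions can be written down as simple
arithmetic. The Scala sketch below assumes that each receiver occupies one
core for as long as the app runs and that each partition becomes one task; it
then counts how many sequential "waves" of tasks a stage needs. It ignores
per-task and per-partition costs such as the s3 write, which is one reason
fewer partitions could still be faster in practice. The name TaskWaves is
hypothetical.

```scala
object TaskWaves {
  // Cores left for processing once each receiver has taken one core.
  def processingCores(totalCores: Int, receivers: Int): Int =
    math.max(totalCores - receivers, 0)

  // Number of sequential waves needed to run one task per partition.
  def waves(partitions: Int, cores: Int): Int = {
    require(cores > 0, "no cores left for processing")
    (partitions + cores - 1) / cores // ceiling division
  }
}
```

With --total-executor-cores 2 and one receiver, a single core is left for
processing, so every partition of a batch is handled one after another.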

What are best practices?

Kind regards

Andy










Re: Spark ML - Scaling logistic regression for many features

2016-03-07 Thread Devin Jones
Hi,

Which data structure are you using to train the model? If you haven't tried
yet, you should consider the SparseVector

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.SparseVector
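
The quoted message below reports a vector size of ~20 million with only tens
of thousands of distinct features present. Under that assumption, one option
alongside SparseVector is to remap the observed feature indices into a dense
range before training, so the declared vector size matches the number of
features that actually occur. The Scala sketch below shows the idea on plain
arrays; FeatureIndexCompaction and SparseRow are hypothetical names, and the
index map would have to be stored with the model so that scoring applies the
same mapping.

```scala
object FeatureIndexCompaction {
  // A sparse row: feature indices in the original index space, plus values.
  type SparseRow = (Array[Int], Array[Double])

  // Map every observed original index to a dense index 0 until k.
  def buildIndexMap(rows: Seq[SparseRow]): Map[Int, Int] =
    rows.flatMap(_._1.toSeq).distinct.sorted.zipWithIndex.toMap

  // Rewrite a row into the compact index space. Indices that were never
  // observed when the map was built are dropped.
  def remap(row: SparseRow, indexMap: Map[Int, Int]): SparseRow = {
    val kept = row._1.zip(row._2).collect {
      case (idx, value) if indexMap.contains(idx) => (indexMap(idx), value)
    }.sortBy(_._1)
    (kept.map(_._1), kept.map(_._2))
  }
}
```

The remapped indices and values, together with the size of the index map, are
the three pieces a sparse vector constructor such as
org.apache.spark.mllib.linalg.Vectors.sparse(size, indices, values) expects.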


On Mon, Mar 7, 2016 at 4:03 PM, Daniel Siegmann  wrote:

> I recently tried to train a model using
> org.apache.spark.ml.classification.LogisticRegression on a data set where
> the feature vector size was around ~20 million. It did *not* go well. It
> took around 10 hours to train on a substantial cluster. Additionally, it
> pulled a lot of data back to the driver - I eventually set --conf
> spark.driver.memory=128g --conf spark.driver.maxResultSize=112g when
> submitting.
>
> Attempting the same application on the same cluster with the feature
> vector size reduced to 100k took only ~ 9 minutes. Clearly there is an
> issue with scaling to large numbers of features. I'm not doing anything
> fancy in my app, here's the relevant code:
>
> val lr = new LogisticRegression().setRegParam(1)
> val model = lr.fit(trainingSet.toDF())
>
> In comparison, a coworker trained a logistic regression model on her
> *laptop* using the Java library liblinear in just a few minutes. That's
> with the ~20 million-sized feature vectors. This suggests to me there is
> some issue with Spark ML's implementation of logistic regression which is
> limiting its scalability.
>
> Note that my feature vectors are *very* sparse. The maximum feature is
> around 20 million, but I think there are only 10's of thousands of features.
>
> Has anyone run into this? Any idea where the bottleneck is or how this
> problem might be solved?
>
> One solution of course is to implement some dimensionality reduction. I'd
> really like to avoid this, as it's just another thing to deal with - not so
> hard to put it into the trainer, but then anything doing scoring will need
> the same logic. Unless Spark ML supports this out of the box? An easy way
> to save / load a model along with the dimensionality reduction logic so
> when transform is called on the model it will handle the dimensionality
> reduction transparently?
>
> Any advice would be appreciated.
>
> ~Daniel Siegmann
>


Re: Saving Spark generated table into underlying Hive table using Functional programming

2016-03-07 Thread Mich Talebzadeh
This is the code

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
//
object ImportCSV {
  def main(args: Array[String]) {
  val conf = new SparkConf().
   setAppName("ImportCSV").
   setMaster("local[12]").
   set("spark.driver.allowMultipleContexts", "true").
   set("spark.hadoop.validateOutputSpecs", "false")
  val sc = new SparkContext(conf)
  val sqlContext= new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._
  val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val df =
HiveContext.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header", "true").load("/data/stg/table2")
val a = df.filter(col("Total") > "").map(x =>
(x.getString(0),x.getString(1), x.getString(2).substring(1).replace(",",
"").toDouble, x.getString(3).substring(1).replace(",", "").toDouble,
x.getString(4).substring(1).replace(",", "").toDouble))
a.toDF.registerTempTable("tmp")
HiveContext.sql("use test")
HiveContext.sql("DROP TABLE IF EXISTS t3")
var sqltext : String = ""
sqltext = """
CREATE TABLE t3 (
 INVOICENUMBER  INT
,PAYMENTDATE  timestamp
,NET  DECIMAL(20,2)
,VAT  DECIMAL(20,2)
,TOTAL  DECIMAL(20,2)
)
COMMENT 'from csv file from excel sheet'
STORED AS ORC
TBLPROPERTIES ( "orc.compress"="ZLIB" )
"""
HiveContext.sql(sqltext)
HiveContext.sql("INSERT INTO TABLE t3 SELECT * FROM tmp")
HiveContext.sql("SELECT * FROM t3 ORDER BY 1").collect.foreach(println)
}
}

I am getting this error when running the above code with spark-submit

Exception in thread "main" org.apache.spark.sql.AnalysisException: no such
table tmp; line 1 pos 35
at
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:260)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:268)
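
For readers following the map step in the code above: each of the three amount columns arrives as a string with a leading currency symbol and thousands separators, and the code strips both before converting to a number. A minimal stand-alone sketch of that transformation in plain Python, assuming the leading symbol is exactly one character; the function name and the sample row are made up for illustration and are not taken from the actual CSV:

```python
def parse_amount(raw):
    """Mimic x.getString(i).substring(1).replace(",", "").toDouble."""
    # Drop the first character (assumed to be a currency symbol),
    # remove thousands separators, then convert to float.
    return float(raw[1:].replace(",", ""))

# Hypothetical sample row: invoice number, payment date, net, VAT, total.
row = ("360", "10/02/2014", "$10,000.00", "$2,000.00", "$12,000.00")
parsed = (row[0], row[1]) + tuple(parse_amount(v) for v in row[2:])
print(parsed)
```

Note that a value without a leading symbol would lose its first digit under this logic, which is worth checking against the real data.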


Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 7 March 2016 at 21:00, Holden Karau  wrote:

> So what about if you just start with a hive context, and create your DF
> using the HiveContext?
>
>
> On Monday, March 7, 2016, Mich Talebzadeh 
> wrote:
>
>> Hi,
>>
>> I have done this in spark-shell and in Hive itself, so it works.
>>
>> I am exploring whether I can do it programmatically. The problem I
>> encountered is that when I registered the DF as a temporary table and
>> tried to insert from that temporary table into a Hive table, I got the
>> following error:
>>
>> sqltext = "INSERT INTO TABLE t3 SELECT * FROM tmp"
>>
>> sqlContext.sql(sqltext)
>>
>> Tables created with SQLContext must be TEMPORARY. Use a HiveContext
>> instead.
>>
>> When I switched to HiveContext, it could not see the temporary table.
>>
>> So I decided to save the Spark table as follows:
>>
>> val a = df.filter(col("Total") > "").map(x =>
>> (x.getString(0),x.getString(1), x.getString(2).substring(1).replace(",",
>> "").toDouble, x.getString(3).substring(1).replace(",", "").toDouble,
>> x.getString(4).substring(1).replace(",", "").toDouble))
>>
>> --delete the file in hdfs if already exists
>> val hadoopConf = new org.apache.hadoop.conf.Configuration()
>> val hdfs = org.apache.hadoop.fs.FileSystem.get(new
>> java.net.URI("hdfs://rhes564:9000"), hadoopConf)
>> val output = "hdfs://rhes564:9000/user/hduser/t3_parquet"
>> try { hdfs.delete(new org.apache.hadoop.fs.Path(output), true) } catch {
>> case _ : Throwable => { } }
>>
>> -- save it as Parquet file
>> a.toDF.saveAsParquetFile(output)
>>
>> -- Hive table t3 is created as a simple textfile. ORC did not work!
>>
>> HiveContext.sql("LOAD DATA INPATH '/user/hduser/t3_parquet' into table
>> t3")
>>
>> OK, that works, but it is very cumbersome.
>>
>> I checked the web, but the suggested solutions to this issue conflict
>> with one another.
>>
>> Please note that this can be done easily in spark-shell, as it has a
>> built-in HiveContext.
>>
>> Thanks
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
>


Spark ML - Scaling logistic regression for many features

2016-03-07 Thread Daniel Siegmann
I recently tried to train a model using
org.apache.spark.ml.classification.LogisticRegression on a data set where
the feature vector size was around 20 million. It did *not* go well. It
took around 10 hours to train on a substantial cluster. Additionally, it
pulled a lot of data back to the driver - I eventually set --conf
spark.driver.memory=128g --conf spark.driver.maxResultSize=112g when
submitting.

Attempting the same application on the same cluster with the feature vector
size reduced to 100k took only ~9 minutes. Clearly there is an issue with
scaling to large numbers of features. I'm not doing anything fancy in my
app; here's the relevant code:

val lr = new LogisticRegression().setRegParam(1)
val model = lr.fit(trainingSet.toDF())

In comparison, a coworker trained a logistic regression model on her
*laptop* using the Java library liblinear in just a few minutes. That's
with the ~20 million-sized feature vectors. This suggests to me there is
some issue with Spark ML's implementation of logistic regression which is
limiting its scalability.

Note that my feature vectors are *very* sparse. The maximum feature index is
around 20 million, but I think there are only tens of thousands of features.

Has anyone run into this? Any idea where the bottleneck is or how this
problem might be solved?

One solution of course is to implement some dimensionality reduction. I'd
really like to avoid this, as it's just another thing to deal with - not so
hard to put it into the trainer, but then anything doing scoring will need
the same logic. Unless Spark ML supports this out of the box? An easy way
to save / load a model along with the dimensionality reduction logic so
when transform is called on the model it will handle the dimensionality
reduction transparently?
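
One lightweight form of the dimensionality reduction described above is to remap only the feature indices that actually occur to a dense range, and to keep that mapping alongside the model so that scoring applies the same logic. A rough sketch in plain Python; this is not Spark ML API, `build_index_map` and `remap` are hypothetical helper names, and the rows are toy data:

```python
def build_index_map(sparse_rows):
    """Map each feature index seen in training to a compact index."""
    seen = sorted({idx for row in sparse_rows for idx in row})
    return {old: new for new, old in enumerate(seen)}

def remap(row, index_map):
    """Re-express one sparse row (dict of index -> value) in compact indices.

    Indices never seen in training are dropped, which is an assumption;
    a real scorer might handle them differently.
    """
    return {index_map[i]: v for i, v in row.items() if i in index_map}

# Toy training data: sparse rows with indices up to ~20 million.
training = [{3: 1.0, 19999999: 2.0}, {3: 0.5, 1200000: 1.0}]
index_map = build_index_map(training)
compact = [remap(r, index_map) for r in training]
print(len(index_map), compact)
```

The mapping would have to be saved with the model and applied in front of every transform call, which is exactly the extra moving part the post is hoping to avoid.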

Any advice would be appreciated.

~Daniel Siegmann


Re: Saving Spark generated table into underlying Hive table using Functional programming

2016-03-07 Thread Holden Karau
So what about if you just start with a hive context, and create your DF
using the HiveContext?

On Monday, March 7, 2016, Mich Talebzadeh  wrote:

> Hi,
>
> I have done this in spark-shell and in Hive itself, so it works.
>
> I am exploring whether I can do it programmatically. The problem I
> encountered is that when I registered the DF as a temporary table and
> tried to insert from that temporary table into a Hive table, I got the
> following error:
>
> sqltext = "INSERT INTO TABLE t3 SELECT * FROM tmp"
>
> sqlContext.sql(sqltext)
>
> Tables created with SQLContext must be TEMPORARY. Use a HiveContext
> instead.
>
> When I switched to HiveContext, it could not see the temporary table.
>
> So I decided to save the Spark table as follows:
>
> val a = df.filter(col("Total") > "").map(x =>
> (x.getString(0),x.getString(1), x.getString(2).substring(1).replace(",",
> "").toDouble, x.getString(3).substring(1).replace(",", "").toDouble,
> x.getString(4).substring(1).replace(",", "").toDouble))
>
> --delete the file in hdfs if already exists
> val hadoopConf = new org.apache.hadoop.conf.Configuration()
> val hdfs = org.apache.hadoop.fs.FileSystem.get(new
> java.net.URI("hdfs://rhes564:9000"), hadoopConf)
> val output = "hdfs://rhes564:9000/user/hduser/t3_parquet"
> try { hdfs.delete(new org.apache.hadoop.fs.Path(output), true) } catch {
> case _ : Throwable => { } }
>
> -- save it as Parquet file
> a.toDF.saveAsParquetFile(output)
>
> -- Hive table t3 is created as a simple textfile. ORC did not work!
>
> HiveContext.sql("LOAD DATA INPATH '/user/hduser/t3_parquet' into table t3")
>
> OK, that works, but it is very cumbersome.
>
> I checked the web, but the suggested solutions to this issue conflict
> with one another.
>
> Please note that this can be done easily in spark-shell, as it has a
> built-in HiveContext.
>
> Thanks
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Updating reference data once a day in Spark Streaming job

2016-03-07 Thread Karthikeyan Muthukumar
Hi,
We have reference data pulled in from an RDBMS through a Sqoop job; it is
pulled into the analytics platform once a day.
We have a Spark Streaming job that reads the reference data at job startup
and then joins it with continuously flowing event data. When the reference
data gets updated once a day, how do I make sure the Spark Streaming job
uses the newly updated reference data?
One simple way is to bounce the Spark Streaming job once a day after the new
reference data is imported, but is there a better and less destructive
approach?
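
One possible alternative to bouncing the job, sketched here only as a pattern: fetch the reference data through a small holder that reloads it once it is older than a given age, and call that holder from the per-batch code. The sketch below is plain Python with no Spark API; the class name, the loader, and the fake clock are all hypothetical and exist only to show the idea:

```python
import time

class RefreshingReference:
    """Holds reference data and reloads it once it is older than max_age."""

    def __init__(self, loader, max_age_seconds, clock=time.time):
        self._loader = loader            # callable returning fresh data
        self._max_age = max_age_seconds
        self._clock = clock              # injectable so it can be tested
        self._loaded_at = None
        self._data = None

    def get(self):
        now = self._clock()
        if self._loaded_at is None or now - self._loaded_at >= self._max_age:
            self._data = self._loader()
            self._loaded_at = now
        return self._data

# Usage with a fake clock and a loader that counts its calls.
calls = []
def load_reference():
    calls.append(1)
    return {"version": len(calls)}

fake_now = [0.0]
ref = RefreshingReference(load_reference, 24 * 3600, clock=lambda: fake_now[0])
first = ref.get()            # first call loads the data
fake_now[0] = 3600.0
same = ref.get()             # one hour later: still fresh, no reload
fake_now[0] = 24 * 3600.0
second = ref.get()           # a day later: reloaded
print(first, same, second)
```

Whether this fits depends on where the join runs; the holder would have to live wherever the reference data is read for each batch.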
Thanks & Regards
MK


Re: Is Spark right for us?

2016-03-07 Thread Laumegui Deaulobi
Thanks for your input.  That 1 hour per data point would actually be a
problem, since sometimes we have reports with 100s of data points and need
to generate 100,000 reports.  So we definitely need to distribute this, but
unfortunately I don't know where to start.
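
To put rough numbers on that concern, here is the worst-case arithmetic as a small Python sketch. It takes 100 data points per report and 1 hour per data point (the upper bound from the estimate quoted below), and it assumes perfect parallelism across machines, which is optimistic; the machine count is an arbitrary illustration:

```python
reports = 100_000
data_points_per_report = 100
hours_per_data_point = 1      # upper bound from the quoted estimate

total_hours = reports * data_points_per_report * hours_per_data_point

def wall_clock_days(machines):
    """Elapsed days if the work splits evenly across `machines`."""
    return total_hours / machines / 24

print(total_hours)
print(round(wall_clock_days(1000), 1))
```

Even with 1,000 machines the worst case is more than a year of wall-clock time, so the per-data-point cost matters far more than the cluster size.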

On Mon, Mar 7, 2016 at 2:42 PM, Anurag [via Apache Spark User List] <
ml-node+s1001560n26421...@n3.nabble.com> wrote:

> Definition - each answer by an user is an event (I suppose)
>
> Let's estimate the number of events that can happen in a day in your case.
>
> 1. Max of Survey fill-outs / user = 10 = x
> 2. Max of Questions per survey = 100 = y
> 3. Max of users = 100,000 = z
>
>
> Maximum answers received in a day = x * y * z = 100,000,000 = 100 million
>
> Assuming you use a single c3.2xlarge machine,
> each data point in the report will get calculated in less than 1 hour
> (telling from my personal experience)
>
> I guess that would help.
>
> Regards
> Anurag
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-right-for-us-tp26412p26422.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Saving Spark generated table into underlying Hive table using Functional programming

2016-03-07 Thread Mich Talebzadeh
Hi,

I have done this in spark-shell and in Hive itself, so it works.

I am exploring whether I can do it programmatically. The problem I
encountered is that when I registered the DF as a temporary table and tried
to insert from that temporary table into a Hive table, I got the following
error:

sqltext = "INSERT INTO TABLE t3 SELECT * FROM tmp"

sqlContext.sql(sqltext)

Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.

When I switched to HiveContext, it could not see the temporary table.

So I decided to save the Spark table as follows:

val a = df.filter(col("Total") > "").map(x =>
(x.getString(0),x.getString(1), x.getString(2).substring(1).replace(",",
"").toDouble, x.getString(3).substring(1).replace(",", "").toDouble,
x.getString(4).substring(1).replace(",", "").toDouble))

--delete the file in hdfs if already exists
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new
java.net.URI("hdfs://rhes564:9000"), hadoopConf)
val output = "hdfs://rhes564:9000/user/hduser/t3_parquet"
try { hdfs.delete(new org.apache.hadoop.fs.Path(output), true) } catch {
case _ : Throwable => { } }

-- save it as Parquet file
a.toDF.saveAsParquetFile(output)

-- Hive table t3 is created as a simple textfile. ORC did not work!

HiveContext.sql("LOAD DATA INPATH '/user/hduser/t3_parquet' into table t3")

OK, that works, but it is very cumbersome.

I checked the web, but the suggested solutions to this issue conflict with
one another.

Please note that this can be done easily in spark-shell, as it has a
built-in HiveContext.

Thanks



Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Re: Is Spark right for us?

2016-03-07 Thread Mich Talebzadeh
Hi,

Have you looked at SSAS and cubes? Your 100GB is nothing; even a Mickey
Mouse SQL Server should handle that :)

Remember also that you are moving from the Windows platform to Linux, which
may involve additional training as well.

Another attractive option (though I do not know the nature of your queries)
is to use something like SAP Sybase IQ, which is columnar and provides full
support for your Transact-SQL queries with columnar compression. You can
BCP data out of your MSSQL database and load it into Sybase IQ, probably in
1-2 days. Your DDL from MSSQL will work fine on Sybase IQ, as will your
procs. If you work in the financial sector as I do, you may already have
come across Sybase IQ for analytics.

At this juncture, with a 100GB database (which is pretty small these days),
I would first consider SSAS (which I assume you have a license for) or
something like IQ.

HTH


Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 7 March 2016 at 16:13, Guillaume Bilodeau 
wrote:

> Hi everyone,
>
> First thanks for taking some time on your Sunday to reply.  Some points in
> no particular order:
>
> . The feedback from everyone tells me that I have a lot of reading to do
> first.  Thanks for all the pointers.
> . The data is currently stored in a row-oriented database (SQL Server 2012
> to be precise), but as I said we're open to moving data to a different kind
> of data store (column-oriented, document, etc.)
> . I don't have precise numbers for the size of the database, but I would
> guess the larger ones have around 100 GB of data.  To us, this is huge;
> obviously, for companies such as Google, it's a second's worth of data.
> . For this particular issue, we're talking about ordinal data, not free
> text fields.
> . I agree that Spark is tooling, but I also see it as an implementation of
> a specific design, namely distributed computing on a distributed data
> store, if I understand correctly.
> . For sure, I would like to avoid introducing a new technology to the mix,
> so reusing the current infrastructure in a more optimal way would be our
> first choice.
> . Our main issue is that we'd like to be able to scale by distributing
> instead of adding more memory to this single database.  The current
> computations are done using SQL queries.  The data set does not fit in
> memory.  So yes, we could distribute query construction and result
> aggregation, but the database would still be the bottleneck.  That's why
> I'm wondering if we should investigate technologies such as Spark or
> Hadoop, but maybe I'm completely mistaken and we can leverage our current
> infrastructure.
>
> Thanks,
> GB
>
>
> On Mon, Mar 7, 2016 at 3:05 AM, Jörn Franke  wrote:
>
>> I think the Relational Database will be faster for ordinal data (eg where
>> you answer from 1..x). For free text fields I would recommend solr or
>> elastic search, because they have a lot more text analytics capabilities
>> that do not exist in a relational database or MongoDB and are not likely to
>> be there in the near future.
>>
>> On 06 Mar 2016, at 18:25, Guillaume Bilodeau <
>> guillaume.bilod...@gmail.com> wrote:
>>
>> The data is currently stored in a relational database, but a migration to
>> a document-oriented database such as MongoDb is something we are definitely
>> considering.  How does this factor in?
>>
>> On Sun, Mar 6, 2016 at 12:23 PM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> That depends on a lot of things, but as a starting point I would ask
>>> whether you are planning to store your data in JSON format?
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Sun, Mar 6, 2016 at 5:17 PM, Laumegui Deaulobi <
>>> guillaume.bilod...@gmail.com> wrote:
>>>
>>>> Our problem space is survey analytics.  Each survey comprises a set of
>>>> questions, with each question having a set of possible answers.  Survey
>>>> fill-out tasks are sent to users, who have until a certain date to
>>>> complete
>>>> it.  Based on these survey fill-outs, reports need to be generated.
>>>> Each
>>>> report deals with a subset of the survey fill-outs, and comprises a set
>>>> of
>>>> data points (average rating for question 1, min/max for question 2,
>>>> etc.)
>>>>
>>>> We are dealing with rather large data sets - although reading the
>>>> internet
>>>> we get the impression that everyone is analyzing petabytes of data...
>>>>
>>>> Users: up to 100,000
>>>> Surveys: up to 100,000
>>>> Questions per survey: up to 100
>>>> Possible answers per question: up to 10
>>>> Survey fill-outs / user: up to 10
>>>> Reports: up to 100,000
>>>> Data points per report: up to 100
>>>>
>>>> Data is currently stored in a relational database but a migration to a
>>>> different kind of store is possible.
>>>>
>>>> The naive 

Re: DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0

2016-03-07 Thread James Hammerton
Hi Ted,

Thanks for getting back - I realised my mistake... I was clicking the
little drop down menu on the right hand side of the Create button (it looks
as if it's part of the button) - when I clicked directly on the word
"Create" I got a form that made more sense and allowed me to choose the
project.

Regards,

James


On 7 March 2016 at 13:09, Ted Yu  wrote:

> Have you tried clicking on Create button from an existing Spark JIRA ?
> e.g.
> https://issues.apache.org/jira/browse/SPARK-4352
>
> Once you're logged in, you should be able to select Spark as the Project.
>
> Cheers
>
> On Mon, Mar 7, 2016 at 2:54 AM, James Hammerton  wrote:
>
>> Hi,
>>
>> So I managed to isolate the bug and I'm ready to try raising a JIRA
>> issue. I joined the Apache Jira project so I can create tickets.
>>
>> However when I click Create from the Spark project home page on JIRA, it
>> asks me to click on one of the following service desks: Kylin, Atlas,
>> Ranger, Apache Infrastructure. There doesn't seem to be an option for me to
>> raise an issue for Spark?!
>>
>> Regards,
>>
>> James
>>
>>
>> On 4 March 2016 at 14:03, James Hammerton  wrote:
>>
>>> Sure thing, I'll see if I can isolate this.
>>>
>>> Regards.
>>>
>>> James
>>>
>>> On 4 March 2016 at 12:24, Ted Yu  wrote:
>>>
>>>> If you can reproduce the following with a unit test, I suggest you open
>>>> a JIRA.
>>>>
>>>> Thanks
>>>>
>>>> On Mar 4, 2016, at 4:01 AM, James Hammerton wrote:
>>>>
>>>> Hi,
>>>>
>>>> I've come across some strange behaviour with Spark 1.6.0.
>>>>
>>>> In the code below, the filtering by "eventName" only seems to work if I
>>>> called .cache on the resulting DataFrame.
>>>>
>>>> If I don't do this, the code crashes inside the UDF because it
>>>> processes an event that the filter should get rid of.
>>>>
>>>> Any ideas why this might be the case?
>>>>
>>>> The code is as follows:
>>>>
>>>>   val df = sqlContext.read.parquet(inputPath)
>>>>   val filtered = df.filter(df("eventName").equalTo(Created))
>>>>   // Caching seems to be required for the filter to work
>>>>   val extracted = extractEmailReferences(sqlContext, filtered.cache)
>>>>   extracted.write.parquet(outputPath)
>>>>
>>>> where extractEmailReferences does this:
>>>>
>>>>   def extractEmailReferences(sqlContext: SQLContext, df: DataFrame): DataFrame = {
>>>>     val extracted = df.select(df(EventFieldNames.ObjectId),
>>>>       extractReferencesUDF(df(EventFieldNames.EventJson),
>>>>         df(EventFieldNames.ObjectId), df(EventFieldNames.UserId)) as "references")
>>>>     extracted.filter(extracted("references").notEqual("UNKNOWN"))
>>>>   }
>>>>
>>>> and extractReferencesUDF:
>>>>
>>>>   def extractReferencesUDF = udf(extractReferences(_: String, _: String, _: String))
>>>>
>>>>   def extractReferences(eventJson: String, objectId: String, userId: String): String = {
>>>>     import org.json4s.jackson.Serialization
>>>>     import org.json4s.NoTypeHints
>>>>     implicit val formats = Serialization.formats(NoTypeHints)
>>>>
>>>>     val created = Serialization.read[GMailMessage.Created](eventJson)
>>>>     // This is where the code crashes if the .cache isn't called
>>>>
>>>> Regards,
>>>>
>>>> James
>>>>
>>>
>>
>


Setting PYSPARK_PYTHON in spark-env.sh vs from driver program

2016-03-07 Thread Kostas Chalikias
All - would appreciate some insight regarding how to set PYSPARK_PYTHON 
correctly.


I have created a virtual environment in the same place for all 3 of my 
cluster hosts, 2 of them running slaves and one running a master. I also 
run an RPC server on the master host to allow users from the office (the 
cluster is hosted elsewhere) to send the work.


For the master and slaves, I created $SPARK_HOME/conf/spark-env.sh and
set PYSPARK_PYTHON to the executable of my virtualenv. To be safe, I made
spark-env.sh executable by all, as suggested by the docs, even though it
appears to be sourced rather than executed.


I then started the cluster using start-master.sh and start-slave.sh 
accordingly and inspected the environment variables of each process 
under /proc/pid to confirm PYSPARK_PYTHON was set correctly, which it 
was. I then sent the first bunch of work only to get exceptions logged 
in the driver program (the RPC server) from the slaves being unable to 
import my modules upon unpickling data.


After several hours of reading docs and pulling out hair, I tried
setting PYSPARK_PYTHON in the environment from the code of the RPC
server / driver program as follows, based on this mailing list query:


https://mail-archives.apache.org/mod_mbox/spark-user/201403.mbox/%3CCAG-p0g2L=z9H1H4ZY1XdLOGnGyPEKqi8+=tpieqvdwtvwwa...@mail.gmail.com%3E

os.environ['PYSPARK_PYTHON'] = '/path/to/virtualenv/bin/python'

and to my surprise that worked. I don't understand why that makes sense,
as I can't find any mention of the environment of the driver program
overriding the environment in the workers. Also, that environment
variable was previously completely unset in the driver program anyway.


Is there an explanation for this to help me understand how to do things 
properly? We run Spark 1.6.0 on Ubuntu 14.04.
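
One likely explanation, offered as an assumption rather than something confirmed from the docs: in Spark 1.x, PySpark reads PYSPARK_PYTHON in the driver when the SparkContext is created and sends that interpreter path along with the work, so the driver's value decides which Python the workers launch. If that is right, the variable has to be set before the context is built. A minimal sketch of that ordering; `set_worker_python` is a hypothetical helper, and the path is the placeholder from the post:

```python
import os

def set_worker_python(python_path, env=os.environ):
    """Set PYSPARK_PYTHON; intended to run before SparkContext is created."""
    env["PYSPARK_PYTHON"] = python_path
    return env["PYSPARK_PYTHON"]

# Placeholder path from the post; replace with the real virtualenv interpreter.
chosen = set_worker_python("/path/to/virtualenv/bin/python")
print(chosen)
# Only after this point would the driver create its SparkContext, e.g.
#   from pyspark import SparkContext; sc = SparkContext(...)
```

This also means the same virtualenv path must exist on every worker host, which matches the setup described above.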


Thanks

Kostas


Re: Is Spark right for us?

2016-03-07 Thread Guillaume Bilodeau
Hi everyone,

First thanks for taking some time on your Sunday to reply.  Some points in
no particular order:

. The feedback from everyone tells me that I have a lot of reading to do
first.  Thanks for all the pointers.
. The data is currently stored in a row-oriented database (SQL Server 2012
to be precise), but as I said we're open to moving data to a different kind
of data store (column-oriented, document, etc.)
. I don't have precise numbers for the size of the database, but I would
guess the larger ones have around 100 GB of data.  To us, this is huge;
obviously, for companies such as Google, it's a second's worth of data.
. For this particular issue, we're talking about ordinal data, not free
text fields.
. I agree that Spark is tooling, but I also see it as an implementation of
a specific design, namely distributed computing on a distributed data
store, if I understand correctly.
. For sure, I would like to avoid introducing a new technology to the mix,
so reusing the current infrastructure in a more optimal way would be our
first choice.
. Our main issue is that we'd like to be able to scale by distributing
instead of adding more memory to this single database.  The current
computations are done using SQL queries.  The data set does not fit in
memory.  So yes, we could distribute query construction and result
aggregation, but the database would still be the bottleneck.  That's why
I'm wondering if we should investigate technologies such as Spark or
Hadoop, but maybe I'm completely mistaken and we can leverage our current
infrastructure.

Thanks,
GB


On Mon, Mar 7, 2016 at 3:05 AM, Jörn Franke  wrote:

> I think the Relational Database will be faster for ordinal data (eg where
> you answer from 1..x). For free text fields I would recommend solr or
> elastic search, because they have a lot more text analytics capabilities
> that do not exist in a relational database or MongoDB and are not likely to
> be there in the near future.
>
> On 06 Mar 2016, at 18:25, Guillaume Bilodeau 
> wrote:
>
> The data is currently stored in a relational database, but a migration to
> a document-oriented database such as MongoDb is something we are definitely
> considering.  How does this factor in?
>
> On Sun, Mar 6, 2016 at 12:23 PM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> Hi,
>>
>> That depends on a lot of things, but as a starting point I would ask
>> whether you are planning to store your data in JSON format?
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Sun, Mar 6, 2016 at 5:17 PM, Laumegui Deaulobi <
>> guillaume.bilod...@gmail.com> wrote:
>>
>>> Our problem space is survey analytics.  Each survey comprises a set of
>>> questions, with each question having a set of possible answers.  Survey
>>> fill-out tasks are sent to users, who have until a certain date to
>>> complete
>>> it.  Based on these survey fill-outs, reports need to be generated.  Each
>>> report deals with a subset of the survey fill-outs, and comprises a set
>>> of
>>> data points (average rating for question 1, min/max for question 2, etc.)
>>>
>>> We are dealing with rather large data sets - although reading the
>>> internet
>>> we get the impression that everyone is analyzing petabytes of data...
>>>
>>> Users: up to 100,000
>>> Surveys: up to 100,000
>>> Questions per survey: up to 100
>>> Possible answers per question: up to 10
>>> Survey fill-outs / user: up to 10
>>> Reports: up to 100,000
>>> Data points per report: up to 100
>>>
>>> Data is currently stored in a relational database but a migration to a
>>> different kind of store is possible.
>>>
>>> The naive algorithm for report generation can be summed up as this:
>>>
>>> for each report to be generated {
>>>   for each report data point to be calculated {
>>> calculate data point
>>> add data point to report
>>>   }
>>>   publish report
>>> }
>>>
>>> In order to deal with the upper limits of these values, we will need to
>>> distribute this algorithm to a compute / data cluster as much as
>>> possible.
>>>
>>> I've read about frameworks such as Apache Spark but also Hadoop,
>>> GridGain,
>>> HazelCast and several others, and am still confused as to how each of
>>> these
>>> can help us and how they fit together.
>>>
>>> Is Spark the right framework for us?
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-right-for-us-tp26412.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com
>>> .
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Steps to Run Spark Scala job from Oozie on EC2 Hadoop clsuter

2016-03-07 Thread Chandeep Singh
As a workaround, you could put your spark-submit statement in a shell script
and then use Oozie’s SSH action to execute that script.

> On Mar 7, 2016, at 3:58 PM, Neelesh Salian  wrote:
> 
> Hi Divya,
> 
> This link should have the details that you need to begin using the Spark 
> Action on Oozie:
> https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html 
> 
> 
> Thanks.
> 
> On Mon, Mar 7, 2016 at 7:52 AM, Benjamin Kim wrote:
> To comment…
> 
> At my company, we have not gotten it to work in any mode other than local. If 
> we try any of the yarn modes, it fails with a “file does not exist” error 
> when trying to locate the executable jar. I mentioned this to the Hue users 
> group, which we used for this, and they replied that the Spark Action is a very 
> basic implementation and that they will be writing their own for production 
> use.
> 
> That’s all I know...
> 
>> On Mar 7, 2016, at 1:18 AM, Deepak Sharma wrote:
>> 
>> There is Spark action defined for oozie workflows.
>> Though I am not sure if it supports only Java SPARK jobs or Scala jobs as 
>> well.
>> https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html 
>> 
>> Thanks
>> Deepak
>> 
>> On Mon, Mar 7, 2016 at 2:44 PM, Divya Gehlot wrote:
>> Hi,
>> 
>> Could somebody help me by providing the steps /redirect me  to 
>> blog/documentation on how to run Spark job written in scala through Oozie.
>> 
>> Would really appreciate the help.
>> 
>> 
>> 
>> Thanks,
>> Divya 
>> 
>> 
>> 
>> -- 
>> Thanks
>> Deepak
>> www.bigdatabig.com 
>> www.keosha.net 
> 
> 
> 
> -- 
> Neelesh Srinivas Salian
> Customer Operations Engineer
> 
> 
> 



Re: Steps to Run Spark Scala job from Oozie on EC2 Hadoop clsuter

2016-03-07 Thread Neelesh Salian
Hi Divya,

This link should have the details that you need to begin using the Spark
Action on Oozie:
https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html

Thanks.

On Mon, Mar 7, 2016 at 7:52 AM, Benjamin Kim  wrote:

> To comment…
>
> At my company, we have not gotten it to work in any mode other than local.
> If we try any of the yarn modes, it fails with a “file does not exist”
> error when trying to locate the executable jar. I mentioned this to the Hue
> users group, which we used for this, and they replied that the Spark Action
> is a very basic implementation and that they will be writing their own for
> production use.
>
> That’s all I know...
>
> On Mar 7, 2016, at 1:18 AM, Deepak Sharma  wrote:
>
> There is Spark action defined for oozie workflows.
> Though I am not sure if it supports only Java SPARK jobs or Scala jobs as
> well.
> https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html
> Thanks
> Deepak
>
> On Mon, Mar 7, 2016 at 2:44 PM, Divya Gehlot 
> wrote:
>
>> Hi,
>>
>> Could somebody help me by providing the steps /redirect me  to
>> blog/documentation on how to run Spark job written in scala through Oozie.
>>
>> Would really appreciate the help.
>>
>>
>>
>> Thanks,
>> Divya
>>
>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>
>
>


-- 
Neelesh Srinivas Salian
Customer Operations Engineer


Re: how to implement ALS with csv file? getting error while calling Rating class

2016-03-07 Thread Kevin Mellott
If you are using DataFrames, you can also specify the schema when loading,
as an alternative solution. I've found Spark-CSV to be a very useful library
when working with CSV data.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader


On Mon, Mar 7, 2016 at 1:10 AM, Nick Pentreath 
wrote:

> As you've pointed out, Rating requires user and item ids in Int form. So
> you will need to map String user ids to integers.
>
> See this thread for example:
> https://mail-archives.apache.org/mod_mbox/spark-user/201501.mbox/%3CCAJgQjQ9GhGqpg1=hvxpfrs+59elfj9f7knhp8nyqnh1ut_6...@mail.gmail.com%3E
> .
>
> There is a DeveloperApi method
> in org.apache.spark.ml.recommendation.ALS that takes Rating with generic
> type (can be String) for user id and item id. However that is a little more
> involved, and for larger scale data will be a lot less efficient.
>
> Something like this for example:
>
> import org.apache.spark.ml.recommendation.ALS
> import org.apache.spark.ml.recommendation.ALS.Rating
>
> val conf = new SparkConf().setAppName("ALSWithStringID").setMaster("local[4]")
> val sc = new SparkContext(conf)
> // Name,Value1,Value2.
> val rdd = sc.parallelize(Seq(
>   Rating[String]("foo", "1", 4.0f),
>   Rating[String]("foo", "2", 2.0f),
>   Rating[String]("bar", "1", 5.0f),
>   Rating[String]("bar", "3", 1.0f)
> ))
> val (userFactors, itemFactors) = ALS.train(rdd)
>
>
> As you can see, you just get the factor RDDs back, and if you want an
> ALSModel you will have to construct it yourself.
>
>
> On Sun, 6 Mar 2016 at 18:23 Shishir Anshuman 
> wrote:
>
>> I am new to apache Spark, and I want to implement the Alternating Least
>> Squares algorithm. The data set is stored in a csv file in the format:
>> *Name,Value1,Value2*.
>>
>> When I read the csv file, I get
>> *java.lang.NumberFormatException.forInputString* error because the
>> Rating class needs the parameters in the format: *(user: Int, product:
>> Int, rating: Double)* and the first column of my file contains *Name*.
>>
>> Please suggest me a way to overcome this issue.
>>
>
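
The mapping from String user ids to integers suggested in the quoted reply can be sketched in a few lines. This is plain Python rather than Spark code; `assign_ids` is a hypothetical helper, the CSV content is toy data in the Name,Value1,Value2 layout, and treating Value1 as an integer item id and Value2 as the rating is an assumption based on the quoted example:

```python
import csv
import io

def assign_ids(values):
    """Give each distinct string a stable integer id in first-seen order."""
    ids = {}
    for v in values:
        if v not in ids:
            ids[v] = len(ids)
    return ids

# Toy CSV in the Name,Value1,Value2 layout from the question.
raw = "foo,1,4.0\nfoo,2,2.0\nbar,1,5.0\nbar,3,1.0\n"
rows = list(csv.reader(io.StringIO(raw)))

user_ids = assign_ids(r[0] for r in rows)
ratings = [(user_ids[name], int(item), float(score))
           for name, item, score in rows]
print(user_ids, ratings)
```

The `user_ids` dictionary needs to be kept so that predictions can be translated back to the original names.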


Re: Steps to Run Spark Scala job from Oozie on EC2 Hadoop clsuter

2016-03-07 Thread Benjamin Kim
To comment…

At my company, we have not gotten it to work in any mode other than local. If
we try any of the yarn modes, it fails with a “file does not exist” error when
trying to locate the executable jar. I mentioned this to the Hue users group,
which we used for this, and they replied that the Spark Action is a very basic
implementation and that they will be writing their own for production use.

That’s all I know...

> On Mar 7, 2016, at 1:18 AM, Deepak Sharma  wrote:
> 
> There is Spark action defined for oozie workflows.
> Though I am not sure if it supports only Java SPARK jobs or Scala jobs as 
> well.
> https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html 
> 
> Thanks
> Deepak
> 
> On Mon, Mar 7, 2016 at 2:44 PM, Divya Gehlot wrote:
> Hi,
> 
> Could somebody help me by providing the steps /redirect me  to 
> blog/documentation on how to run Spark job written in scala through Oozie.
> 
> Would really appreciate the help.
> 
> 
> 
> Thanks,
> Divya 
> 
> 
> 
> -- 
> Thanks
> Deepak
> www.bigdatabig.com 
> www.keosha.net 


[Streaming] Difference between windowed stream and stream with large batch size?

2016-03-07 Thread Hao Ren
I want to understand the advantage of using windowed stream.

For example,

Stream 1:
initial duration = 5 s,
and then transformed into a stream windowed by (windowLength = 30 s,
slideInterval = 30 s)

Stream 2:
Duration = 30 s

Questions:

1. Is Stream 1 equivalent to Stream 2 in behavior? Do users observe the
same results?
2. If yes, what is the advantage of one over the other? Performance or
something else?
3. Is a stream with a large batch duration reasonable, say 30 minutes or even an hour?

Thank you.

-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France
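As far as the grouping of records goes, a tumbling window (windowLength equal to slideInterval) over small batches covers the same records as one large batch, provided the boundaries line up. A toy simulation in plain Python, not Spark code; the timestamps and function names are made up for illustration:

```python
def batch(events, duration):
    """Group (timestamp, value) events into consecutive fixed-size batches.

    Empty batches are kept so that batch boundaries stay aligned.
    """
    if not events:
        return []
    count = max(ts for ts, _ in events) // duration + 1
    batches = [[] for _ in range(count)]
    for ts, value in events:
        batches[ts // duration].append(value)
    return batches


def tumbling_window(batches, batches_per_window):
    """Concatenate consecutive batches; window length equals slide interval."""
    windows = []
    for start in range(0, len(batches), batches_per_window):
        merged = []
        for b in batches[start:start + batches_per_window]:
            merged.extend(b)
        windows.append(merged)
    return windows


events = [(1, "a"), (7, "b"), (29, "c"), (31, "d"), (58, "e")]
stream1 = tumbling_window(batch(events, 5), 6)  # 5 s batches, 30 s window
stream2 = batch(events, 30)                     # 30 s batches
```

In this toy model both streams yield the same groups. The practical differences would then lie elsewhere: a short batch interval leaves room for other window or slide settings on the same input, at the cost of scheduling more, smaller batches.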


Re: reading the parquet file in spark sql

2016-03-07 Thread Manoj Awasthi
From the Parquet file content (the directory listing), it doesn't look like the
Parquet write was successful or complete.
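One quick way to check whether a write was committed is to look for the _SUCCESS marker together with Parquet part files directly under the output directory, outside _temporary. A sketch in plain Python for a local directory; the function name is hypothetical:

```python
import os


def parquet_write_looks_complete(path):
    """Return True if `path` holds a _SUCCESS marker and at least one
    top-level *.parquet part file (files under _temporary do not count)."""
    if not os.path.isdir(path):
        return False
    entries = os.listdir(path)
    has_marker = "_SUCCESS" in entries
    has_parts = any(
        name.endswith(".parquet") and os.path.isfile(os.path.join(path, name))
        for name in entries
    )
    return has_marker and has_parts
```

In the listing quoted below, the part files sit only under _temporary on the slaves while the marker is on the master, which is what one would expect if each node wrote to its own local disk. Writing to a filesystem shared by all nodes, such as HDFS, is the usual remedy, though that is an inference from the listing rather than something confirmed in the thread.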

On Mon, Mar 7, 2016 at 11:17 AM, Angel Angel 
wrote:

> Hello Sir/Madam,
>
> I am running a Spark application with 3 slaves and one master.
>
> I am writing my data in the Parquet format,
>
> but when I try to read it back I get an error.
> Please help me to resolve this problem.
>
> Code:
>
>
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
>
> import sqlContext.implicits._
>
>
> case class Table(Address: String, Couple_time: Int, WT_ID: Int, WT_Name:
> String)
>
>
> val df2 =
> sc.textFile("/root/Desktop/database.txt").map(_.split(",")).map(p =>
> Table(p(0), p(1).trim.toInt, p(2).trim.toInt, p(3))).toDF()
>
>
> df2.write.parquet("Desktop/database2.parquet")
>
>
>
>
> After that, on the master computer there is a folder named database2 that
> contains the _SUCCESS file,
>
> and on my slaves
>
> it has the following tree:
>
> database2.parquet
> └── _temporary
> └── 0
> ├── task_201603071435__m_01
> │   └── part-r-2.gz.parquet
> ├── task_201603071435__m_04
> │   └── part-r-5.gz.parquet
> └── _temporary
>
>
>
> But when I try to read this file using the following command, I get an
> error:
>
>
> val df1 = sqlContext.read.parquet("Desktop/database2.parquet")
>
>
>
> error
>
>
> java.lang.AssertionError: assertion failed: No schema defined, and no
> Parquet data file or summary file found under file:/root/database2.parquet.
>
> at scala.Predef$.assert(Predef.scala:179)
>
> at
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.org$apache$spark$sql$parquet$ParquetRelation2$MetadataCache$$readSchema(newParquet.scala:443)
>
> at
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$15.apply(newParquet.scala:385)
>
> at
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$15.apply(newParquet.scala:385)
>
> at scala.Option.orElse(Option.scala:257)
>
> at
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:385)
>
> at
> org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$metadataCache$lzycompute(newParquet.scala:154)
>
> at
> org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$metadataCache(newParquet.scala:152)
>
> at
> org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$dataSchema$1.apply(newParquet.scala:193)
>
> at
> org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$dataSchema$1.apply(newParquet.scala:193)
>
> at scala.Option.getOrElse(Option.scala:120)
>
> at
> org.apache.spark.sql.parquet.ParquetRelation2.dataSchema(newParquet.scala:193)
>
> at org.apache.spark.sql.sources.HadoopFsRel
>
>
>
>
> Thanks.
>


using MongoDB Tailable Cursor in Spark Streaming

2016-03-07 Thread Shams ul Haque
Hi,

I want to implement streaming from MongoDB using a tailable cursor. Please give
me a hint on how I can do this.
I think I have to extend some class and use its methods to do this.


Thanks and regards
Shams ul Haque


Re: DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0

2016-03-07 Thread Ted Yu
Have you tried clicking on the Create button from an existing Spark JIRA?
e.g.
https://issues.apache.org/jira/browse/SPARK-4352

Once you're logged in, you should be able to select Spark as the Project.

Cheers

On Mon, Mar 7, 2016 at 2:54 AM, James Hammerton  wrote:

> Hi,
>
> So I managed to isolate the bug and I'm ready to try raising a JIRA issue.
> I joined the Apache Jira project so I can create tickets.
>
> However when I click Create from the Spark project home page on JIRA, it
> asks me to click on one of the following service desks: Kylin, Atlas,
> Ranger, Apache Infrastructure. There doesn't seem to be an option for me to
> raise an issue for Spark?!
>
> Regards,
>
> James
>
>
> On 4 March 2016 at 14:03, James Hammerton  wrote:
>
>> Sure thing, I'll see if I can isolate this.
>>
>> Regards.
>>
>> James
>>
>> On 4 March 2016 at 12:24, Ted Yu  wrote:
>>
>>> If you can reproduce the following with a unit test, I suggest you open
>>> a JIRA.
>>>
>>> Thanks
>>>
>>> On Mar 4, 2016, at 4:01 AM, James Hammerton  wrote:
>>>
>>> Hi,
>>>
>>> I've come across some strange behaviour with Spark 1.6.0.
>>>
>>> In the code below, the filtering by "eventName" only seems to work if I
>>> call .cache on the resulting DataFrame.
>>>
>>> If I don't do this, the code crashes inside the UDF because it processes
>>> an event that the filter should get rid of.
>>>
>>> Any ideas why this might be the case?
>>>
>>> The code is as follows:
>>>
>>>   val df = sqlContext.read.parquet(inputPath)
>>>   val filtered = df.filter(df("eventName").equalTo(Created))
>>>   // Caching seems to be required for the filter to work
>>>   val extracted = extractEmailReferences(sqlContext, filtered.cache)
>>>   extracted.write.parquet(outputPath)
>>>
>>>
>>> where extractEmailReferences does this:
>>>

>>>
>>>   def extractEmailReferences(sqlContext: SQLContext, df: DataFrame): DataFrame = {
>>>     val extracted = df.select(df(EventFieldNames.ObjectId),
>>>       extractReferencesUDF(df(EventFieldNames.EventJson),
>>>         df(EventFieldNames.ObjectId), df(EventFieldNames.UserId)) as "references")
>>>
>>>     extracted.filter(extracted("references").notEqual("UNKNOWN"))
>>>   }
>>>
>>>
>>> and extractReferencesUDF:
>>>
>>>   def extractReferencesUDF = udf(extractReferences(_: String, _: String, _: String))
>>>
>>>   def extractReferences(eventJson: String, objectId: String, userId: String): String = {
>>>     import org.json4s.jackson.Serialization
>>>     import org.json4s.NoTypeHints
>>>     implicit val formats = Serialization.formats(NoTypeHints)
>>>
>>>     // This is where the code crashes if the .cache isn't called
>>>     val created = Serialization.read[GMailMessage.Created](eventJson)
>>>
>>>
>>>  Regards,
>>>
>>> James
>>>
>>>
>>
>


Re: org.apache.spark.sql.types.GenericArrayData cannot be cast to org.apache.spark.sql.catalyst.InternalRow

2016-03-07 Thread dmt
Is there a workaround ? My dataset contains billions of rows, and it would be
nice to ignore/exclude the few lines that are badly formatted.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/org-apache-spark-sql-types-GenericArrayData-cannot-be-cast-to-org-apache-spark-sql-catalyst-Internalw-tp26377p26420.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Aggregations/Joins

2016-03-07 Thread Ricardo Paiva
Have you tried using join? Both RDDs and DataFrames provide this method,
and it works like a join in a traditional relational database.
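The usual restructuring is to key both datasets by user ID and join them, instead of looking up one RDD from inside an operation on another. A plain-Python stand-in for an inner join on key (not Spark code; the data and the helper name are made up for illustration):

```python
def join_by_key(left, right):
    """Inner join of two lists of (key, value) pairs on their keys."""
    right_index = {}
    for key, value in right:
        right_index.setdefault(key, []).append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_index.get(key, [])
    ]


user_ids = [1, 2, 3]
recommendations = [(1, [10, 11]), (3, [12])]

# Turn the plain ids into (key, value) pairs so they can be joined.
keyed_users = [(uid, None) for uid in user_ids]
joined = join_by_key(keyed_users, recommendations)
```

User 2 has no recommendations here and so drops out of the inner join; an outer join would keep it.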

On Sat, Mar 5, 2016 at 3:17 AM, Agro [via Apache Spark User List] <
ml-node+s1001560n26403...@n3.nabble.com> wrote:

> So, initially, I have an RDD[Int] that I've loaded from my database, where
> each Int is a user ID. For each of these user IDs, I need to gather a bunch
> of other data (a list of recommended product IDs), which makes use of an
> RDD as well. I've tried doing this out, but Spark doesn't allow nesting RDD
> operations on two different RDDs together. I feel like this a common
> problem, so are there any general solutions you guys know about?
>



-- 
Ricardo Paiva
Big Data / Semântica
2483-6432
*globo.com* 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Aggregations-Joins-tp26403p26418.html

Re: org.apache.spark.sql.types.GenericArrayData cannot be cast to org.apache.spark.sql.catalyst.InternalRow

2016-03-07 Thread dmt
I have found why the exception is raised.
I have defined a JSON schema, using org.apache.spark.sql.types.StructType,
that expects this kind of record :
{
  "request": {
    "user": {
      "id": 123
    }
  }
}

There's a bad record in my dataset, that defines field "user" as an array,
instead of a JSON object :
{
  "request": {
    "user": []
  }
}

I have created the following issue :
https://issues.apache.org/jira/browse/SPARK-13719
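Until that issue is resolved, one possible workaround is to drop malformed records before the schema is applied. A plain-Python sketch of such a check; applying the same predicate to the raw text lines in Spark before parsing is an assumption and has not been tested here, and the function name is hypothetical:

```python
import json


def has_valid_user(line):
    """True if the line is JSON whose request.user field is an object."""
    try:
        record = json.loads(line)
    except ValueError:
        return False
    if not isinstance(record, dict):
        return False
    request = record.get("request")
    return isinstance(request, dict) and isinstance(request.get("user"), dict)


lines = [
    '{"request": {"user": {"id": 123}}}',
    '{"request": {"user": []}}',
    "not json",
]
good = [line for line in lines if has_valid_user(line)]
```

Only the first sample line survives the filter; the record with "user" as an array and the unparsable line are both dropped.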

 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/org-apache-spark-sql-types-GenericArrayData-cannot-be-cast-to-org-apache-spark-sql-catalyst-Internalw-tp26377p26417.html



Re: DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0

2016-03-07 Thread James Hammerton
Hi,

So I managed to isolate the bug and I'm ready to try raising a JIRA issue.
I joined the Apache Jira project so I can create tickets.

However when I click Create from the Spark project home page on JIRA, it
asks me to click on one of the following service desks: Kylin, Atlas,
Ranger, Apache Infrastructure. There doesn't seem to be an option for me to
raise an issue for Spark?!

Regards,

James


On 4 March 2016 at 14:03, James Hammerton  wrote:

> Sure thing, I'll see if I can isolate this.
>
> Regards.
>
> James
>
> On 4 March 2016 at 12:24, Ted Yu  wrote:
>
>> If you can reproduce the following with a unit test, I suggest you open a
>> JIRA.
>>
>> Thanks
>>
>> On Mar 4, 2016, at 4:01 AM, James Hammerton  wrote:
>>
>> Hi,
>>
>> I've come across some strange behaviour with Spark 1.6.0.
>>
>> In the code below, the filtering by "eventName" only seems to work if I
>> call .cache on the resulting DataFrame.
>>
>> If I don't do this, the code crashes inside the UDF because it processes
>> an event that the filter should get rid of.
>>
>> Any ideas why this might be the case?
>>
>> The code is as follows:
>>
>>   val df = sqlContext.read.parquet(inputPath)
>>   val filtered = df.filter(df("eventName").equalTo(Created))
>>   // Caching seems to be required for the filter to work
>>   val extracted = extractEmailReferences(sqlContext, filtered.cache)
>>   extracted.write.parquet(outputPath)
>>
>>
>> where extractEmailReferences does this:
>>
>>>
>>
>>   def extractEmailReferences(sqlContext: SQLContext, df: DataFrame): DataFrame = {
>>     val extracted = df.select(df(EventFieldNames.ObjectId),
>>       extractReferencesUDF(df(EventFieldNames.EventJson),
>>         df(EventFieldNames.ObjectId), df(EventFieldNames.UserId)) as "references")
>>
>>     extracted.filter(extracted("references").notEqual("UNKNOWN"))
>>   }
>>
>>
>> and extractReferencesUDF:
>>
>>   def extractReferencesUDF = udf(extractReferences(_: String, _: String, _: String))
>>
>>   def extractReferences(eventJson: String, objectId: String, userId: String): String = {
>>     import org.json4s.jackson.Serialization
>>     import org.json4s.NoTypeHints
>>     implicit val formats = Serialization.formats(NoTypeHints)
>>
>>     // This is where the code crashes if the .cache isn't called
>>     val created = Serialization.read[GMailMessage.Created](eventJson)
>>
>>
>>  Regards,
>>
>> James
>>
>>
>


Editing spark.ml package code in Pyspark

2016-03-07 Thread Khaled Ali
Is it possible to edit code inside the spark.ml package in PySpark? For example, I
am calculating TF-IDF using PySpark ML, but I would like to make a small
change to the inverse document frequency equation,
IDF(t, D) = log((|D| + 1) / (DF(t, D) + 1)). How can I do that?
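Rather than patching the package, one option is to compute IDF yourself from the document frequencies. A plain-Python sketch, assuming the smoothed formula from the Spark documentation, log((|D| + 1) / (DF(t, D) + 1)); the `idf_fn` parameter is a hypothetical hook for substituting a variant, and the sample documents are made up:

```python
import math
from collections import Counter


def spark_style_idf(num_docs, doc_freq):
    """Smoothed IDF: log((|D| + 1) / (DF(t, D) + 1))."""
    return math.log((num_docs + 1) / (doc_freq + 1))


def compute_idf(docs, idf_fn=spark_style_idf):
    """docs is a list of token lists; returns a {term: idf} dictionary."""
    doc_freq = Counter()
    for tokens in docs:
        doc_freq.update(set(tokens))  # count each term once per document
    return {term: idf_fn(len(docs), count) for term, count in doc_freq.items()}


docs = [["spark", "ml"], ["spark", "sql"], ["spark"]]
idf = compute_idf(docs)
```

Passing a different function as `idf_fn` changes the equation without touching the rest of the computation.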



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Editing-spark-ml-package-code-in-Pyspark-tp26416.html



Re: PYSPARK_PYTHON doesn't work in spark worker

2016-03-07 Thread Gourav Sengupta
Hi,

How are you running your Spark cluster (in local mode or distributed
mode)? Do you have pyspark installed in Anaconda?


Regards,
Gourav Sengupta

On Mon, Mar 7, 2016 at 9:28 AM, guoqing0...@yahoo.com.hk <
guoqing0...@yahoo.com.hk> wrote:

> Hi all
> I had following configuration in spark worker (spark-env.sh)
> export PYTHON_HOME=/opt/soft/anaconda2
> export PYSPARK_PYTHON=$PYTHON_HOME/bin/python
>
> I'm trying to run a simple test script in a pyspark shell started with:
> pyspark --master yarn --queue spark --executor-cores 1 --num-executors 10
> from pyspark import SparkContext, SparkConf
> df = sc.textFile("/user/hive/warehouse/hdptest.db/111");
> df.collect();
> sortedCount = df.map(lambda x: (float(x), 1)).sortByKey();
> sortedCount.collect();
>
> However, I get the following error:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in
> stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0
> (TID 7, datanode-8): org.apache.spark.SparkException:
> Error from python worker:
>   /usr/bin/python: module pyspark.daemon not found
> PYTHONPATH was:
>   /home/hadoop/spark/python/lib/pyspark.zip:/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip:/data/hadoop_local/usercache/hdptest/filecache/38/spark-assembly-1.4.1-hadoop2.5.2.jar:pyspark.zip:py4j-0.8.2.1-src.zip
>
> It seems that PYSPARK_PYTHON doesn't take effect on the Spark workers. Can someone
> please help me solve this?
>
> Thanks~
> --
> guoqing0...@yahoo.com.hk
>


PYSPARK_PYTHON doesn't work in spark worker

2016-03-07 Thread guoqing0...@yahoo.com.hk
Hi all
I have the following configuration on the Spark workers (spark-env.sh):
export PYTHON_HOME=/opt/soft/anaconda2
export PYSPARK_PYTHON=$PYTHON_HOME/bin/python

I'm trying to run a simple test script in a pyspark shell started with:
pyspark --master yarn --queue spark --executor-cores 1 --num-executors 10
from pyspark import SparkContext, SparkConf
df = sc.textFile("/user/hive/warehouse/hdptest.db/111");
df.collect();
sortedCount = df.map(lambda x: (float(x), 1)).sortByKey();
sortedCount.collect();

However, I get the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in
stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID
7, datanode-8): org.apache.spark.SparkException:
Error from python worker:
  /usr/bin/python: module pyspark.daemon not found
PYTHONPATH was:
  /home/hadoop/spark/python/lib/pyspark.zip:/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip:/data/hadoop_local/usercache/hdptest/filecache/38/spark-assembly-1.4.1-hadoop2.5.2.jar:pyspark.zip:py4j-0.8.2.1-src.zip

It seems that PYSPARK_PYTHON doesn't take effect on the Spark workers. Can someone
please help me solve this?

Thanks~


guoqing0...@yahoo.com.hk


Re: Understanding the Web_UI 4040

2016-03-07 Thread Mark Hamstra
There's probably nothing wrong other than a glitch in the reporting of
Executor state transitions to the UI -- one of those low-priority items
I've been meaning to look at for a while.

On Mon, Mar 7, 2016 at 12:15 AM, Sonal Goyal  wrote:

> Maybe check the worker logs to see what's going wrong with it?
> On Mar 7, 2016 9:10 AM, "Angel Angel"  wrote:
>
>> Hello Sir/Madam,
>>
>>
>> I am running a spark-sql application on the cluster.
>> My cluster has 3 slaves and one master.
>>
>> When I looked at the progress of my application in the web UI at hadoopm0:8080,
>>
>> I found that one of my slave nodes is always in *LOADING* mode.
>>
>> Can you tell me what that means?
>>
>> Also, I am unable to see the DAG graph (when I click on the DAG graph, the
>> screen hangs for some time).
>>
>> [image: Inline image 1]
>>
>


Re: Steps to Run Spark Scala job from Oozie on EC2 Hadoop cluster

2016-03-07 Thread Deepak Sharma
There is a Spark action defined for Oozie workflows,
though I am not sure whether it supports only Java Spark jobs or Scala jobs as
well.
https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html
Thanks
Deepak

On Mon, Mar 7, 2016 at 2:44 PM, Divya Gehlot 
wrote:

> Hi,
>
> Could somebody help me by providing the steps /redirect me  to
> blog/documentation on how to run Spark job written in scala through Oozie.
>
> Would really appreciate the help.
>
>
>
> Thanks,
> Divya
>



-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Steps to Run Spark Scala job from Oozie on EC2 Hadoop cluster

2016-03-07 Thread Divya Gehlot
Hi,

Could somebody help me by providing the steps, or point me to a blog post or
documentation, on how to run a Spark job written in Scala through Oozie?

Would really appreciate the help.



Thanks,
Divya


Re: Understanding the Web_UI 4040

2016-03-07 Thread Sonal Goyal
Maybe check the worker logs to see what's going wrong with it?
On Mar 7, 2016 9:10 AM, "Angel Angel"  wrote:

> Hello Sir/Madam,
>
>
> I am running a spark-sql application on the cluster.
> My cluster has 3 slaves and one master.
>
> When I looked at the progress of my application in the web UI at hadoopm0:8080,
>
> I found that one of my slave nodes is always in *LOADING* mode.
>
> Can you tell me what that means?
>
> Also, I am unable to see the DAG graph (when I click on the DAG graph, the
> screen hangs for some time).
>
> [image: Inline image 1]
>


Re: Is Spark right for us?

2016-03-07 Thread Jörn Franke
I think the relational database will be faster for ordinal data (e.g. where you
answer on a scale from 1 to x). For free-text fields I would recommend Solr or
Elasticsearch, because they have many more text analytics capabilities that do not
exist in a relational database or MongoDB and are not likely to be there in the
near future.

> On 06 Mar 2016, at 18:25, Guillaume Bilodeau  
> wrote:
> 
> The data is currently stored in a relational database, but a migration to a 
> document-oriented database such as MongoDb is something we are definitely 
> considering.  How does this factor in?
> 
>> On Sun, Mar 6, 2016 at 12:23 PM, Gourav Sengupta  
>> wrote:
>> Hi,
>> 
>> That depends on a lot of things, but as a starting point I would ask whether 
>> you are planning to store your data in JSON format?
>> 
>> 
>> Regards,
>> Gourav Sengupta
>> 
>>> On Sun, Mar 6, 2016 at 5:17 PM, Laumegui Deaulobi 
>>>  wrote:
>>> Our problem space is survey analytics.  Each survey comprises a set of
>>> questions, with each question having a set of possible answers.  Survey
>>> fill-out tasks are sent to users, who have until a certain date to complete
>>> it.  Based on these survey fill-outs, reports need to be generated.  Each
>>> report deals with a subset of the survey fill-outs, and comprises a set of
>>> data points (average rating for question 1, min/max for question 2, etc.)
>>> 
>>> We are dealing with rather large data sets - although reading the internet
>>> we get the impression that everyone is analyzing petabytes of data...
>>> 
>>> Users: up to 100,000
>>> Surveys: up to 100,000
>>> Questions per survey: up to 100
>>> Possible answers per question: up to 10
>>> Survey fill-outs / user: up to 10
>>> Reports: up to 100,000
>>> Data points per report: up to 100
>>> 
>>> Data is currently stored in a relational database but a migration to a
>>> different kind of store is possible.
>>> 
>>> The naive algorithm for report generation can be summed up as this:
>>> 
>>> for each report to be generated {
>>>   for each report data point to be calculated {
>>> calculate data point
>>> add data point to report
>>>   }
>>>   publish report
>>> }
>>> 
>>> In order to deal with the upper limits of these values, we will need to
>>> distribute this algorithm to a compute / data cluster as much as possible.
>>> 
>>> I've read about frameworks such as Apache Spark but also Hadoop, GridGain,
>>> HazelCast and several others, and am still confused as to how each of these
>>> can help us and how they fit together.
>>> 
>>> Is Spark the right framework for us?
>>> 
>>> 
>>> 
>>> --
>>> View this message in context: 
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-right-for-us-tp26412.html
>