Re: Imported CSV file content isn't identical to the original file

2016-02-07 Thread SLiZn Liu
Plus, I’m using *Spark 1.5.2*, with *spark-csv 1.3.0*. Also tried
HiveContext, but the result is exactly the same.
​

On Sun, Feb 7, 2016 at 3:44 PM SLiZn Liu  wrote:

> Hi Spark Users Group,
>
> I have a csv file to analysis with Spark, but I’m troubling with importing
> as DataFrame.
>
> Here’s the minimal reproducible example. Suppose I’m having a
> *10(rows)x2(cols)* *space-delimited csv* file, shown as below:
>
> 1446566430 2015-11-0400:00:30
> 1446566430 2015-11-0400:00:30
> 1446566430 2015-11-0400:00:30
> 1446566430 2015-11-0400:00:30
> 1446566430 2015-11-0400:00:30
> 1446566431 2015-11-0400:00:31
> 1446566431 2015-11-0400:00:31
> 1446566431 2015-11-0400:00:31
> 1446566431 2015-11-0400:00:31
> 1446566431 2015-11-0400:00:31
>
> the  in column 2 represents sub-delimiter within that column, and
> this file is stored on HDFS, let’s say the path is hdfs:///tmp/1.csv
>
> I’m using *spark-csv* to import this file as Spark *DataFrame*:
>
> sqlContext.read.format("com.databricks.spark.csv")
> .option("header", "false") // Use first line of all files as header
> .option("inferSchema", "false") // Automatically infer data types
> .option("delimiter", " ")
> .load("hdfs:///tmp/1.csv")
> .show
>
> Oddly, the output shows only a part of each column:
>
> [image: Screenshot from 2016-02-07 15-27-51.png]
>
> and even the boundary of the table wasn’t shown correctly. I also used the
> other way to read csv file, by sc.textFile(...).map(_.split(" ")) and
> sqlContext.createDataFrame, and the result is the same. Can someone point
> me out where I did it wrong?
>
> —
> BR,
> Todd Leo
> ​
>


Re: Imported CSV file content isn't identical to the original file

2016-02-07 Thread Igor Berman
show has a truncate argument;
pass false so it won't truncate your results
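
A minimal sketch of this suggestion, assuming Spark 1.5+ where show() accepts a truncate flag, and with df standing in for the DataFrame loaded via spark-csv in the quoted message below:

df.show(false)      // default 20 rows, cells printed in full
df.show(50, false)  // first 50 rows, no truncation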

On 7 February 2016 at 11:01, SLiZn Liu  wrote:

> Plus, I’m using *Spark 1.5.2*, with *spark-csv 1.3.0*. Also tried
> HiveContext, but the result is exactly the same.
> ​
>
> On Sun, Feb 7, 2016 at 3:44 PM SLiZn Liu  wrote:
>
>> Hi Spark Users Group,
>>
>> I have a csv file to analysis with Spark, but I’m troubling with
>> importing as DataFrame.
>>
>> Here’s the minimal reproducible example. Suppose I’m having a
>> *10(rows)x2(cols)* *space-delimited csv* file, shown as below:
>>
>> 1446566430 2015-11-0400:00:30
>> 1446566430 2015-11-0400:00:30
>> 1446566430 2015-11-0400:00:30
>> 1446566430 2015-11-0400:00:30
>> 1446566430 2015-11-0400:00:30
>> 1446566431 2015-11-0400:00:31
>> 1446566431 2015-11-0400:00:31
>> 1446566431 2015-11-0400:00:31
>> 1446566431 2015-11-0400:00:31
>> 1446566431 2015-11-0400:00:31
>>
>> the  in column 2 represents sub-delimiter within that column, and
>> this file is stored on HDFS, let’s say the path is hdfs:///tmp/1.csv
>>
>> I’m using *spark-csv* to import this file as Spark *DataFrame*:
>>
>> sqlContext.read.format("com.databricks.spark.csv")
>> .option("header", "false") // Use first line of all files as header
>> .option("inferSchema", "false") // Automatically infer data types
>> .option("delimiter", " ")
>> .load("hdfs:///tmp/1.csv")
>> .show
>>
>> Oddly, the output shows only a part of each column:
>>
>> [image: Screenshot from 2016-02-07 15-27-51.png]
>>
>> and even the boundary of the table wasn’t shown correctly. I also used
>> the other way to read csv file, by sc.textFile(...).map(_.split(" "))
>> and sqlContext.createDataFrame, and the result is the same. Can someone
>> point me out where I did it wrong?
>>
>> —
>> BR,
>> Todd Leo
>> ​
>>
>


Re: Imported CSV file content isn't identical to the original file

2016-02-07 Thread SLiZn Liu
Hi Igor,

In my case, it’s not a matter of *truncate*. As the Spark API doc for the
show() function reads,

truncate: Whether truncate long strings. If true, strings more than 20
characters will be truncated and all cells will be aligned right…

whereas it’s the leading characters of my two columns that are missing.

Still, good to know how to show the whole content of a cell.
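
For reference, one way to check whether the leading characters are really absent from the DataFrame or only from show()'s rendering is to print the raw cell values; a sketch, assuming the same spark-csv load and HDFS path as in the original post:

// Bypass show()'s table rendering and print raw cell values.
val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .option("delimiter", " ")
  .load("hdfs:///tmp/1.csv")
df.take(5).foreach(row => println(row.mkString(" | ")))

// Compare against the raw file, before any CSV parsing.
sc.textFile("hdfs:///tmp/1.csv").take(5).foreach(println)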

—
BR,
Todd Leo
​




On Sun, Feb 7, 2016 at 5:42 PM Igor Berman  wrote:

> show has argument of truncate
> pass false so it wont truncate your results
>
> On 7 February 2016 at 11:01, SLiZn Liu  wrote:
>
>> Plus, I’m using *Spark 1.5.2*, with *spark-csv 1.3.0*. Also tried
>> HiveContext, but the result is exactly the same.
>> ​
>>
>> On Sun, Feb 7, 2016 at 3:44 PM SLiZn Liu  wrote:
>>
>>> Hi Spark Users Group,
>>>
>>> I have a csv file to analysis with Spark, but I’m troubling with
>>> importing as DataFrame.
>>>
>>> Here’s the minimal reproducible example. Suppose I’m having a
>>> *10(rows)x2(cols)* *space-delimited csv* file, shown as below:
>>>
>>> 1446566430 2015-11-0400:00:30
>>> 1446566430 2015-11-0400:00:30
>>> 1446566430 2015-11-0400:00:30
>>> 1446566430 2015-11-0400:00:30
>>> 1446566430 2015-11-0400:00:30
>>> 1446566431 2015-11-0400:00:31
>>> 1446566431 2015-11-0400:00:31
>>> 1446566431 2015-11-0400:00:31
>>> 1446566431 2015-11-0400:00:31
>>> 1446566431 2015-11-0400:00:31
>>>
>>> the  in column 2 represents sub-delimiter within that column, and
>>> this file is stored on HDFS, let’s say the path is hdfs:///tmp/1.csv
>>>
>>> I’m using *spark-csv* to import this file as Spark *DataFrame*:
>>>
>>> sqlContext.read.format("com.databricks.spark.csv")
>>> .option("header", "false") // Use first line of all files as header
>>> .option("inferSchema", "false") // Automatically infer data types
>>> .option("delimiter", " ")
>>> .load("hdfs:///tmp/1.csv")
>>> .show
>>>
>>> Oddly, the output shows only a part of each column:
>>>
>>> [image: Screenshot from 2016-02-07 15-27-51.png]
>>>
>>> and even the boundary of the table wasn’t shown correctly. I also used
>>> the other way to read csv file, by sc.textFile(...).map(_.split(" "))
>>> and sqlContext.createDataFrame, and the result is the same. Can someone
>>> point me out where I did it wrong?
>>>
>>> —
>>> BR,
>>> Todd Leo
>>> ​
>>>
>>
>


Re: Shuffle memory woes

2016-02-07 Thread Igor Berman
So can you provide code snippets? It's especially interesting to see your
transformation chain and how many partitions there are on each side of the
shuffle operation.

The question is why it can't fit stuff in memory when you are shuffling -
maybe your partitioner on the "reduce" side is not configured properly? I
mean, if the map side is ok and you are just reducing by key or something,
it should be ok, so some detail is missing... skewed data? aggregate by key?
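
A sketch of the kind of information being asked for here: partition counts on each side of the shuffle, an explicit reduce-side partitioner, and a rough skew check. The data and numbers below are synthetic, not taken from the thread:

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(1 to 1000000).map(i => (s"key-${i % 1000}", 1L))
println(s"map-side partitions: ${pairs.partitions.length}")

// Give the reduce side an explicit partition count instead of inheriting a default.
val reduced = pairs.reduceByKey(new HashPartitioner(200), _ + _)
println(s"reduce-side partitions: ${reduced.partitions.length}")

// Rough skew check: how many records land in each reduce partition.
reduced.mapPartitions(it => Iterator(it.size), preservesPartitioning = true)
  .collect().foreach(println)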

On 6 February 2016 at 20:13, Corey Nolet  wrote:

> Igor,
>
> Thank you for the response but unfortunately, the problem I'm referring to
> goes beyond this. I have set the shuffle memory fraction to be 90% and set
> the cache memory to be 0. Repartitioning the RDD helped a tad on the map
> side but didn't do much for the spilling when there was no longer any
> memory left for the shuffle. Also the new auto-memory management doesn't
> seem like it'll have too much of an effect after i've already given most
> the memory i've allocated to the shuffle. The problem I'm having is most
> specifically related to the shuffle performing declining by several orders
> of magnitude when it needs to spill multiple times (it ends up spilling
> several hundred for me when it can't fit stuff into memory).
>
>
>
> On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman  wrote:
>
>> Hi,
>> usually you can solve this by 2 steps
>> make rdd to have more partitions
>> play with shuffle memory fraction
>>
>> in spark 1.6 cache vs shuffle memory fractions are adjusted automatically
>>
>> On 5 February 2016 at 23:07, Corey Nolet  wrote:
>>
>>> I just recently had a discovery that my jobs were taking several hours
>>> to completely because of excess shuffle spills. What I found was that when
>>> I hit the high point where I didn't have enough memory for the shuffles to
>>> store all of their file consolidations at once, it could spill so many
>>> times that it causes my job's runtime to increase by orders of magnitude
>>> (and sometimes fail altogether).
>>>
>>> I've played with all the tuning parameters I can find. To speed the
>>> shuffles up, I tuned the akka threads to different values. I also tuned the
>>> shuffle buffering a tad (both up and down).
>>>
>>> I feel like I see a weak point here. The mappers are sharing memory
>>> space with reducers and the shuffles need enough memory to consolidate and
>>> pull otherwise they will need to spill and spill and spill. What i've
>>> noticed about my jobs is that this is a difference between them taking 30
>>> minutes and 4 hours or more. Same job- just different memory tuning.
>>>
>>> I've found that, as a result of the spilling, I'm better off not caching
>>> any data in memory and lowering my storage fraction to 0 and still hoping I
>>> was able to give my shuffles enough memory that my data doesn't
>>> continuously spill. Is this the way it's supposed to be? It makes it hard
>>> because it seems like it forces the memory limits on my job- otherwise it
>>> could take orders of magnitude longer to execute.
>>>
>>>
>>
>


Re: Imported CSV file content isn't identical to the original file

2016-02-07 Thread SLiZn Liu
*Update*: in local mode (spark-shell --master local[2], whether reading from
the local file system or HDFS), it works well. But that doesn’t solve this
issue, since my data scale requires hundreds of CPU cores and hundreds of GB of RAM.

BTW, it’s Chinese Lunar New Year now; I wish you all a happy year and great
fortune in the Year of the Monkey!
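
For completeness, a sketch of the comparison being described; the master URLs and the package coordinate are illustrative and should match the actual deployment:

// local:   spark-shell --master local[2]     --packages com.databricks:spark-csv_2.10:1.3.0
// cluster: spark-shell --master yarn-client  --packages com.databricks:spark-csv_2.10:1.3.0
// In both shells the same read should behave identically:
val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .option("delimiter", " ")
  .load("hdfs:///tmp/1.csv")
println(df.first())   // compare the first row character by character across the two modes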

—
BR,
Todd Leo
​

On Sun, Feb 7, 2016 at 6:09 PM SLiZn Liu  wrote:

> Hi Igor,
>
> In my case, it’s not a matter of *truncate*. As the show() function in
> Spark API doc reads,
>
> truncate: Whether truncate long strings. If true, strings more than 20
> characters will be truncated and all cells will be aligned right…
>
> whereas the leading characters of my two columns are missing.
>
> Good to know the way to show the whole content in a cell.
>
> —
> BR,
> Todd Leo
> ​
>
>
>
>
> On Sun, Feb 7, 2016 at 5:42 PM Igor Berman  wrote:
>
>> show has argument of truncate
>> pass false so it wont truncate your results
>>
>> On 7 February 2016 at 11:01, SLiZn Liu  wrote:
>>
>>> Plus, I’m using *Spark 1.5.2*, with *spark-csv 1.3.0*. Also tried
>>> HiveContext, but the result is exactly the same.
>>> ​
>>>
>>> On Sun, Feb 7, 2016 at 3:44 PM SLiZn Liu  wrote:
>>>
 Hi Spark Users Group,

 I have a csv file to analysis with Spark, but I’m troubling with
 importing as DataFrame.

 Here’s the minimal reproducible example. Suppose I’m having a
 *10(rows)x2(cols)* *space-delimited csv* file, shown as below:

 1446566430 2015-11-0400:00:30
 1446566430 2015-11-0400:00:30
 1446566430 2015-11-0400:00:30
 1446566430 2015-11-0400:00:30
 1446566430 2015-11-0400:00:30
 1446566431 2015-11-0400:00:31
 1446566431 2015-11-0400:00:31
 1446566431 2015-11-0400:00:31
 1446566431 2015-11-0400:00:31
 1446566431 2015-11-0400:00:31

 the  in column 2 represents sub-delimiter within that column, and
 this file is stored on HDFS, let’s say the path is hdfs:///tmp/1.csv

 I’m using *spark-csv* to import this file as Spark *DataFrame*:

 sqlContext.read.format("com.databricks.spark.csv")
 .option("header", "false") // Use first line of all files as header
 .option("inferSchema", "false") // Automatically infer data types
 .option("delimiter", " ")
 .load("hdfs:///tmp/1.csv")
 .show

 Oddly, the output shows only a part of each column:

 [image: Screenshot from 2016-02-07 15-27-51.png]

 and even the boundary of the table wasn’t shown correctly. I also used
 the other way to read csv file, by sc.textFile(...).map(_.split(" "))
 and sqlContext.createDataFrame, and the result is the same. Can
 someone point me out where I did it wrong?

 —
 BR,
 Todd Leo
 ​

>>>
>>


Re: Shuffle memory woes

2016-02-07 Thread Corey Nolet
As for the second part of your questions- we have a fairly complex join
process which requires a ton of stage orchestration from our driver. I've
written some code to be able to walk down our DAG tree and execute siblings
in the tree concurrently where possible (forcing cache to disk on children
that have multiple children themselves so that they can be run
concurrently). Ultimately, we have seen significant speedup in our jobs by
keeping tasks as busy as possible processing concurrent stages. Funny
enough though, the stage that is causing problems with shuffling for us has
a lot of children and doesn't even run concurrently with any other stages,
so I ruled out the concurrency of the stages as a culprit for the
shuffling problem we're seeing.
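
Corey's orchestration code isn't shown, but the general pattern he describes (persist a shared child to disk, then launch sibling actions concurrently from the driver) can be sketched with plain Scala Futures; the dataset and operations below are invented for illustration:

import org.apache.spark.storage.StorageLevel
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Persist the shared parent to disk so both sibling jobs reuse it.
val parent = sc.parallelize(1 to 1000000).map(i => (i % 100, i.toLong))
parent.persist(StorageLevel.DISK_ONLY)

// Launch the sibling actions concurrently so their stages can overlap.
val left  = Future { parent.reduceByKey(_ + _).count() }
val right = Future { parent.mapValues(_ * 2).reduceByKey(_ max _).count() }
println(Await.result(Future.sequence(Seq(left, right)), Duration.Inf))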

On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet  wrote:

> Igor,
>
> I don't think the question is "why can't it fit stuff in memory". I know
> why it can't fit stuff in memory- because it's a large dataset that needs
> to have a reduceByKey() run on it. My understanding is that when it doesn't
> fit into memory it needs to spill in order to consolidate intermediary
> files into a single file. The more data you need to run through this, the
> more it will need to spill. My findings is that once it gets stuck in this
> spill chain with our dataset it's all over @ that point because it will
> spill and spill and spill and spill and spill. If I give the shuffle enough
> memory it won't- irrespective of the number of partitions we have (i've
> done everything from repartition(500) to repartition(2500)). It's not a
> matter of running out of memory on a single node because the data is
> skewed. It's more a matter of the shuffle buffer filling up and needing to
> spill. I think what may be happening is that it gets to a point where it's
> spending more time reading/writing from disk while doing the spills then it
> is actually processing any data. I can tell this because I can see that the
> spills sometimes get up into the 10's to 100's of TB where the input data
> was maybe 100gb at most. Unfortunately my code is on a private internal
> network and I'm not able to share it.
>
> On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman  wrote:
>
>> so can you provide code snippets: especially it's interesting to see what
>> are your transformation chain, how many partitions are there on each side
>> of shuffle operation
>>
>> the question is why it can't fit stuff in memory when you are shuffling -
>> maybe your partitioner on "reduce" side is not configured properly? I mean
>> if map side is ok, and you just reducing by key or something it should be
>> ok, so some detail is missing...skewed data? aggregate by key?
>>
>> On 6 February 2016 at 20:13, Corey Nolet  wrote:
>>
>>> Igor,
>>>
>>> Thank you for the response but unfortunately, the problem I'm referring
>>> to goes beyond this. I have set the shuffle memory fraction to be 90% and
>>> set the cache memory to be 0. Repartitioning the RDD helped a tad on the
>>> map side but didn't do much for the spilling when there was no longer any
>>> memory left for the shuffle. Also the new auto-memory management doesn't
>>> seem like it'll have too much of an effect after i've already given most
>>> the memory i've allocated to the shuffle. The problem I'm having is most
>>> specifically related to the shuffle performing declining by several orders
>>> of magnitude when it needs to spill multiple times (it ends up spilling
>>> several hundred for me when it can't fit stuff into memory).
>>>
>>>
>>>
>>> On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman 
>>> wrote:
>>>
 Hi,
 usually you can solve this by 2 steps
 make rdd to have more partitions
 play with shuffle memory fraction

 in spark 1.6 cache vs shuffle memory fractions are adjusted
 automatically

 On 5 February 2016 at 23:07, Corey Nolet  wrote:

> I just recently had a discovery that my jobs were taking several hours
> to completely because of excess shuffle spills. What I found was that when
> I hit the high point where I didn't have enough memory for the shuffles to
> store all of their file consolidations at once, it could spill so many
> times that it causes my job's runtime to increase by orders of magnitude
> (and sometimes fail altogether).
>
> I've played with all the tuning parameters I can find. To speed the
> shuffles up, I tuned the akka threads to different values. I also tuned 
> the
> shuffle buffering a tad (both up and down).
>
> I feel like I see a weak point here. The mappers are sharing memory
> space with reducers and the shuffles need enough memory to consolidate and
> pull otherwise they will need to spill and spill and spill. What i've
> noticed about my jobs is that this is a difference between them taking 30
> minutes and 4 hours or more. Same job- just 

Re: Shuffle memory woes

2016-02-07 Thread Corey Nolet
Igor,

I don't think the question is "why can't it fit stuff in memory". I know
why it can't fit stuff in memory- because it's a large dataset that needs
to have a reduceByKey() run on it. My understanding is that when it doesn't
fit into memory it needs to spill in order to consolidate intermediary
files into a single file. The more data you need to run through this, the
more it will need to spill. My finding is that once it gets stuck in this
spill chain with our dataset it's all over @ that point because it will
spill and spill and spill and spill and spill. If I give the shuffle enough
memory it won't- irrespective of the number of partitions we have (i've
done everything from repartition(500) to repartition(2500)). It's not a
matter of running out of memory on a single node because the data is
skewed. It's more a matter of the shuffle buffer filling up and needing to
spill. I think what may be happening is that it gets to a point where it's
spending more time reading/writing from disk while doing the spills than it
is actually processing any data. I can tell this because I can see that the
spills sometimes get up into the 10's to 100's of TB where the input data
was maybe 100gb at most. Unfortunately my code is on a private internal
network and I'm not able to share it.
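
The knobs mentioned above (shuffle fraction, storage fraction, partition count) can be expressed as a configuration sketch for the pre-1.6 static memory model; the values are purely illustrative, not recommendations:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.memoryFraction", "0.6")  // more room for shuffle buffers
  .set("spark.storage.memoryFraction", "0.1")  // shrink the cache region
  .set("spark.shuffle.compress", "true")
  .set("spark.shuffle.spill.compress", "true")

// And give the wide operation an explicit, higher partition count, e.g.:
// bigPairs.reduceByKey(_ + _, 2500)   // bigPairs is a placeholder pair RDD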

On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman  wrote:

> so can you provide code snippets: especially it's interesting to see what
> are your transformation chain, how many partitions are there on each side
> of shuffle operation
>
> the question is why it can't fit stuff in memory when you are shuffling -
> maybe your partitioner on "reduce" side is not configured properly? I mean
> if map side is ok, and you just reducing by key or something it should be
> ok, so some detail is missing...skewed data? aggregate by key?
>
> On 6 February 2016 at 20:13, Corey Nolet  wrote:
>
>> Igor,
>>
>> Thank you for the response but unfortunately, the problem I'm referring
>> to goes beyond this. I have set the shuffle memory fraction to be 90% and
>> set the cache memory to be 0. Repartitioning the RDD helped a tad on the
>> map side but didn't do much for the spilling when there was no longer any
>> memory left for the shuffle. Also the new auto-memory management doesn't
>> seem like it'll have too much of an effect after i've already given most
>> the memory i've allocated to the shuffle. The problem I'm having is most
>> specifically related to the shuffle performing declining by several orders
>> of magnitude when it needs to spill multiple times (it ends up spilling
>> several hundred for me when it can't fit stuff into memory).
>>
>>
>>
>> On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman 
>> wrote:
>>
>>> Hi,
>>> usually you can solve this by 2 steps
>>> make rdd to have more partitions
>>> play with shuffle memory fraction
>>>
>>> in spark 1.6 cache vs shuffle memory fractions are adjusted automatically
>>>
>>> On 5 February 2016 at 23:07, Corey Nolet  wrote:
>>>
 I just recently had a discovery that my jobs were taking several hours
 to completely because of excess shuffle spills. What I found was that when
 I hit the high point where I didn't have enough memory for the shuffles to
 store all of their file consolidations at once, it could spill so many
 times that it causes my job's runtime to increase by orders of magnitude
 (and sometimes fail altogether).

 I've played with all the tuning parameters I can find. To speed the
 shuffles up, I tuned the akka threads to different values. I also tuned the
 shuffle buffering a tad (both up and down).

 I feel like I see a weak point here. The mappers are sharing memory
 space with reducers and the shuffles need enough memory to consolidate and
 pull otherwise they will need to spill and spill and spill. What i've
 noticed about my jobs is that this is a difference between them taking 30
 minutes and 4 hours or more. Same job- just different memory tuning.

 I've found that, as a result of the spilling, I'm better off not
 caching any data in memory and lowering my storage fraction to 0 and still
 hoping I was able to give my shuffles enough memory that my data doesn't
 continuously spill. Is this the way it's supposed to be? It makes it hard
 because it seems like it forces the memory limits on my job- otherwise it
 could take orders of magnitude longer to execute.


>>>
>>
>


Re: Apache Spark data locality when integrating with Kafka

2016-02-07 Thread Yuval.Itzchakov
I would definitely try to avoid hosting Kafka and Spark on the same servers.

Kafka and Spark will be doing a lot of I/O between them, so you'll want to
maximize those resources and not share them on the same server. You'll
want each Kafka broker to be on a dedicated server, as well as your Spark
master and workers. If you're hosting them on Amazon EC2 instances, then
you'll want these to be in the same availability zone, so you can benefit
from low latency within that zone. If you're on dedicated servers,
perhaps you'll want to create a VPC between the two clusters so you can,
again, benefit from low I/O latency and high throughput.
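
For context, a sketch of the direct (receiver-less) Kafka stream most Spark 1.5/1.6 deployments use; executors fetch each batch from the brokers over the network, which is why broker-to-executor bandwidth and availability-zone placement matter. It assumes the spark-streaming-kafka artifact is on the classpath; broker addresses and the topic name are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("events")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

stream.map(_._2).count().print()   // messages per batch
ssc.start()
ssc.awaitTermination()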



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-data-locality-when-integrating-with-Kafka-tp26165p26170.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Unexpected element type class

2016-02-07 Thread Anoop Shiralige
Hi All,

I have written some functions in Scala which I want to expose in PySpark
(interactively, Spark 1.6.0).
The Scala function (loadAvro) returns a JavaRDD[AvroGenericRecord].
AvroGenericRecord is my wrapper class over
org.apache.avro.generic.GenericRecord. I am trying to convert this JavaRDD
into a PySpark RDD and from there on try to use the APIs available. But I
get the "Unexpected element type class" exception. What can I do to resolve
this?

Thanks,
AnoopShiralige

*Below is the code :*


avroData = sc._jvm.dummy.commons.avro.AvroParser.loadAvro("", sc._jsc)
from pyspark.rdd import RDD
pythonRDD=RDD(avroData,sc)
pythonRDD.collect()


*loadAvro definition* : 
def loadAvro(path: String, sc: org.apache.spark.api.java.JavaSparkContext):
JavaRDD[(AvroGenericRecord)]
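
PySpark's RDD bridge can only serialize a handful of JVM element types (essentially byte arrays and strings), which is why a JavaRDD of a custom wrapper class fails with "Unexpected element type". One workaround, sketched below on the Scala side, is to flatten the records to strings (for example JSON) before exposing them to Python; toJsonString is a hypothetical helper on the wrapper class, not an existing API:

import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}

// Expose strings, which the Python side can deserialize, instead of wrapper objects.
def loadAvroAsJson(path: String, jsc: JavaSparkContext): JavaRDD[String] = {
  val records: JavaRDD[AvroGenericRecord] = loadAvro(path, jsc)  // the existing loader
  records.rdd.map(_.toJsonString).toJavaRDD()
}

On the Python side the resulting JavaRDD of strings would then be wrapped with a UTF-8 deserializer (pyspark.serializers.UTF8Deserializer) rather than the default pickle serializer; that detail is an assumption and worth verifying.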

*Error I am getting : *

16/02/07 04:06:45 INFO mapreduce.AvroKeyInputFormat: Using a reader schema
equal to the writer schema.
16/02/07 04:06:45 ERROR python.PythonRunner: Python worker exited
unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
  File
"/disk2/spark6/spark-1.6.0-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/worker.py",
line 98, in main
command = pickleSer._read_with_length(infile)
  File
"/disk2/spark6/spark-1.6.0-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py",
line 156, in _read_with_length
length = read_int(stream)
  File
"/disk2/spark6/spark-1.6.0-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py",
line 545, in read_int
raise EOFError
EOFError

at
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at
org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:207)
at
org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.spark.SparkException: Unexpected element type class
dummy.commons.avro.AvroGenericRecord
at
org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:449)
at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
at
org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
at
org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
16/02/07 04:06:45 ERROR python.PythonRunner: This may have been caused by a
prior exception:
org.apache.spark.SparkException: Unexpected element type class
dummy.commons.avro.AvroGenericRecord
at
org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:449)
at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
at
org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
at
org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
16/02/07 04:06:45 ERROR executor.Executor: Exception in task 0.0 in stage
1.0 (TID 1)
org.apache.spark.SparkException: Unexpected element type class
dummy.commons.avro.AvroGenericRecord
at
org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:449)
at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
   

Re: Apache Spark data locality when integrating with Kafka

2016-02-07 Thread Diwakar Dhanuskodi
Fanoos,
Where do you want the solution to be deployed? On premise or in the cloud?

Regards,
Diwakar



Sent from Samsung Mobile.

 Original message From: "Yuval.Itzchakov" 
 Date:07/02/2016  19:38  (GMT+05:30) 
To: user@spark.apache.org Cc:  Subject: Re: 
Apache Spark data locality when integrating with Kafka 
I would definitely try to avoid hosting Kafka and Spark on the same 
servers. 

Kafka and Spark will be doing alot of IO between them, so you'll want to
maximize on those resources and not share them on the same server. You'll
want each Kafka broker to be on a dedicated server, as well as your spark
master and workers. If you're hosting them on Amazon EC2 instances, then
you'll want these to be on the same availability zone, so you can benefit
from low latency in that same zone. If you're on a dedicated servers,
perhaps you'll want to create a VPC between the two clusters so you can,
again, benefit from low IO latency and high throughput.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-data-locality-when-integrating-with-Kafka-tp26165p26170.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle memory woes

2016-02-07 Thread Sea
Hi, Corey:
   "The dataset is 100gb at most, the spills can up to 10T-100T", Are your
input files lzo format, and you use sc.text() ? If memory is not enough, spark
will spill 3-4x of input data to disk.




-- Original Message --
From: "Corey Nolet";
Date: Sunday, February 7, 2016, 8:56 PM
To: "Igor Berman";
Cc: "user";
Subject: Re: Shuffle memory woes



As for the second part of your questions- we have a fairly complex join process 
which requires a ton of stage orchestration from our driver. I've written some 
code to be able to walk down our DAG tree and execute siblings in the tree 
concurrently where possible (forcing cache to disk on children that that have 
multiple chiildren themselves so that they can be run concurrently). Ultimatey, 
we have seen significant speedup in our jobs by keeping tasks as busy as 
possible processing concurrent stages. Funny enough though, the stage that is 
causing problems with shuffling for us has a lot of children and doesn't even 
run concurrently with any other stages so I ruled out the concurrency of the 
stages as a culprit for the shuffliing problem we're seeing.

On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet  wrote:
Igor,

I don't think the question is "why can't it fit stuff in memory". I know why it 
can't fit stuff in memory- because it's a large dataset that needs to have a 
reduceByKey() run on it. My understanding is that when it doesn't fit into 
memory it needs to spill in order to consolidate intermediary files into a 
single file. The more data you need to run through this, the more it will need 
to spill. My findings is that once it gets stuck in this spill chain with our 
dataset it's all over @ that point because it will spill and spill and spill 
and spill and spill. If I give the shuffle enough memory it won't- irrespective 
of the number of partitions we have (i've done everything from repartition(500) 
to repartition(2500)). It's not a matter of running out of memory on a single 
node because the data is skewed. It's more a matter of the shuffle buffer 
filling up and needing to spill. I think what may be happening is that it gets 
to a point where it's spending more time reading/writing from disk while doing 
the spills then it is actually processing any data. I can tell this because I 
can see that the spills sometimes get up into the 10's to 100's of TB where the 
input data was maybe 100gb at most. Unfortunately my code is 
on a private internal network and I'm not able to share it. 


On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman  wrote:
so can you provide code snippets: especially it's interesting to see what are 
your transformation chain, how many partitions are there on each side of 
shuffle operation

the question is why it can't fit stuff in memory when you are shuffling - maybe 
your partitioner on "reduce" side is not configured properly? I mean if map 
side is ok, and you just reducing by key or something it should be ok, so some 
detail is missing...skewed data? aggregate by key?


On 6 February 2016 at 20:13, Corey Nolet  wrote:
Igor,

Thank you for the response but unfortunately, the problem I'm referring to goes 
beyond this. I have set the shuffle memory fraction to be 90% and set the cache 
memory to be 0. Repartitioning the RDD helped a tad on the map side but didn't 
do much for the spilling when there was no longer any memory left for the 
shuffle. Also the new auto-memory management doesn't seem like it'll have too 
much of an effect after i've already given most the memory i've allocated to 
the shuffle. The problem I'm having is most specifically related to the shuffle 
performing declining by several orders of magnitude when it needs to spill 
multiple times (it ends up spilling several hundred for me when it can't fit 
stuff into memory).






On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman  wrote:
Hi,usually you can solve this by 2 steps
make rdd to have more partitions
play with shuffle memory fraction


in spark 1.6 cache vs shuffle memory fractions are adjusted automatically


On 5 February 2016 at 23:07, Corey Nolet  wrote:
I just recently had a discovery that my jobs were taking several hours to 
completely because of excess shuffle spills. What I found was that when I hit 
the high point where I didn't have enough memory for the shuffles to store all 
of their file consolidations at once, it could spill so many times that it 
causes my job's runtime to increase by orders of magnitude (and sometimes fail 
altogether).


I've played with all the tuning parameters I can find. To speed the shuffles 
up, I tuned the akka threads to different values. I also tuned the shuffle 
buffering a tad (both up and down). 


I feel like I see a weak point here. The mappers are sharing memory space with 
reducers 

Re: Apache Spark data locality when integrating with Kafka

2016-02-07 Thread أنس الليثي
Diwakar

We have our own servers. We will not use any cloud service like Amazon's

On 7 February 2016 at 18:24, Diwakar Dhanuskodi <
diwakar.dhanusk...@gmail.com> wrote:

> Fanoos,
> Where  you  want the solution to  be deployed ?. On premise or cloud?
>
> Regards
> Diwakar .
>
>
>
> Sent from Samsung Mobile.
>
>
>  Original message 
> From: "Yuval.Itzchakov" 
> Date:07/02/2016 19:38 (GMT+05:30)
> To: user@spark.apache.org
> Cc:
> Subject: Re: Apache Spark data locality when integrating with Kafka
>
> I would definitely try to avoid hosting Kafka and Spark on the same
> servers.
>
> Kafka and Spark will be doing alot of IO between them, so you'll want to
> maximize on those resources and not share them on the same server. You'll
> want each Kafka broker to be on a dedicated server, as well as your spark
> master and workers. If you're hosting them on Amazon EC2 instances, then
> you'll want these to be on the same availability zone, so you can benefit
> from low latency in that same zone. If you're on a dedicated servers,
> perhaps you'll want to create a VPC between the two clusters so you can,
> again, benefit from low IO latency and high throughput.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-data-locality-when-integrating-with-Kafka-tp26165p26170.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Anas Rabei
Senior Software Developer
Mubasher.info
anas.ra...@mubasher.info


Advise on using spark shell for Hive table sql queries

2016-02-07 Thread Mich Talebzadeh
Hi,

 

 

Pretty new to spark-shell.

I decided to write this piece of code to query Hive tables from spark-shell.
One thing I noticed is that I don't really need to define the sqlContext
here, as in spark-shell I can run a command like sql("select count(1) from t")
directly, WITHOUT going through sqlContext.sql("select count(1) from t").

Also, I found out that I can write multi-line SQL using triple quotes,
i.e. """ code... """.

Are there other ways of writing multi-line SQL code?

The code works fine; however, any improvements will be welcome.

 

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

var sqltext : String = ""
sqltext = "use oraclehadoop"
sqlContext.sql(sqltext)

sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS StartTime").show()

println("\n Running the query \n")

sqltext = """
SELECT c.country_name AS nation,
       s.year,
       s.month,
       sum(amount_sold) AS salesamount
FROM countries c
INNER JOIN customers cs
  ON c.country_id = cs.country_id
INNER JOIN sales s
  ON cs.cust_id = s.cust_id
GROUP BY country_name, s.year, s.month
ORDER BY country_name, s.year, s.month
"""

// sqlContext.sql(sqltext).show()
sqlContext.sql(sqltext).collect.foreach(println)

sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS EndTime").show()

sys.exit()
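
To answer the multi-line question directly: besides bare triple quotes, stripMargin keeps the indentation tidy, qualifying the tables avoids the separate "use oraclehadoop" statement, and show() prints a sample without collecting the whole result to the driver. A sketch of the same query with those tweaks (table and column names as in the original; show(numRows, truncate) assumes Spark 1.5 or later):

val sqltext =
  """|SELECT c.country_name AS nation,
     |       s.year,
     |       s.month,
     |       SUM(amount_sold) AS salesamount
     |FROM oraclehadoop.countries c
     |INNER JOIN oraclehadoop.customers cs ON c.country_id = cs.country_id
     |INNER JOIN oraclehadoop.sales s      ON cs.cust_id   = s.cust_id
     |GROUP BY c.country_name, s.year, s.month
     |ORDER BY c.country_name, s.year, s.month""".stripMargin

sqlContext.sql(sqltext).show(50, false)   // avoids collect().foreach(println)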

 

 

Regards,

 

Mich

 

  http://talebzadehmich.wordpress.com

 

NOTE: The information in this email is proprietary and confidential. This
message is for the designated recipient only, if you are not the intended
recipient, you should destroy it immediately. Any information in this
message shall not be understood as given or endorsed by Peridale Technology
Ltd, its subsidiaries or their employees, unless expressly so stated. It is
the responsibility of the recipient to ensure that this email is virus free,
therefore neither Peridale Technology Ltd, its subsidiaries nor their
employees accept any responsibility.

 



Re: spark metrics question

2016-02-07 Thread Matt K
Thanks Takeshi, that's exactly what I was looking for.

On Fri, Feb 5, 2016 at 12:32 PM, Takeshi Yamamuro 
wrote:

> How about using `spark.jars` to send jars into a cluster?
>
> On Sat, Feb 6, 2016 at 12:00 AM, Matt K  wrote:
>
>> Yes. And what I'm trying to figure out if there's a way to package the
>> jar in such a way that I don't have to install it on every Executor node.
>>
>>
>> On Wed, Feb 3, 2016 at 7:46 PM, Yiannis Gkoufas 
>> wrote:
>>
>>> Hi Matt,
>>>
>>> does the custom class you want to package reports metrics of each
>>> Executor?
>>>
>>> Thanks
>>>
>>> On 3 February 2016 at 15:56, Matt K  wrote:
>>>
 Thanks for sharing Yiannis, looks very promising!

 Do you know if I can package a custom class with my application, or
 does it have to be pre-deployed on all Executor nodes?

 On Wed, Feb 3, 2016 at 10:36 AM, Yiannis Gkoufas 
 wrote:

> Hi Matt,
>
> there is some related work I recently did in IBM Research for
> visualizing the metrics produced.
> You can read about it here
> http://www.spark.tc/sparkoscope-enabling-spark-optimization-through-cross-stack-monitoring-and-visualization-2/
> We recently opensourced it if you are interested to have a deeper look
> to it: https://github.com/ibm-research-ireland/sparkoscope
>
> Thanks,
> Yiannis
>
> On 3 February 2016 at 13:32, Matt K  wrote:
>
>> Hi guys,
>>
>> I'm looking to create a custom sync based on Spark's Metrics System:
>>
>> https://github.com/apache/spark/blob/9f603fce78fcc997926e9a72dec44d48cbc396fc/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
>>
>> If I want to collect metrics from the Driver, Master, and Executor
>> nodes, should the jar with the custom class be installed on Driver, 
>> Master,
>> and Executor nodes?
>>
>> Also, on Executor nodes, does the MetricsSystem run inside the
>> Executor's JVM?
>>
>> Thanks,
>> -Matt
>>
>
>


 --
 www.calcmachine.com - easy online calculator.

>>>
>>>
>>
>>
>> --
>> www.calcmachine.com - easy online calculator.
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>



-- 
www.calcmachine.com - easy online calculator.


Re: Bad Digest error while doing aws s3 put

2016-02-07 Thread Steve Loughran

> On 7 Feb 2016, at 07:57, Dhimant  wrote:
> 
>at
> com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.uploadSinglePart(MultipartUploadOutputStream.java:245)
>... 15 more
> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The
> Content-MD5 you specified did not match what we received. (Service: Amazon
> S3; Status Code: 400; Error Code: BadDigest; Request ID: 5918216A5901FCC8),
> S3 Extended Request ID:
> QSxtYln/yXqHYpdr4BWosin/TAFsGlK1FlKfE5PcuJkNrgoblGzTNt74kEhuNcrJCRZ3mXq0oUo=
>at
> com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1182)
>at
> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:770)
>at
> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>at
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>at
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3796)
>at
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1482)
>at
> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.storeFile(Jets3tNativeFileSystemStore.java:140)
>... 22 more
> 

This is amazon's own s3 client. nothing in the apache hadoop source tree. 
Normally I'd say "use s3a to make s3n problems go away", but I don't know what 
that does on Amazon's own EMR libraries
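
Outside of EMR's own client, the usual suggestion is to switch the URI scheme to s3a (the hadoop-aws connector); a minimal sketch with placeholder credentials and bucket, noting that on EMR the EMRFS client may take precedence and ignore these settings:

// Requires hadoop-aws and the AWS SDK on the classpath.
sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")

val data = sc.parallelize(1 to 10)
data.saveAsTextFile("s3a://your-bucket/output/path")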

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Apache Spark data locality when integrating with Kafka

2016-02-07 Thread Diwakar Dhanuskodi
We are using Spark in two ways:
1. YARN with Spark support, with Kafka running alongside the data nodes.
2. Spark master and workers running with some of the Kafka brokers.
Data locality is important.

Regards
Diwakar


Sent from Samsung Mobile.

 Original message From: أنس الليثي 
 Date:08/02/2016  02:07  (GMT+05:30) 
To: Diwakar Dhanuskodi  Cc: 
"Yuval.Itzchakov" , user  
Subject: Re: Apache Spark data locality when integrating with Kafka 

Diwakar 

We have our own servers. We will not use any cloud service like Amazon's 

On 7 February 2016 at 18:24, Diwakar Dhanuskodi  
wrote:
Fanoos, 
Where  you  want the solution to  be deployed ?. On premise or cloud?

Regards 
Diwakar .



Sent from Samsung Mobile.


 Original message 
From: "Yuval.Itzchakov" 
Date:07/02/2016 19:38 (GMT+05:30)
To: user@spark.apache.org
Cc:
Subject: Re: Apache Spark data locality when integrating with Kafka

I would definitely try to avoid hosting Kafka and Spark on the same servers. 

Kafka and Spark will be doing alot of IO between them, so you'll want to
maximize on those resources and not share them on the same server. You'll
want each Kafka broker to be on a dedicated server, as well as your spark
master and workers. If you're hosting them on Amazon EC2 instances, then
you'll want these to be on the same availability zone, so you can benefit
from low latency in that same zone. If you're on a dedicated servers,
perhaps you'll want to create a VPC between the two clusters so you can,
again, benefit from low IO latency and high throughput.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-data-locality-when-integrating-with-Kafka-tp26165p26170.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-- 
Anas Rabei
Senior Software Developer
Mubasher.info
anas.ra...@mubasher.info

Re: Shuffle memory woes

2016-02-07 Thread Corey Nolet
Charles,

Thank you for chiming in and I'm glad someone else is experiencing this too
and not just me. I know very well how the Spark shuffles work and I've done
deep dive presentations @ Spark meetups in the past. This problem is
something that goes beyond that and, I believe, it exposes a fundamental
paradigm flaw in the design of Spark, unfortunately. Good thing is, I think
it can be fixed.

Also- in regards to how much data actually gets shuffled- believe it or not
this problem can take a 30-40 minute job and make it run for 4 or more
hours. If I let the job run for 4+ hours the amount of data being shuffled
for this particular dataset will be 100 or more TB. Usually, however, I end
up killing the job long before that point because I realize it should not
be taking this long. The particular dataset we're working with is not for
real-time exploration. These are very large joins we're doing for jobs that
we run a few times a day.

On Sun, Feb 7, 2016 at 9:56 PM, Charles Chao  wrote:

>  "The dataset is 100gb at most, the spills can up to 10T-100T"
>
> -- I have had the same experiences, although not to this extreme (the
> spills were < 10T while the input was ~ 100s gb) and haven't found any
> solution yet. I don't believe this is related to input data format. in my
> case, I got my input data by loading from Hive tables.
>
> On Sun, Feb 7, 2016 at 6:28 AM, Sea <261810...@qq.com> wrote:
>
>> Hi,Corey:
>>"The dataset is 100gb at most, the spills can up to 10T-100T", Are
>> your input files lzo format, and you use sc.text() ? If memory is not
>> enough, spark will spill 3-4x of input data to disk.
>>
>>
>> -- Original Message --
>> *From:* "Corey Nolet";
>> *Date:* Sunday, February 7, 2016, 8:56 PM
>> *To:* "Igor Berman";
>> *Cc:* "user";
>> *Subject:* Re: Shuffle memory woes
>>
>> As for the second part of your questions- we have a fairly complex join
>> process which requires a ton of stage orchestration from our driver. I've
>> written some code to be able to walk down our DAG tree and execute siblings
>> in the tree concurrently where possible (forcing cache to disk on children
>> that that have multiple chiildren themselves so that they can be run
>> concurrently). Ultimatey, we have seen significant speedup in our jobs by
>> keeping tasks as busy as possible processing concurrent stages. Funny
>> enough though, the stage that is causing problems with shuffling for us has
>> a lot of children and doesn't even run concurrently with any other stages
>> so I ruled out the concurrency of the stages as a culprit for the
>> shuffliing problem we're seeing.
>>
>> On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet  wrote:
>>
>>> Igor,
>>>
>>> I don't think the question is "why can't it fit stuff in memory". I know
>>> why it can't fit stuff in memory- because it's a large dataset that needs
>>> to have a reduceByKey() run on it. My understanding is that when it doesn't
>>> fit into memory it needs to spill in order to consolidate intermediary
>>> files into a single file. The more data you need to run through this, the
>>> more it will need to spill. My findings is that once it gets stuck in this
>>> spill chain with our dataset it's all over @ that point because it will
>>> spill and spill and spill and spill and spill. If I give the shuffle enough
>>> memory it won't- irrespective of the number of partitions we have (i've
>>> done everything from repartition(500) to repartition(2500)). It's not a
>>> matter of running out of memory on a single node because the data is
>>> skewed. It's more a matter of the shuffle buffer filling up and needing to
>>> spill. I think what may be happening is that it gets to a point where it's
>>> spending more time reading/writing from disk while doing the spills then it
>>> is actually processing any data. I can tell this because I can see that the
>>> spills sometimes get up into the 10's to 100's of TB where the input data
>>> was maybe 100gb at most. Unfortunately my code is on a
>>> private internal network and I'm not able to share it.
>>>
>>> On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman 
>>> wrote:
>>>
 so can you provide code snippets: especially it's interesting to see
 what are your transformation chain, how many partitions are there on each
 side of shuffle operation

 the question is why it can't fit stuff in memory when you are shuffling
 - maybe your partitioner on "reduce" side is not configured properly? I
 mean if map side is ok, and you just reducing by key or something it should
 be ok, so some detail is missing...skewed data? aggregate by key?

 On 6 February 2016 at 20:13, Corey Nolet  wrote:

> Igor,
>
> Thank you for the response but unfortunately, the problem I'm
> referring to goes beyond this. I have set the shuffle memory fraction to 

Re: Spark Streaming with Druid?

2016-02-07 Thread Hemant Bhanawat
You may want to have a look at the spark-druid project already in progress:
https://github.com/SparklineData/spark-druid-olap

You can also have a look at SnappyData, which is a low-latency store
tightly integrated with Spark, Spark SQL and Spark Streaming. You can find
the 0.1 Preview release's documentation here.


Disclaimer: I am a SnappyData engineer.

Hemant
www.snappydata.io


On Sun, Feb 7, 2016 at 12:47 AM, unk1102  wrote:

> Hi did anybody tried Spark Streaming with Druid as low latency store?
> Combination seems powerful is it worth trying both together? Please guide
> and share your experience. I am after creating the best low latency
> streaming analytics.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Druid-tp26164.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Shuffle memory woes

2016-02-07 Thread Charles Chao
 "The dataset is 100gb at most, the spills can up to 10T-100T"

-- I have had the same experiences, although not to this extreme (the
spills were < 10T while the input was ~ 100s gb) and haven't found any
solution yet. I don't believe this is related to input data format. in my
case, I got my input data by loading from Hive tables.

On Sun, Feb 7, 2016 at 6:28 AM, Sea <261810...@qq.com> wrote:

> Hi,Corey:
>"The dataset is 100gb at most, the spills can up to 10T-100T", Are your
> input files lzo format, and you use sc.text() ? If memory is not enough,
> spark will spill 3-4x of input data to disk.
>
>
> -- Original Message --
> *From:* "Corey Nolet";
> *Date:* Sunday, February 7, 2016, 8:56 PM
> *To:* "Igor Berman";
> *Cc:* "user";
> *Subject:* Re: Shuffle memory woes
>
> As for the second part of your questions- we have a fairly complex join
> process which requires a ton of stage orchestration from our driver. I've
> written some code to be able to walk down our DAG tree and execute siblings
> in the tree concurrently where possible (forcing cache to disk on children
> that that have multiple chiildren themselves so that they can be run
> concurrently). Ultimatey, we have seen significant speedup in our jobs by
> keeping tasks as busy as possible processing concurrent stages. Funny
> enough though, the stage that is causing problems with shuffling for us has
> a lot of children and doesn't even run concurrently with any other stages
> so I ruled out the concurrency of the stages as a culprit for the
> shuffliing problem we're seeing.
>
> On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet  wrote:
>
>> Igor,
>>
>> I don't think the question is "why can't it fit stuff in memory". I know
>> why it can't fit stuff in memory- because it's a large dataset that needs
>> to have a reduceByKey() run on it. My understanding is that when it doesn't
>> fit into memory it needs to spill in order to consolidate intermediary
>> files into a single file. The more data you need to run through this, the
>> more it will need to spill. My findings is that once it gets stuck in this
>> spill chain with our dataset it's all over @ that point because it will
>> spill and spill and spill and spill and spill. If I give the shuffle enough
>> memory it won't- irrespective of the number of partitions we have (i've
>> done everything from repartition(500) to repartition(2500)). It's not a
>> matter of running out of memory on a single node because the data is
>> skewed. It's more a matter of the shuffle buffer filling up and needing to
>> spill. I think what may be happening is that it gets to a point where it's
>> spending more time reading/writing from disk while doing the spills then it
>> is actually processing any data. I can tell this because I can see that the
>> spills sometimes get up into the 10's to 100's of TB where the input data
>> was maybe 100gb at most. Unfortunately my code is on a
>> private internal network and I'm not able to share it.
>>
>> On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman 
>> wrote:
>>
>>> so can you provide code snippets: especially it's interesting to see
>>> what are your transformation chain, how many partitions are there on each
>>> side of shuffle operation
>>>
>>> the question is why it can't fit stuff in memory when you are shuffling
>>> - maybe your partitioner on "reduce" side is not configured properly? I
>>> mean if map side is ok, and you just reducing by key or something it should
>>> be ok, so some detail is missing...skewed data? aggregate by key?
>>>
>>> On 6 February 2016 at 20:13, Corey Nolet  wrote:
>>>
 Igor,

 Thank you for the response but unfortunately, the problem I'm referring
 to goes beyond this. I have set the shuffle memory fraction to be 90% and
 set the cache memory to be 0. Repartitioning the RDD helped a tad on the
 map side but didn't do much for the spilling when there was no longer any
 memory left for the shuffle. Also the new auto-memory management doesn't
 seem like it'll have too much of an effect after i've already given most
 the memory i've allocated to the shuffle. The problem I'm having is most
 specifically related to the shuffle performing declining by several orders
 of magnitude when it needs to spill multiple times (it ends up spilling
 several hundred for me when it can't fit stuff into memory).



 On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman 
 wrote:

> Hi,
> usually you can solve this by 2 steps
> make rdd to have more partitions
> play with shuffle memory fraction
>
> in spark 1.6 cache vs shuffle memory fractions are adjusted
> automatically
>
> On 5 February 2016 at 23:07, Corey Nolet  wrote:
>
>> I just recently had a discovery that my jobs were 

Re: Imported CSV file content isn't identical to the original file

2016-02-07 Thread Luciano Resende
I tried 1.5.0, 1.6.0 and the 2.0.0 trunk with
com.databricks:spark-csv_2.10:1.3.0 and got the expected results; the
columns seem to be read properly.

 +--+--+
|C0|C1|
+--+--+
|1446566430 | 2015-11-0400:00:30|
|1446566430 | 2015-11-0400:00:30|
|1446566430 | 2015-11-0400:00:30|
|1446566430 | 2015-11-0400:00:30|
|1446566430 | 2015-11-0400:00:30|
|1446566431 | 2015-11-0400:00:31|
|1446566431 | 2015-11-0400:00:31|
|1446566431 | 2015-11-0400:00:31|
|1446566431 | 2015-11-0400:00:31|
|1446566431 | 2015-11-0400:00:31|
+--+--+
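
For anyone wanting to reproduce this, a sketch of the session; the package coordinate is the spark-csv version mentioned earlier in the thread:

// Launch: spark-shell --packages com.databricks:spark-csv_2.10:1.3.0
val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "false")
  .option("delimiter", " ")
  .load("hdfs:///tmp/1.csv")

df.show(10, false)   // print all ten rows without truncation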




On Sat, Feb 6, 2016 at 11:44 PM, SLiZn Liu  wrote:

> Hi Spark Users Group,
>
> I have a csv file to analysis with Spark, but I’m troubling with importing
> as DataFrame.
>
> Here’s the minimal reproducible example. Suppose I’m having a
> *10(rows)x2(cols)* *space-delimited csv* file, shown as below:
>
> 1446566430 2015-11-0400:00:30
> 1446566430 2015-11-0400:00:30
> 1446566430 2015-11-0400:00:30
> 1446566430 2015-11-0400:00:30
> 1446566430 2015-11-0400:00:30
> 1446566431 2015-11-0400:00:31
> 1446566431 2015-11-0400:00:31
> 1446566431 2015-11-0400:00:31
> 1446566431 2015-11-0400:00:31
> 1446566431 2015-11-0400:00:31
>
> the  in column 2 represents sub-delimiter within that column, and
> this file is stored on HDFS, let’s say the path is hdfs:///tmp/1.csv
>
> I’m using *spark-csv* to import this file as Spark *DataFrame*:
>
> sqlContext.read.format("com.databricks.spark.csv")
> .option("header", "false") // Use first line of all files as header
> .option("inferSchema", "false") // Automatically infer data types
> .option("delimiter", " ")
> .load("hdfs:///tmp/1.csv")
> .show
>
> Oddly, the output shows only a part of each column:
>
> [image: Screenshot from 2016-02-07 15-27-51.png]
>
> and even the boundary of the table wasn’t shown correctly. I also used the
> other way to read csv file, by sc.textFile(...).map(_.split(" ")) and
> sqlContext.createDataFrame, and the result is the same. Can someone point
> me out where I did it wrong?
>
> —
> BR,
> Todd Leo
> ​
>



-- 
Luciano Resende
http://people.apache.org/~lresende
http://twitter.com/lresende1975
http://lresende.blogspot.com/


Re: Handling Hive Table With large number of rows

2016-02-07 Thread Jörn Franke
Can you provide more details? Your use case does not sound like you need Spark.
Your version is in any case too old; it does not make sense to develop now with
1.2.1. There is no "project limitation" that can justify this.

> On 08 Feb 2016, at 06:48, Meetu Maltiar  wrote:
> 
> Hi,
> 
> I am working on an application that reads a single Hive Table and do some 
> manipulations on each row of it. Finally construct an XML.
> Hive table will be a large data set, no chance to fit it in memory. I intend 
> to use SparkSQL 1.2.1 (due to project limitations).
> Any pointers to me on handling this large data-set will be helpful (Fetch 
> Size….).
> 
> Thanks in advance.
> 
> Kind Regards,
> Meetu Maltiar
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Handling Hive Table With large number of rows

2016-02-07 Thread Meetu Maltiar
Thanks Jörn,

We have to construct an XML file on an HDFS location from a couple of Hive
tables, joined on one key.
The data in both tables we have to join is large, and I was wondering about
the right approach.
XML creation will also be tricky, as we cannot hold all the objects in memory.
Old Spark 1.2.1 is a bummer; sure, no one can justify it.
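
A sketch of one way to do this under Spark SQL 1.2.1: run the join through HiveContext (which returns a SchemaRDD), then build the XML fragments inside mapPartitions so rows stream out without being collected on the driver. Table, column and path names are invented, and real code would need proper XML escaping:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
val joined = hiveContext.sql(
  """SELECT a.id, a.name, b.amount
    |FROM db.table_a a
    |JOIN db.table_b b ON a.id = b.id""".stripMargin)

// Emit one XML fragment per row, partition by partition, without holding the result in memory.
val xmlLines = joined.mapPartitions { rows =>
  rows.map(row => s"<record><id>${row(0)}</id><name>${row(1)}</name><amount>${row(2)}</amount></record>")
}
xmlLines.saveAsTextFile("hdfs:///tmp/output_xml")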





On Mon, Feb 8, 2016 at 11:53 AM, Jörn Franke  wrote:

> Can you provide more details? Your use case does not sound you need Spark.
> Your version is anyway too old. It does not make sense to develop now with
> 1.2.1 . There is no "project limitation" that is able to justify this.
>
> > On 08 Feb 2016, at 06:48, Meetu Maltiar  wrote:
> >
> > Hi,
> >
> > I am working on an application that reads a single Hive Table and do
> some manipulations on each row of it. Finally construct an XML.
> > Hive table will be a large data set, no chance to fit it in memory. I
> intend to use SparkSQL 1.2.1 (due to project limitations).
> > Any pointers to me on handling this large data-set will be helpful
> (Fetch Size….).
> >
> > Thanks in advance.
> >
> > Kind Regards,
> > Meetu Maltiar
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>



-- 
Meetu Maltiar,
Mobile-09717005168


Handling Hive Table With large number of rows

2016-02-07 Thread Meetu Maltiar
Hi,

I am working on an application that reads a single Hive table, does some
manipulations on each row of it, and finally constructs an XML file.
The Hive table will be a large data set, with no chance of fitting it in memory.
I intend to use Spark SQL 1.2.1 (due to project limitations).
Any pointers on handling this large data set will be helpful (fetch
size….).

Thanks in advance.

Kind Regards,
Meetu Maltiar
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org