Re: spark 1.4.1 saveAsTextFile is slow on emr-4.0.0

2015-09-02 Thread Neil Jonkers
Hi,

Can you set the following parameters in your mapred-site.xml file please:

<property><name>mapred.output.direct.EmrFileSystem</name><value>true</value></property>
<property><name>mapred.output.direct.NativeS3FileSystem</name><value>true</value></property>

You can also config this at cluster launch time with the following
Classification via EMR console:

classification=mapred-site,properties=[mapred.output.direct.EmrFileSystem=true,mapred.output.direct.NativeS3FileSystem=true]
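
A Spark job can usually pick up the same settings without editing mapred-site.xml by
using Spark's spark.hadoop.* prefix (Spark copies such properties into the Hadoop
Configuration it hands to the output committer). A sketch of the equivalent
spark-defaults.conf entries, assuming the same property names as above:

spark.hadoop.mapred.output.direct.EmrFileSystem        true
spark.hadoop.mapred.output.direct.NativeS3FileSystem   true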


Thank you

On Wed, Sep 2, 2015 at 6:02 AM, Alexander Pivovarov 
wrote:

> I checked previous emr config (emr-3.8)
> mapred-site.xml has the following setting
> 
> <property><name>mapred.output.committer.class</name><value>org.apache.hadoop.mapred.DirectFileOutputCommitter</value></property>
> 
>
>
> On Tue, Sep 1, 2015 at 7:33 PM, Alexander Pivovarov 
> wrote:
>
>> Should I use DirectOutputCommitter?
>> spark.hadoop.mapred.output.committer.class
>>  com.appsflyer.spark.DirectOutputCommitter
>>
>>
>>
>> On Tue, Sep 1, 2015 at 4:01 PM, Alexander Pivovarov > > wrote:
>>
>>> I run Spark 1.4.1 on Amazon AWS EMR 4.0.0.
>>>
>>> For some reason Spark's saveAsTextFile is very slow on emr-4.0.0 in
>>> comparison to emr-3.8 (it was 5 sec, now it is 95 sec).
>>>
>>> saveAsTextFile actually reports that it's done in 4.356 sec, but after that
>>> I see lots of INFO messages with 404 errors from the com.amazonaws.latency
>>> logger for the next 90 sec.
>>>
>>> spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" +
>>> "A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20")
>>>
>>> 2015-09-01 21:16:17,637 INFO  [dag-scheduler-event-loop]
>>> scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 5
>>> (saveAsTextFile at :22) finished in 4.356 s
>>> 2015-09-01 21:16:17,637 INFO  [task-result-getter-2]
>>> cluster.YarnScheduler (Logging.scala:logInfo(59)) - Removed TaskSet 5.0,
>>> whose tasks have all completed, from pool
>>> 2015-09-01 21:16:17,637 INFO  [main] scheduler.DAGScheduler
>>> (Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at
>>> :22, took 4.547829 s
>>> 2015-09-01 21:16:17,638 INFO  [main] s3n.S3NativeFileSystem
>>> (S3NativeFileSystem.java:listStatus(896)) - listStatus
>>> s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false
>>> 2015-09-01 21:16:17,651 INFO  [main] amazonaws.latency
>>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
>>> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
>>> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
>>> ID: 3B2F06FD11682D22), S3 Extended Request ID:
>>> C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ],
>>> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
>>> AWSRequestID=[3B2F06FD11682D22], ServiceEndpoint=[
>>> https://foo-bar.s3.amazonaws.com], Exception=1,
>>> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
>>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923],
>>> HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544],
>>> RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129],
>>> 2015-09-01 21:16:17,723 INFO  [main] amazonaws.latency
>>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
>>> ServiceName=[Amazon S3], AWSRequestID=[E5D513E52B20FF17], ServiceEndpoint=[
>>> https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0,
>>> RequestCount=1, HttpClientPoolPendingCount=0,
>>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[71.927],
>>> HttpRequestTime=[53.517], HttpClientReceiveResponseTime=[51.81],
>>> RequestSigningTime=[0.209], ResponseProcessingTime=[17.97],
>>> HttpClientSendRequestTime=[0.089],
>>> 2015-09-01 21:16:17,756 INFO  [main] amazonaws.latency
>>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
>>> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
>>> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
>>> ID: 62C6B413965447FD), S3 Extended Request ID:
>>> 4w5rKMWCt9EdeEKzKBXZgWpTcBZCfDikzuRrRrBxmtHYxkZyS4GxQVyADdLkgtZf],
>>> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
>>> AWSRequestID=[62C6B413965447FD], ServiceEndpoint=[
>>> https://foo-bar.s3.amazonaws.com], Exception=1,
>>> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
>>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.044],
>>> HttpRequestTime=[10.543], HttpClientReceiveResponseTime=[8.743],
>>> RequestSigningTime=[0.271], HttpClientSendRequestTime=[0.138],
>>> 2015-09-01 21:16:17,774 INFO  [main] amazonaws.latency
>>> (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
>>> ServiceName=[Amazon S3], AWSRequestID=[F62B991825042889], ServiceEndpoint=[
>>> https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0,
>>> RequestCount=1, HttpClientPoolPendingCount=0,
>>> HttpClientPoolAvailableCount=1, ClientExecuteTime=[16.724],
>>> HttpRequestTime=[16.292], HttpClientReceiveResponseTime=[14.728],
>>> RequestSigningTime=[0.148], ResponseProcessingTime=[0.155],
>>> 

Re: Too many open files issue

2015-09-02 Thread Steve Loughran

On 31 Aug 2015, at 19:49, Sigurd Knippenberg 
> wrote:

I know I can adjust the max open files allowed by the OS, but I'd rather fix the
underlying issue.


Bumping up the OS handle limits is step #1 of installing a Hadoop cluster:

https://wiki.apache.org/hadoop/TooManyOpenFiles


Re: Custom Partitioner

2015-09-02 Thread Jem Tucker
Could you alter the range partitioner such that it skews the partitioning and
assigns more partitions to the heavier weighted keys? To do this you will have
to know the weighting before you start.
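
As a rough illustration of that idea, here is a minimal, untested Scala sketch (the
heavy-key list and partition counts are made up; in PySpark the same logic would go
into the partitionFunc you pass to partitionBy). Note that a Partitioner can only
decide which single partition a key lands in, so splitting one very heavy key any
further would need something extra, such as salting the key first:

import org.apache.spark.Partitioner

// Heavy keys (known in advance, e.g. from a countByKey sample) each get their own
// partition; every other key is hashed across the remaining partitions.
class SkewAwarePartitioner(heavyKeys: Seq[Any], lightPartitions: Int) extends Partitioner {
  private val heavyIndex: Map[Any, Int] = heavyKeys.zipWithIndex.toMap

  override def numPartitions: Int = heavyKeys.size + lightPartitions

  override def getPartition(key: Any): Int = heavyIndex.get(key) match {
    case Some(i) => i                                   // dedicated partition for a heavy key
    case None =>
      val h = key.hashCode % lightPartitions
      heavyKeys.size + (if (h < 0) h + lightPartitions else h)   // non-negative bucket
  }
}

// Usage: pairRdd.partitionBy(new SkewAwarePartitioner(Seq("B", "E"), 8))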

On Wed, Sep 2, 2015 at 8:02 AM shahid ashraf  wrote:

> Yes, I can take that as an example, but my actual use case is that I need to
> resolve a data skew. When I do grouping based on key (A-Z) the resulting
> partitions are skewed like
> (partition no., no_of_keys, total elements with given key)
> << partition: [(0, 0, 0), (1, 15, 17395), (2, 0, 0), (3, 0, 0), (4, 13,
> 18196), (5, 0, 0), (6, 0, 0), (7, 0, 0), (8, 1, 1), (9, 0, 0)] and
> elements: >>
> The data has been skewed to partitions 1 and 4. I need to split those
> partitions, do processing on the split partitions, and I should be able to
> combine the split partitions back as well.
>
> On Tue, Sep 1, 2015 at 10:42 PM, Davies Liu  wrote:
>
>> You can take the sortByKey as example:
>> https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L642
>>
>> On Tue, Sep 1, 2015 at 3:48 AM, Jem Tucker  wrote:
>> > something like...
>> >
>> > class RangePartitioner(Partitioner):
>> >     def __init__(self, numParts):
>> >         self.numPartitions = numParts
>> >         self.partitionFunc = self.rangePartition
>> >     def rangePartition(self, key):
>> >         # Logic to turn the key into a partition id
>> >         return id
>> >
>> > On Tue, Sep 1, 2015 at 11:38 AM shahid ashraf 
>> wrote:
>> >>
>> >> Hi
>> >>
>> >> I think the range partitioner is not available in pyspark, so if we want
>> >> to create one, how should we do that? That is my question.
>> >>
>> >> On Tue, Sep 1, 2015 at 3:57 PM, Jem Tucker 
>> wrote:
>> >>>
>> >>> Ah, sorry, I misread your question. In pyspark it looks like you just
>> >>> need to instantiate the Partitioner class with numPartitions and
>> >>> partitionFunc.
>> >>>
>> >>> On Tue, Sep 1, 2015 at 11:13 AM shahid ashraf 
>> wrote:
>> 
>>  Hi
>> 
>>  I did not get this, e.g. if I need to create a custom partitioner like a
>>  range partitioner.
>> 
>>  On Tue, Sep 1, 2015 at 3:22 PM, Jem Tucker 
>> wrote:
>> >
>> > Hi,
>> >
>> > You just need to extend Partitioner and override the numPartitions
>> and
>> > getPartition methods, see below
>> >
>> > class MyPartitioner extends Partitioner {
>> >   def numPartitions: Int = // Return the number of partitions
>> >   def getPartition(key: Any): Int = // Return the partition for a given key
>> > }
>> >
>> > On Tue, Sep 1, 2015 at 10:15 AM shahid qadri <
>> shahidashr...@icloud.com>
>> > wrote:
>> >>
>> >> Hi Sparkians
>> >>
>> >> How can we create a custom partitioner in pyspark?
>> >>
>> >>
>> -
>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: user-h...@spark.apache.org
>> >>
>> 
>> 
>> 
>>  --
>>  with Regards
>>  Shahid Ashraf
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> with Regards
>> >> Shahid Ashraf
>>
>
>
>
> --
> with Regards
> Shahid Ashraf
>


Save dataframe into hbase

2015-09-02 Thread Hafiz Mujadid
Hi 

What is the most efficient way to save a DataFrame into HBase?

Thanks
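
One common approach (shown below as a minimal, untested Scala sketch) is to map the
DataFrame's rows to HBase Puts and write them through TableOutputFormat with
saveAsNewAPIHadoopDataset. The table name "t1", column family "cf", qualifier "c1"
and the assumption that the first column is the row key are all made-up placeholders;
the HBase client jars and hbase-site.xml also need to be on the classpath:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "t1")   // placeholder table name
val job = Job.getInstance(hbaseConf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

// Assume the first column of df is the row key and the second column is the value to store.
val puts = df.rdd.map { row =>
  val put = new Put(Bytes.toBytes(row.getString(0)))
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes(row.getString(1)))
  (new ImmutableBytesWritable(), put)
}
puts.saveAsNewAPIHadoopDataset(job.getConfiguration)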






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Save-dataframe-into-hbase-tp24552.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



OOM in spark driver

2015-09-02 Thread ankit tyagi
Hi All,

I am using spark-sql 1.3.1 with hadoop 2.4.0. I am running a SQL query
against parquet files and want to save the result on S3, but it looks like the
https://issues.apache.org/jira/browse/SPARK-2984 problem still occurs while
saving data to S3.

Hence, for now I am saving the result on HDFS and, with the help of a
JavaSparkListener, copying the file from HDFS to S3 with Hadoop FileUtil in
the onApplicationEnd method. But my job is failing with an OOM in the Spark
driver.

*5/09/02 04:17:57 INFO cluster.YarnClusterSchedulerBackend: Asking each
executor to shut down*
*15/09/02 04:17:59 INFO
scheduler.OutputCommitCoordinator$OutputCommitCoordinatorActor:
OutputCommitCoordinator stopped!*
*Exception in thread "Reporter" *
*Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "Reporter"*
*Exception in thread "SparkListenerBus" *
*Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "SparkListenerBus"*
*Exception in thread "Driver" *
*Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "Driver"*


The strange part is that the result is getting saved on HDFS, but the job fails
while copying the file. The size of the file is under 1 MB.

Any help or leads would be appreciated.


Re: Spark + Druid

2015-09-02 Thread Paolo Platter
Fantastic!!! I will look into that and I hope to contribute

Paolo

Sent from my Windows Phone

From: Harish Butani
Sent: 02/09/2015 06:04
To: user
Subject: Spark + Druid

Hi,

I am working on the Spark Druid Package: 
https://github.com/SparklineData/spark-druid-olap.
For scenarios where a 'raw event' dataset is being indexed in Druid, it enables
you to write your Logical Plans (queries/dataflows) against the 'raw event'
dataset and rewrites parts of the plan to execute as a Druid Query. In Spark,
the configuration of a Druid DataSource is somewhat like configuring an OLAP
index in a traditional DB. Early results show significant speedups from pushing
slice-and-dice queries to Druid.

It comprises a Druid DataSource that wraps the 'raw event' dataset and has
knowledge of the Druid Index, and a DruidPlanner, which is a set of plan rewrite
strategies to convert aggregation queries into a Plan having a DruidRDD.

Here
 is a detailed design document, which also describes a benchmark of 
representative queries on the TPCH dataset.

Looking for folks who would be willing to try this out and/or contribute.

regards,
Harish Butani.


Re: Custom Partitioner

2015-09-02 Thread shahid ashraf
Yes, I can take that as an example, but my actual use case is that I need to
resolve a data skew. When I do grouping based on key (A-Z) the resulting
partitions are skewed like
(partition no., no_of_keys, total elements with given key)
<< partition: [(0, 0, 0), (1, 15, 17395), (2, 0, 0), (3, 0, 0), (4, 13,
18196), (5, 0, 0), (6, 0, 0), (7, 0, 0), (8, 1, 1), (9, 0, 0)] and
elements: >>
The data has been skewed to partitions 1 and 4. I need to split those
partitions, do processing on the split partitions, and I should be able to
combine the split partitions back as well.

On Tue, Sep 1, 2015 at 10:42 PM, Davies Liu  wrote:

> You can take the sortByKey as example:
> https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L642
>
> On Tue, Sep 1, 2015 at 3:48 AM, Jem Tucker  wrote:
> > something like...
> >
> > class RangePartitioner(Partitioner):
> >     def __init__(self, numParts):
> >         self.numPartitions = numParts
> >         self.partitionFunc = self.rangePartition
> >     def rangePartition(self, key):
> >         # Logic to turn the key into a partition id
> >         return id
> >
> > On Tue, Sep 1, 2015 at 11:38 AM shahid ashraf  wrote:
> >>
> >> Hi
> >>
> >> I think the range partitioner is not available in pyspark, so if we want
> >> to create one, how should we do that? That is my question.
> >>
> >> On Tue, Sep 1, 2015 at 3:57 PM, Jem Tucker 
> wrote:
> >>>
> >>> Ah, sorry, I misread your question. In pyspark it looks like you just
> >>> need to instantiate the Partitioner class with numPartitions and
> >>> partitionFunc.
> >>>
> >>> On Tue, Sep 1, 2015 at 11:13 AM shahid ashraf 
> wrote:
> 
>  Hi
> 
>  I did not get this, e.g. if I need to create a custom partitioner like a
>  range partitioner.
> 
>  On Tue, Sep 1, 2015 at 3:22 PM, Jem Tucker 
> wrote:
> >
> > Hi,
> >
> > You just need to extend Partitioner and override the numPartitions
> and
> > getPartition methods, see below
> >
> > class MyPartitioner extends Partitioner {
> >   def numPartitions: Int = // Return the number of partitions
> >   def getPartition(key: Any): Int = // Return the partition for a given key
> > }
> >
> > On Tue, Sep 1, 2015 at 10:15 AM shahid qadri <
> shahidashr...@icloud.com>
> > wrote:
> >>
> >> Hi Sparkians
> >>
> >> How can we create a custom partitioner in pyspark?
> >>
> >>
> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> 
> 
> 
>  --
>  with Regards
>  Shahid Ashraf
> >>
> >>
> >>
> >>
> >> --
> >> with Regards
> >> Shahid Ashraf
>



-- 
with Regards
Shahid Ashraf


Simple join of two Spark DataFrame failing with “org.apache.spark.sql.AnalysisException: Cannot resolve column name”

2015-09-02 Thread steve.felsheim
Running into an issue trying to perform a simple join of two DataFrames
created from two different parquet files on HDFS.

[main] INFO org.apache.spark.SparkContext - Running *Spark version 1.4.1*

Using HDFS from Hadoop 2.7.0



Here is a sample to illustrate.

public void testStrangeness(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("joinIssue");
    JavaSparkContext context = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(context);

    DataFrame people =
        sqlContext.parquetFile("hdfs://localhost:9000//datalake/sample/people.parquet");
    DataFrame address =
        sqlContext.parquetFile("hdfs://localhost:9000//datalake/sample/address.parquet");

    people.printSchema();
    address.printSchema();

    // yeah, works
    DataFrame cartJoin = address.join(people);
    cartJoin.printSchema();

    // boo, fails
    DataFrame joined = address.join(people,
        address.col("addrid").equalTo(people.col("addressid")));

    joined.printSchema();
}




Contents of people

first,last,addressid 
your,mom,1 
fred,flintstone,2


Contents of address

addrid,city,state,zip
1,sometown,wi,
2,bedrock,il,



people.printSchema(); 
results in...

root
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- addressid: integer (nullable = true)



address.printSchema();
results in...

root
 |-- addrid: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)



DataFrame cartJoin = address.join(people);
cartJoin.printSchema();

Cartesian join works fine, printSchema() results in...

root
 |-- addrid: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- addressid: integer (nullable = true)



This join...

DataFrame joined = address.join(people,
address.col("addrid").equalTo(people.col("addressid")));
Results in the following exception.


Exception in thread "main" org.apache.spark.sql.AnalysisException: *Cannot
resolve column name "addrid" among (addrid, city, state, zip); at
*org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159)
at
org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159)
at scala.Option.getOrElse(Option.scala:121) at
org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:158) at
org.apache.spark.sql.DataFrame.col(DataFrame.scala:558) at
dw.dataflow.DataflowParser.testStrangeness(DataflowParser.java:36) at
dw.dataflow.DataflowParser.main(DataflowParser.java:119)



I tried changing it so people and address have a common key attribute
(addressid) and used:

address.join(people, "addressid");

but got the same result.

Any ideas??

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Simple-join-of-two-Spark-DataFrame-failing-with-org-apache-spark-sql-AnalysisException-Cannot-resolv-tp24557.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Small File to HDFS

2015-09-02 Thread Tao Lu
You may consider storing them in one big HDFS file, and keep appending new
messages to it.

For instance,
one message  -> zip it -> append it to the HDFS as one line
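
A rough, untested Scala sketch of that idea is below. It assumes HDFS append is
enabled on the cluster and ignores concurrency entirely (several tasks appending to
the same file at once would need coordination, e.g. one file per partition); the
path and helper name are made up:

import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream
import javax.xml.bind.DatatypeConverter
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Gzip one message and append it to a single HDFS file as one Base64-encoded line.
def appendMessage(msg: String, file: String): Unit = {
  val buffer = new ByteArrayOutputStream()
  val gzip = new GZIPOutputStream(buffer)
  gzip.write(msg.getBytes("UTF-8"))
  gzip.close()
  val line = DatatypeConverter.printBase64Binary(buffer.toByteArray) + "\n"

  val fs = FileSystem.get(new Configuration())
  val path = new Path(file)
  val out = if (fs.exists(path)) fs.append(path) else fs.create(path)
  out.write(line.getBytes("UTF-8"))
  out.close()
}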

On Wed, Sep 2, 2015 at 12:43 PM,  wrote:

> Hi,
> I already store them in MongoDB in parallel for operational access and
> don't want to add another database to the loop.
> Is that the only solution?
>
> Tks
> Nicolas
>
> ----- Original message -----
> From: "Ted Yu" 
> To: nib...@free.fr
> Cc: "user" 
> Sent: Wednesday, 2 September 2015 18:34:17
> Subject: Re: Small File to HDFS
>
>
> Instead of storing those messages in HDFS, have you considered storing
> them in key-value store (e.g. hbase) ?
>
>
> Cheers
>
>
> On Wed, Sep 2, 2015 at 9:07 AM, < nib...@free.fr > wrote:
>
>
> Hello,
> I'm currently using Spark Streaming to collect small messages (events),
> size being <50 KB; volume is high (several million per day) and I have to
> store those messages in HDFS.
> I understand that storing small files can be problematic in HDFS; how can
> I manage it?
>
> Tks
> Nicolas
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

Thanks!
Tao


Small File to HDFS

2015-09-02 Thread nibiau
Hello,
I'm currently using Spark Streaming to collect small messages (events), size
being <50 KB; volume is high (several million per day) and I have to store
those messages in HDFS.
I understand that storing small files can be problematic in HDFS; how can I
manage it?

Tks
Nicolas

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Small File to HDFS

2015-09-02 Thread nibiau
Hi,
I already store them in MongoDB in parallel for operational access and don't
want to add another database to the loop.
Is that the only solution?

Tks
Nicolas

----- Original message -----
From: "Ted Yu" 
To: nib...@free.fr
Cc: "user" 
Sent: Wednesday, 2 September 2015 18:34:17
Subject: Re: Small File to HDFS


Instead of storing those messages in HDFS, have you considered storing them in 
key-value store (e.g. hbase) ? 


Cheers 


On Wed, Sep 2, 2015 at 9:07 AM, < nib...@free.fr > wrote: 


Hello, 
I'm currently using Spark Streaming to collect small messages (events), size
being <50 KB; volume is high (several million per day) and I have to store
those messages in HDFS.
I understand that storing small files can be problematic in HDFS; how can I
manage it?

Tks 
Nicolas 

- 
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
For additional commands, e-mail: user-h...@spark.apache.org 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Small File to HDFS

2015-09-02 Thread Ted Yu
Instead of storing those messages in HDFS, have you considered storing them
in key-value store (e.g. hbase) ?

Cheers

On Wed, Sep 2, 2015 at 9:07 AM,  wrote:

> Hello,
> I'm currently using Spark Streaming to collect small messages (events),
> size being <50 KB; volume is high (several million per day) and I have to
> store those messages in HDFS.
> I understand that storing small files can be problematic in HDFS; how can
> I manage it?
>
> Tks
> Nicolas
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Memory-efficient successive calls to repartition()

2015-09-02 Thread alexis GILLAIN
Just made some tests on my laptop.

Deletion of the files is not immediate, but a System.gc() call does the job
on the shuffle files of a checkpointed RDD.
It should solve your problem (`sc._jvm.System.gc()` in Python, as pointed out
in the databricks link in my previous message).


2015-09-02 20:55 GMT+08:00 Aurélien Bellet <
aurelien.bel...@telecom-paristech.fr>:

> Thanks a lot for the useful link and comments Alexis!
>
> First of all, the problem occurs without doing anything else in the code
> (except of course loading my data from HDFS at the beginning) - so it
> definitely comes from the shuffling. You're right, in the current version,
> checkpoint files are not removed and take up some space in HDFS (this is
> easy to fix). But this is negligible compared to the non-HDFS files, which
> keep growing as the iterations go. So I agree with you that this must come
> from the shuffling operations: it seems that the shuffle files are not
> removed along the execution (they are only removed if I stop/kill the
> application), despite the use of checkpoint.
>
> The class you mentioned is very interesting but I did not find a way to
> use it from pyspark. I will try to implement my own version, looking at the
> source code. But besides the queueing and removing of checkpoint files, I
> do not really see anything special there that could solve my issue.
>
> I will continue to investigate this. Just found out I can use a command
> line browser to look at the webui (I cannot access the server in graphical
> display mode), this should help me understand what's going on. I will also
> try the workarounds mentioned in the link. Keep you posted.
>
> Again, thanks a lot!
>
> Best,
>
> Aurelien
>
>
> Le 02/09/2015 14:15, alexis GILLAIN a écrit :
>
>> Aurélien,
>>
>>  From what you're saying, I can think of a couple of things considering
>> I don't know what you are doing in the rest of the code :
>>
>> - There are a lot of non-HDFS writes; they come from the rest of your code
>> and/or repartition(). Repartition involves a shuffle and the creation of
>> files on disk. I would have said that the problem comes from that, but I
>> just checked and checkpoint() is supposed to delete shuffle files :
>>
>> https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
>> (looks exactly as your problem so you could maybe try the others
>> workarounds)
>> Still, you may do a lot of shuffle in the rest of the code (you should
>> see the amount of shuffle files written in the webui) and consider
>> increasing the disk space available...if you can do that.
>>
>> - On the hdfs side, the class I pointed to has an update function which
>> "automatically handles persisting and (optionally) checkpointing, as
>> well as unpersisting and removing checkpoint files". Not sure your
>> method for checkpointing remove previous checkpoint file.
>>
>> In the end, does the disk space error come from hdfs growing or local
>> disk growing ?
>>
>> You should check the webui to identify which tasks spill data on disk
>> and verify if the shuffle files are properly deleted when you checkpoint
>> your rdd.
>>
>>
>> Regards,
>>
>>
>> 2015-09-01 22:48 GMT+08:00 Aurélien Bellet
>> > >:
>>
>>
>> Dear Alexis,
>>
>> Thanks again for your reply. After reading about checkpointing I
>> have modified my sample code as follows:
>>
>> for i in range(1000):
>>     print i
>>     data2 = data.repartition(50).cache()
>>     if (i+1) % 10 == 0:
>>         data2.checkpoint()
>>     data2.first()      # materialize rdd
>>     data.unpersist()   # unpersist previous version
>>     data = data2
>>
>> The data is checkpointed every 10 iterations to a directory that I
>> specified. While this seems to improve things a little bit, there is
>> still a lot of writing on disk (appcache directory, shown as "non
>> HDFS files" in Cloudera Manager) *besides* the checkpoint files
>> (which are regular HDFS files), and the application eventually runs
>> out of disk space. The same is true even if I checkpoint at every
>> iteration.
>>
>> What am I doing wrong? Maybe some garbage collector setting?
>>
>> Thanks a lot for the help,
>>
>> Aurelien
>>
>> Le 24/08/2015 10:39, alexis GILLAIN a écrit :
>>
>> Hi Aurelien,
>>
>> The first code should create a new RDD in memory at each iteration
>> (check the webui).
>> The second code will unpersist the RDD but that's not the main
>> problem.
>>
>> I think you have trouble due to long lineage as .cache() keep
>> track of
>> lineage for recovery.
>> You should have a look at checkpointing :
>>
>> https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
>>
>> 

Inferring JSON schema from a JSON string in a dataframe column

2015-09-02 Thread mstang
Hi,

On Spark 1.3, using Scala 2.10.4.

Given an existing dataframe with two colums (col A = JSON string, col B =
int), is it possible to create a new dataframe from col A and automatically
generate the schema (similar to when json is loaded/read from file)?
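
One approach that may work here (a minimal Scala sketch, assuming Spark 1.3's
SQLContext and a DataFrame df whose first column holds the JSON strings) is to feed
the JSON column back through the JSON reader, so the schema is inferred the same way
as when JSON is loaded from a file. Applied after a filter on object type, this also
produces a schema containing only the fields present in the filtered rows:

// Pull the JSON strings out of column A and let jsonRDD infer a schema from them.
val jsonStrings = df.rdd.map(_.getString(0))
val inferred = sqlContext.jsonRDD(jsonStrings)
inferred.printSchema()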

Alternately... given an existing dataframe created by loading/reading a json
file.  After a filter is applied, is it possible to re-generate the schema
to only show objects present in the filtered dataframe?

The issue is that I have csv files where the first column is a json string
and the second column is the object type.  The json schema will vary greatly
from object type to object type.  

Currently I can read this into a dataframe as text, but I can't figure out
how to create dataframes from the json for a given object (without
pre-defining the schemas).

I could alter the source structure to just be json, including the object
type as a json object... the issue here is that when I create the dataframe
the schema includes the objects for all the object types.  When I filter
this by object type, the schema is still the huge schema representing all
object types (so saving as parquet for example, I would end up with a 1000+
empty columns unless I again had a predefined schema for each object type).

Any ideas other than:
  1)pre-defining object type schema: schema are large and changing
  2)splitting the source data by object type: currently working with ~1k
files per hour, splitting I'd be working with ~50k files per hour
  3)writing out to disk for each object as text and reading back in as JSON:
with repartition files could be reduced but there would be more disk io. 

Any help would be appreciated.

Mike



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Inferring-JSON-schema-from-a-JSON-string-in-a-dataframe-column-tp24559.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark MLlib Decision Tree Node Accuracy

2015-09-02 Thread derechan
Hi,
I'm working on analyzing a decision tree in Apache spark 1.2. I've built the
tree and I can traverse it to get the splits and impurity. I can't seem to
find a way to get which records fall into each node. My goal is to get a
count of how accurate each leaf is at identifying the classification. I used
MulticlassMetrics, predicting each record, to get the accuracy of the tree
overall, but I'm not sure how to do this for a branch of the tree. Any input
would be appreciated. Thanks.


From,
Derek



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-MLlib-Decision-Tree-Node-Accuracy-tp24561.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Memory-efficient successive calls to repartition()

2015-09-02 Thread shahid ashraf
Hi Guys

It seems my problem is related to his question as well. I am running
standalone Spark 1.4.1 on a local machine.

I have 10 partitions with data skew on partitions 1 and 4: [(0,
0), (*1, 15593259)*, (2, 0), (3, 0), (*4, 20695601)*, (5, 0), (6, 0), (7,
0), (8, 0), (9, 0)] and elements: >>

Now I try rdd.repartition(10) and get errors like "ERROR Executor:
Exception in task 1.0 in stage 10.0 (TID 61)
java.lang.OutOfMemoryError: Java heap space" and *ERROR
DiskBlockObjectWriter in the stack trace*.


*I tried increasing "spark.executor.memory" to "4g" as well, but I get the same
errors again.*

15/09/02 21:12:43 ERROR Executor: Exception in task 1.0 in stage 10.0 (TID
61)
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:113)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:101)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:97)
at
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at
org.apache.spark.util.collection.WritablePartitionedIterator$$anon$3.writeNext(WritablePartitionedPairCollection.scala:105)
at
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/09/02 21:12:43 WARN TaskSetManager: Lost task 1.0 in stage 10.0 (TID 61,
localhost): java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:113)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:101)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:97)
at
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at
org.apache.spark.util.collection.WritablePartitionedIterator$$anon$3.writeNext(WritablePartitionedPairCollection.scala:105)
at
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

15/09/02 21:12:43 ERROR TaskSetManager: Task 1 in stage 10.0 failed 1
times; aborting job
15/09/02 21:12:43 ERROR SparkUncaughtExceptionHandler: Uncaught exception
in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:113)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:101)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:97)
at
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at
org.apache.spark.util.collection.WritablePartitionedIterator$$anon$3.writeNext(WritablePartitionedPairCollection.scala:105)
at
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at

Kafka Direct Stream join without data shuffle

2015-09-02 Thread Chen Song
I have a stream obtained from Kafka with the direct approach, say inputStream,
and I need to

1. Create another DStream derivedStream with map or mapPartitions (with
some data enrichment with reference table) on inputStream
2. Join derivedStream with inputStream

In my use case, I don't need to shuffle data. Each partition in
derivedStream only needs to be joined with the corresponding partition in
the original parent inputStream it is generated from.

My question is

1. Is there a Partitioner defined in KafkaRDD at all?
2. How would I preserve the partitioning scheme and avoid data shuffle?

-- 
Chen Song


Re: Kafka Direct Stream join without data shuffle

2015-09-02 Thread Cody Koeninger
No, there isn't a partitioner for KafkaRDD (KafkaRDD may not even be a pair
rdd, for instance).

It sounds to me like if it's a self-join, you should be able to do it in a
single mapPartition operation.
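
A rough sketch of that idea in Scala (untested; loadReferenceTable() and enrich()
are hypothetical placeholders for the reference-table lookup) — do the enrichment
and pair the derived value with the original record inside one mapPartitions pass,
so nothing is shuffled:

val joined = inputStream.mapPartitions { records =>
  // Build or fetch the reference table once per partition (placeholder).
  val referenceTable = loadReferenceTable()
  records.map { rec =>
    val enriched = enrich(rec, referenceTable)   // hypothetical enrichment
    (rec, enriched)                              // original record paired with its derived value
  }
}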

On Wed, Sep 2, 2015 at 3:06 PM, Chen Song  wrote:

> I have a stream got from Kafka with direct approach, say, inputStream, I
> need to
>
> 1. Create another DStream derivedStream with map or mapPartitions (with
> some data enrichment with reference table) on inputStream
> 2. Join derivedStream with inputStream
>
> In my use case, I don't need to shuffle data. Each partition in
> derivedStream only needs to be joined with the corresponding partition in
> the original parent inputStream it is generated from.
>
> My question is
>
> 1. Is there a Partitioner defined in KafkaRDD at all?
> 2. How would I preserve the partitioning scheme and avoid data shuffle?
>
> --
> Chen Song
>
>


Parquet partitioning for unique identifier

2015-09-02 Thread Kohki Nishio
Hello experts,

I have a huge JSON file (> 40 GB) and am trying to use Parquet as the file
format. Each entry has a unique identifier, but other than that it doesn't have
a 'well balanced value' column to partition on. Right now it just throws an OOM
and I couldn't figure out what to do about it.

It would be ideal if I could provide a partitioner based on the unique
identifier value, like computing its hash value or something. One of the
options would be to produce a hash value and add it as a separate column,
but it doesn't sound right to me. Are there any other ways I can try?
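
For what it's worth, the "hash value as a separate column" option can be kept fairly
unobtrusive. A minimal, untested Scala sketch (assuming Spark 1.4+, a string id
column, and made-up paths) using a UDF plus DataFrameWriter.partitionBy; the bucket
column only exists to spread the output across files and never has to be queried:

import org.apache.spark.sql.functions.{col, udf}

val numBuckets = 200   // tune to taste
val bucket = udf((id: String) => (id.hashCode & Int.MaxValue) % numBuckets)

sqlContext.read.json("events.json")              // hypothetical input path
  .withColumn("bucket", bucket(col("id")))       // derived column, not part of the data model
  .write.partitionBy("bucket")
  .parquet("events.parquet")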

Regards,
-- 
Kohki Nishio


Problem while loading saved data

2015-09-02 Thread Amila De Silva
Hi All,

I have a two node spark cluster, to which I'm connecting using IPython
notebook.
To see how data saving/loading works, I simply created a dataframe from
people.json using the code below:

df = sqlContext.read.json("examples/src/main/resources/people.json")

Then I called the following to save the dataframe as parquet:
df.write.save("people.parquet")

I tried loading the saved dataframe using:
df2 = sqlContext.read.parquet('people.parquet');

But this simply fails giving the following exception

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-...> in <module>()
----> 1 df2 = sqlContext.read.parquet('people.parquet2');

/srv/spark/python/pyspark/sql/readwriter.pyc in parquet(self, *path)
    154         [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
    155         """
--> 156         return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path)))
    157
    158     @since(1.4)

/srv/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538             self.target_id, self.name)
    539
    540         for temp_arg in temp_args:

/srv/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o53840.parquet.
: java.lang.AssertionError: assertion failed: No schema defined, and
no Parquet data file or summary file found under
file:/home/ubuntu/ipython/people.parquet2.
at scala.Predef$.assert(Predef.scala:179)
at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.org$apache$spark$sql$parquet$ParquetRelation2$MetadataCache$$readSchema(newParquet.scala:429)
at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$11.apply(newParquet.scala:369)
at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$11.apply(newParquet.scala:369)
at scala.Option.orElse(Option.scala:257)
at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:369)
at 
org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$metadataCache$lzycompute(newParquet.scala:126)
at 
org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$metadataCache(newParquet.scala:124)
at 
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$dataSchema$1.apply(newParquet.scala:165)
at 
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$dataSchema$1.apply(newParquet.scala:165)
at scala.Option.getOrElse(Option.scala:120)
at 
org.apache.spark.sql.parquet.ParquetRelation2.dataSchema(newParquet.scala:165)
at 
org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:506)
at 
org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:505)
at 
org.apache.spark.sql.sources.LogicalRelation.(LogicalRelation.scala:30)
at 
org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:438)
at 
org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:264)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:722)



I'm using spark-1.4.1-bin-hadoop2.6 with java 1.7.

 Thanks
Amila


`sbt core/test` hangs on LogUrlsStandaloneSuite?

2015-09-02 Thread Jacek Laskowski
Hi,

Am I doing something off base to execute tests for core module using
sbt as follows?

[spark]> core/test
...
[info] KryoSerializerAutoResetDisabledSuite:
[info] - sort-shuffle with bypassMergeSort (SPARK-7873) (53 milliseconds)
[info] - calling deserialize() after deserializeStream() (2 milliseconds)
[info] LogUrlsStandaloneSuite:
...AND HANGS HERE :(

The reason I'm asking is that the command hangs after printing the
above [info]. I'm on Mac OS and Java 8 with the latest sources -
fc48307797912dc1d53893dce741ddda8630957b.

While taking a thread dump I can see quite a few WAITINGs and
TIMED_WAITING "at sun.misc.Unsafe.park(Native Method)"

Pozdrawiam,
Jacek

--
Jacek Laskowski | http://blog.japila.pl | http://blog.jaceklaskowski.pl
Follow me at https://twitter.com/jaceklaskowski
Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Understanding Batch Processing Time

2015-09-02 Thread Snehal Nagmote
Hi All,

I have a Spark job where I read data from Kafka at a 5-second interval and
query Cassandra based on the Kafka data, using the Spark Cassandra Connector.

I am using Spark 1.4. Often the batch gets stuck in processing after job
id 352; Spark takes a long time to spawn job 353, where it reads from
Cassandra.

I want to understand, since the job processing times are in seconds, what else
contributes to the processing time?

What could be the potential issues adding processing time here?
Any inputs would be helpful.




[image: Inline images 2]

Thanks,
Snehal


Re: cached data between jobs

2015-09-02 Thread Eric Walker
Hi Jeff,

I think I see what you're saying.  I was thinking more of a whole Spark
job, where `spark-submit` is run once to completion and then started up
again, rather than a "job" as seen in the Spark UI.  I take it there is no
implicit caching of results between `spark-submit` runs.

(In the case I was writing about, I think I read too much into the Ganglia
network traffic view.  During the runs which I believed to be IO-bound, I
was carrying out a long-running database transfer on the same network.
After it completed I saw a speedup, not realizing where it came from, and
wondered whether there had been some kind of shifting in the data.)

Eric


On Tue, Sep 1, 2015 at 9:54 PM, Jeff Zhang  wrote:

> Hi Eric,
>
> If the 2 jobs share the same parent stages. these stages can be skipped
> for the second job.
>
> Here's one simple example:
>
> val rdd1 = sc.parallelize(1 to 10).map(e=>(e,e))
> val rdd2 = rdd1.groupByKey()
> rdd2.map(e=>e._1).collect() foreach println
> rdd2.map(e=> (e._1, e._2.size)).collect foreach println
>
> Obviously, there are 2 jobs and both of them have 2 stages. Luckily here
> these 2 jobs share the same stage (the first stage of each job). Although
> you don't cache these data explicitly, once one stage is completed, it is
> marked as available and can be used by other jobs, so for the second job it
> only needs to run one stage.
> You should be able to see the skipped stage in the spark job ui.
>
>
>
> [image: Inline image 1]
>
> On Wed, Sep 2, 2015 at 12:53 AM, Eric Walker 
> wrote:
>
>> Hi,
>>
>> I'm noticing that a 30 minute job that was initially IO-bound may not be
>> during subsequent runs.  Is there some kind of between-job caching that
>> happens in Spark or in Linux that outlives jobs and that might be making
>> subsequent runs faster?  If so, is there a way to avoid the caching in
>> order to get a better sense of the worst-case scenario?
>>
>> (It's also possible that I've simply changed something that made things
>> faster.)
>>
>> Eric
>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Unable to run Group BY on Large File

2015-09-02 Thread SAHA, DEBOBROTA
Hi ,

I am getting the below error while trying to select data using Spark SQL from
an RDD table.

java.lang.OutOfMemoryError: GC overhead limit exceeded
"Spark Context Cleaner" java.lang.InterruptedException


The file or table size is around 113 GB and I am running Spark 1.4 on a
standalone cluster. I tried extending the heap size, but extending to 64 GB
also didn't help.

I would really appreciate any help on this.

Thanks,
Debobrota


spark-submit not using conf/spark-defaults.conf

2015-09-02 Thread Axel Dahl
in my spark-defaults.conf I have:
spark.files   file1.zip, file2.py
spark.master   spark://master.domain.com:7077

If I execute:
bin/pyspark

I can see it adding the files correctly.

However if I execute

bin/spark-submit test.py

where test.py relies on file1.zip, I get an error.

If I instead execute

bin/spark-submit --py-files file1.zip test.py

It works as expected.

How do I get spark-submit to import the spark-defaults.conf file or what
should I start checking to figure out why one works and the other doesn't?

Thanks,

-Axel


Re: Unable to run Group BY on Large File

2015-09-02 Thread Silvio Fiorito
Unfortunately, groupBy is not the most efficient operation. What is it you’re 
trying to do? It may be possible with one of the other *byKey transformations.
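
For example, if the goal is an aggregate per key, a reduceByKey-style formulation
combines values map-side before anything is shuffled, so far less has to be held in
memory at once than with groupBy. A minimal Scala sketch with made-up (key, value)
data (sc is the spark-shell context):

// Hypothetical (key, value) pairs standing in for the real table.
val pairs = sc.parallelize(Seq(("a", 1L), ("b", 2L), ("a", 3L)))

// groupByKey would materialise every value for a key together:
//   pairs.groupByKey().mapValues(_.sum)
// reduceByKey computes partial sums per partition before the shuffle:
val totals = pairs.reduceByKey(_ + _)
totals.collect().foreach(println)   // (a,4) and (b,2)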

From: "SAHA, DEBOBROTA"
Date: Wednesday, September 2, 2015 at 7:46 PM
To: "'user@spark.apache.org'"
Subject: Unable to run Group BY on Large File

Hi ,

I am getting the below error while trying to select data using Spark SQL from
an RDD table.

java.lang.OutOfMemoryError: GC overhead limit exceeded
"Spark Context Cleaner" java.lang.InterruptedException


The file or table size is around 113 GB and I am running SPARK 1.4 on a 
standalone cluster. Tried to extend the heap size but extending to 64GB also 
didn’t help.

I would really appreciate any help on this.

Thanks,
Debobrota


Re: Problem while loading saved data

2015-09-02 Thread Guru Medasani
Hi Amila,

Error says that the ‘people.parquet’ file does not exist. Can you manually 
check to see if that file exists?

> Py4JJavaError: An error occurred while calling o53840.parquet.
> : java.lang.AssertionError: assertion failed: No schema defined, and no 
> Parquet data file or summary file found under 
> file:/home/ubuntu/ipython/people.parquet2.


Guru Medasani
gdm...@gmail.com



> On Sep 2, 2015, at 8:25 PM, Amila De Silva  wrote:
> 
> Hi All,
> 
> I have a two node spark cluster, to which I'm connecting using IPython 
> notebook.
> To see how data saving/loading works, I simply created a dataframe using 
> people.json using the Code below;
> 
> df = sqlContext.read.json("examples/src/main/resources/people.json")
> 
> Then called the following to save the dataframe as a parquet.
> df.write.save("people.parquet")
> 
> Tried loading the saved dataframe using;
> df2 = sqlContext.read.parquet('people.parquet');
> 
> But this simply fails giving the following exception
> 
> ---
> Py4JJavaError Traceback (most recent call last)
>  in ()
> > 1 df2 = sqlContext.read.parquet('people.parquet2');
> 
> /srv/spark/python/pyspark/sql/readwriter.pyc in parquet(self, *path)
> 154 [('name', 'string'), ('year', 'int'), ('month', 'int'), 
> ('day', 'int')]
> 155 """
> --> 156 return 
> self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path)))
> 157 
> 158 @since(1.4)
> 
> /srv/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in 
> __call__(self, *args)
> 536 answer = self.gateway_client.send_command(command)
> 537 return_value = get_return_value(answer, self.gateway_client,
> --> 538 self.target_id, self.name )
> 539 
> 540 for temp_arg in temp_args:
> 
> /srv/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in 
> get_return_value(answer, gateway_client, target_id, name)
> 298 raise Py4JJavaError(
> 299 'An error occurred while calling {0}{1}{2}.\n'.
> --> 300 format(target_id, '.', name), value)
> 301 else:
> 302 raise Py4JError(
> 
> Py4JJavaError: An error occurred while calling o53840.parquet.
> : java.lang.AssertionError: assertion failed: No schema defined, and no 
> Parquet data file or summary file found under 
> file:/home/ubuntu/ipython/people.parquet2.
>   at scala.Predef$.assert(Predef.scala:179)
>   at 
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.org$apache$spark$sql$parquet$ParquetRelation2$MetadataCache$$readSchema(newParquet.scala:429)
>   at 
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$11.apply(newParquet.scala:369)
>   at 
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$11.apply(newParquet.scala:369)
>   at scala.Option.orElse(Option.scala:257)
>   at 
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:369)
>   at org.apache.spark.sql.parquet.ParquetRelation2.org 
> $apache$spark$sql$parquet$ParquetRelation2$$metadataCache$lzycompute(newParquet.scala:126)
>   at org.apache.spark.sql.parquet.ParquetRelation2.org 
> $apache$spark$sql$parquet$ParquetRelation2$$metadataCache(newParquet.scala:124)
>   at 
> org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$dataSchema$1.apply(newParquet.scala:165)
>   at 
> org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$dataSchema$1.apply(newParquet.scala:165)
>   at scala.Option.getOrElse(Option.scala:120)
>   at 
> org.apache.spark.sql.parquet.ParquetRelation2.dataSchema(newParquet.scala:165)
>   at 
> org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:506)
>   at 
> org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:505)
>   at 
> org.apache.spark.sql.sources.LogicalRelation.(LogicalRelation.scala:30)
>   at 
> org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:438)
>   at 
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:264)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:601)
>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>   at py4j.Gateway.invoke(Gateway.java:259)
>   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
> 

FlatMap Explanation

2015-09-02 Thread Ashish Soni
Hi ,

Can someone please explain the output of the flatMap below?
The data in the RDD is:
{1, 2, 3, 3}

rdd.flatMap(x => x.to(3))

output as below

{1, 2, 3, 2, 3, 3, 3}
I am not able to understand how the output above was produced.
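
For reference, flatMap applies the function to each element and then concatenates
the resulting collections into one flat RDD; a small spark-shell walk-through of
exactly this example:

val rdd = sc.parallelize(List(1, 2, 3, 3))

// x.to(3) builds the inclusive range from x up to 3:
//   1.to(3) => 1, 2, 3
//   2.to(3) => 2, 3
//   3.to(3) => 3
//   3.to(3) => 3
// flatMap concatenates those four ranges, giving 1, 2, 3, 2, 3, 3, 3.
rdd.flatMap(x => x.to(3)).collect()
// Array(1, 2, 3, 2, 3, 3, 3)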

Thanks,


Alter table fails to find table

2015-09-02 Thread Tim Smith
Spark 1.3.0 (CDH 5.4.4)

scala> sqlContext.sql("SHOW TABLES").collect
res18: Array[org.apache.spark.sql.Row] = Array([allactivitydata,true],
[sample_07,false], [sample_08,false])

sqlContext.sql("SELECT COUNT(*) from allactivitydata").collect
res19: Array[org.apache.spark.sql.Row] = Array([1227230])

scala> sqlContext.sql("ALTER TABLE allactivitydata ADD COLUMNS (test
STRING)");
15/09/03 04:23:16 INFO ParseDriver: Parsing command: ALTER TABLE
allactivitydata ADD COLUMNS (test STRING)
15/09/03 04:23:16 INFO ParseDriver: Parse Completed
15/09/03 04:23:16 INFO PerfLogger: 
15/09/03 04:23:16 INFO PerfLogger: 
15/09/03 04:23:16 INFO PerfLogger: 
15/09/03 04:23:16 INFO PerfLogger: 
15/09/03 04:23:16 INFO ParseDriver: Parsing command: ALTER TABLE
allactivitydata ADD COLUMNS (test STRING)
15/09/03 04:23:16 INFO ParseDriver: Parse Completed
15/09/03 04:23:16 INFO PerfLogger: 
15/09/03 04:23:16 INFO PerfLogger: 
15/09/03 04:23:16 ERROR Driver: FAILED: SemanticException [Error 10001]:
Table not found default.allactivitydata
org.apache.hadoop.hive.ql.parse.SemanticException: Table not found
default.allactivitydata
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.getTable(BaseSemanticAnalyzer.java:1332)
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.getTable(BaseSemanticAnalyzer.java:1315)
at
org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.addInputsOutputsAlterTable(DDLSemanticAnalyzer.java:1387)
at
org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.addInputsOutputsAlterTable(DDLSemanticAnalyzer.java:1374)
at
org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.analyzeAlterTableModifyCols(DDLSemanticAnalyzer.java:2611)
at
org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.analyzeInternal(DDLSemanticAnalyzer.java:255)
at
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:222)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:423)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:307)
at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1112)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1160)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1049)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1039)
at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:308)
at
org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:280)
at
org.apache.spark.sql.hive.execution.HiveNativeCommand.run(HiveNativeCommand.scala:33)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:55)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:55)
at
org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:65)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1088)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1088)
at org.apache.spark.sql.DataFrame.(DataFrame.scala:147)
at org.apache.spark.sql.DataFrame.(DataFrame.scala:130)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:92)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:32)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:37)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:39)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:41)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:43)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:45)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:47)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:49)
at
$line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:53)
at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:55)
at $line83.$read$$iwC$$iwC$$iwC$$iwC$$iwC.(:57)
at $line83.$read$$iwC$$iwC$$iwC$$iwC.(:59)
at $line83.$read$$iwC$$iwC$$iwC.(:61)
at $line83.$read$$iwC$$iwC.(:63)
at $line83.$read$$iwC.(:65)
at $line83.$read.(:67)
at $line83.$read$.(:71)
at $line83.$read$.()
at $line83.$eval$.(:7)
at $line83.$eval$.()
at $line83.$eval.$print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at

Re: Hbase Lookup

2015-09-02 Thread Jörn Franke
You may check if it makes sense to write a coprocessor doing an upsert for
you, if one does not exist already. Maybe Phoenix for HBase supports this
already.

Another alternative, if the records do not have a unique id, is to put
them into a text index engine, such as Solr or Elasticsearch, which in
this case does fast matching with relevancy scores.


You can use also Spark and Pig there. However, I am not sure if Spark is
suitable for these one row lookups. Same holds for Pig.


Le mer. 2 sept. 2015 à 23:53, ayan guha  a écrit :

Hello group

I am trying to use pig or spark in order to achieve following:

1. Write a batch process which will read from a file
2. Lookup hbase to see if the record exists. If so then need to compare
incoming values with hbase and update fields which do not match. Else
create a new record.

My questions:
1. Is this a good use case for pig or spark?
2. Is there any way to read hbase for each incoming record in pig without
writing map reduce code?
3. In the case of Spark, I think we have to connect to HBase for every record.
Is there any other way?
4. What is the best connector for hbase which gives this functionality?

Best

Ayan


Re: Parquet partitioning for unique identifier

2015-09-02 Thread Raghavendra Pandey
Did you specify a partitioning column while saving the data?
On Sep 3, 2015 5:41 AM, "Kohki Nishio"  wrote:

> Hello experts,
>
> I have a huge json file (> 40G) and am trying to use Parquet as a file
> format. Each entry has a unique identifier but, other than that, it doesn't
> have a 'well balanced' column to partition it on. Right now it just throws
> OOM and I couldn't figure out what to do with it.
>
> It would be ideal if I could provide a partitioner based on the unique
> identifier value, like computing its hash value or something.  One
> option would be to produce a hash value and add it as a separate column,
> but that doesn't sound right to me. Are there any other ways I can try?
>
> Regards,
> --
> Kohki Nishio
>


Re: Unable to run Group BY on Large File

2015-09-02 Thread Raghavendra Pandey
You can increase the number of partitions and try...
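
As a hedged illustration of the *byKey suggestion quoted below (the real schema
and aggregation are not shown in the thread, so the key/value extraction here is
made up): reduceByKey combines values on the map side, so no complete group has
to fit in memory, and a larger partition count spreads the shuffle across more
tasks.

val pairs = sc.textFile("/data/large_file.txt")                   // assumed input
  .map { line => val cols = line.split("\t"); (cols(0), cols(1).toLong) }
val totals = pairs.reduceByKey(_ + _, 400)                        // instead of groupByKey(...) then summing
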
On Sep 3, 2015 5:33 AM, "Silvio Fiorito" 
wrote:

> Unfortunately, groupBy is not the most efficient operation. What is it
> you’re trying to do? It may be possible with one of the other *byKey
> transformations.
>
> From: "SAHA, DEBOBROTA"
> Date: Wednesday, September 2, 2015 at 7:46 PM
> To: "'user@spark.apache.org'"
> Subject: Unable to run Group BY on Large File
>
> Hi ,
>
>
>
> I am getting below error while I am trying to select data using SPARK SQL
> from a RDD table.
>
>
>
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>
> "Spark Context Cleaner" java.lang.InterruptedException
>
>
>
>
>
> The file or table size is around 113 GB and I am running SPARK 1.4 on a
> standalone cluster. Tried to extend the heap size but extending to 64GB
> also didn’t help.
>
>
>
> I would really appreciate any help on this.
>
>
>
> Thanks,
>
> Debobrota
>


Re: Hbase Lookup

2015-09-02 Thread ayan guha
Thanks for your info. I am planning to implement a Pig UDF to do the record
lookups. Kindly let me know if this is a good idea.

Best
Ayan

On Thu, Sep 3, 2015 at 2:55 PM, Jörn Franke  wrote:

>
> You may check if it makes sense to write a coprocessor doing an upsert for
> you, if it does not exist already. Maybe phoenix for Hbase supports this
> already.
>
> Another alternative, if the records do not have an unique Id, is to put
> them into a text index engine, such as Solr or Elasticsearch, which does in
> this case a fast matching with relevancy scores.
>
>
> You can use also Spark and Pig there. However, I am not sure if Spark is
> suitable for these one row lookups. Same holds for Pig.
>
>
> On Wed, Sep 2, 2015 at 23:53, ayan guha  wrote:
>
> Hello group
>
> I am trying to use pig or spark in order to achieve following:
>
> 1. Write a batch process which will read from a file
> 2. Lookup hbase to see if the record exists. If so then need to compare
> incoming values with hbase and update fields which do not match. Else
> create a new record.
>
> My questions:
> 1. Is this a good use case for pig or spark?
> 2. Is there any way to read hbase for each incoming record in pig without
> writing map reduce code?
> 3. In case of spark I think we have to connect to hbase for every record.
> Is there any other way?
> 4. What is the best connector for hbase which gives this functionality?
>
> Best
>
> Ayan
>
>
>


-- 
Best Regards,
Ayan Guha


Re: spark-submit not using conf/spark-defaults.conf

2015-09-02 Thread Davies Liu
This should be a bug, could you create a JIRA for it?

On Wed, Sep 2, 2015 at 4:38 PM, Axel Dahl  wrote:
> in my spark-defaults.conf I have:
> spark.files   file1.zip, file2.py
> spark.master   spark://master.domain.com:7077
>
> If I execute:
> bin/pyspark
>
> I can see it adding the files correctly.
>
> However if I execute
>
> bin/spark-submit test.py
>
> where test.py relies on file1.zip, I get an error.
>
> If I instead execute
>
> bin/spark-submit --py-files file1.zip test.py
>
> It works as expected.
>
> How do I get spark-submit to import the spark-defaults.conf file or what
> should I start checking to figure out why one works and the other doesn't?
>
> Thanks,
>
> -Axel

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: large number of import-related function calls in PySpark profile

2015-09-02 Thread Davies Liu
Could you have a short script to reproduce this?

On Wed, Sep 2, 2015 at 2:10 PM, Priedhorsky, Reid  wrote:
> Hello,
>
> I have a PySpark computation that relies on Pandas and NumPy. Currently, my
> inner loop iterates 2,000 times. I’m seeing the following show up in my
> profiling:
>
> 74804/29102    0.204    0.000    2.173    0.000 <frozen importlib._bootstrap>:2234(_find_and_load)
> 74804/29102    0.145    0.000    1.867    0.000 <frozen importlib._bootstrap>:2207(_find_and_load_unlocked)
> 45704/29102    0.021    0.000    1.820    0.000 <frozen importlib._bootstrap>:313(_call_with_frames_removed)
> 45702/29100    0.048    0.000    1.793    0.000 {built-in method __import__}
>
>
> That is, there are over 10 apparently import-related calls for each
> iteration of my inner loop. Commenting out the content of my loop removes
> most of the calls, and the number of them seems to scale with the number of
> inner loop iterations, so I’m pretty sure these calls are indeed coming from
> there.
>
> Further examination of the profile shows that the callers of these functions
> are inside Pandas, e.g. tseries.period.__getitem__(), which reads as
> follows:
>
> def __getitem__(self, key):
> getitem = self._data.__getitem__
> if np.isscalar(key):
> val = getitem(key)
> return Period(ordinal=val, freq=self.freq)
> else:
> if com.is_bool_indexer(key):
> key = np.asarray(key)
>
> result = getitem(key)
> if result.ndim > 1:
> # MPL kludge
> # values = np.asarray(list(values), dtype=object)
> # return values.reshape(result.shape)
>
> return PeriodIndex(result, name=self.name, freq=self.freq)
>
> return PeriodIndex(result, name=self.name, freq=self.freq)
>
>
> Note that there are no import statements here or calls to the functions
> above. My guess is that somehow PySpark’s pickle stuff is inserting them,
> e.g., around the self._data access.
>
> This is single-node testing currently. At this scale, about 1/3 of the time
> is spent in these import functions.
>
> Pandas and other modules are available on all workers either via the
> virtualenv or PYTHONPATH. I am not using --py-files.
>
> Since the inner loop is performance-critical, I can’t have imports happening
> there. My question is, why are these import functions being called and how
> can I avoid them?
>
> Thanks for any help.
>
> Reid
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Getting an error when trying to read a GZIPPED file

2015-09-02 Thread Spark Enthusiast
Folks,
I have an input file which is gzipped. I use sc.textFile("foo.gz") and I see
the following problem. Can someone help me fix this?
15/09/03 10:05:32 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/09/03 10:05:32 INFO CodecPool: Got brand-new decompressor [.gz]
15/09/03 10:06:15 WARN MemoryStore: Not enough space to cache rdd_2_0 in memory! (computed 216.3 MB so far)
15/09/03 10:06:15 INFO MemoryStore: Memory use = 156.2 KB (blocks) + 213.1 MB (scratch space shared across 1 thread(s)) = 213.3 MB. Storage limit = 265.1 MB.





Re: FlatMap Explanation

2015-09-02 Thread Raghavendra Pandey
flatMap is just like map, but it flattens out the Seq output of the
closure.
In your case, you call the "to" function, which returns a list:

a.to(b) returns list(a,...,b)

So rdd.flatMap(x => x.to(3)) will take each element and return the range up
to 3.
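
A spark-shell sketch of that flattening, using the data from the question:

val rdd = sc.parallelize(Seq(1, 2, 3, 3))
rdd.map(x => x.to(3)).collect()      // Array(Range(1, 2, 3), Range(2, 3), Range(3), Range(3))
rdd.flatMap(x => x.to(3)).collect()  // Array(1, 2, 3, 2, 3, 3, 3)

Each element x is expanded to the range x..3, and flatMap concatenates the
per-element ranges into a single RDD, which gives {1, 2, 3, 2, 3, 3, 3}.
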
On Sep 3, 2015 7:36 AM, "Ashish Soni"  wrote:

> Hi ,
>
> Can someone please explain the output of the flatMap for the
> data in the RDD below
> {1, 2, 3, 3}
>
> rdd.flatMap(x => x.to(3))
>
> output as below
>
> {1, 2, 3, 2, 3, 3, 3}
> I am not able to understand how the output above came about.
>
> Thanks,
>
>


Re: Problem while loading saved data

2015-09-02 Thread Amila De Silva
Hi Guru,

Thanks for the reply.

Yes, I checked if the file exists. But instead of a single file what I
found was a directory having the following structure.

people.parquet
└── _temporary
└── 0
├── task_201509030057_4699_m_00
│   └── part-r-0-b921ed54-53fa-459b-881c-cccde7f79320.gz.parquet
├── task_201509030057_4699_m_01
│   └── part-r-1-b921ed54-53fa-459b-881c-cccde7f79320.gz.parquet
└── _temporary


On Thu, Sep 3, 2015 at 7:13 AM, Guru Medasani  wrote:

> Hi Amila,
>
> Error says that the ‘people.parquet’ file does not exist. Can you manually
> check to see if that file exists?
>
> Py4JJavaError: An error occurred while calling o53840.parquet.
> : java.lang.AssertionError: assertion failed: No schema defined, and no 
> Parquet data file or summary file found under 
> file:/home/ubuntu/ipython/people.parquet2.
>
>
>
> Guru Medasani
> gdm...@gmail.com
>
>
>
> On Sep 2, 2015, at 8:25 PM, Amila De Silva  wrote:
>
> Hi All,
>
> I have a two node spark cluster, to which I'm connecting using IPython
> notebook.
> To see how data saving/loading works, I simply created a dataframe using
> people.json using the Code below;
>
> df = sqlContext.read.json("examples/src/main/resources/people.json")
>
> Then called the following to save the dataframe as a parquet.
> df.write.save("people.parquet")
>
> Tried loading the saved dataframe using;
> df2 = sqlContext.read.parquet('people.parquet');
>
> But this simply fails giving the following exception
>
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> in ()
> ----> 1 df2 = sqlContext.read.parquet('people.parquet2');
>
> /srv/spark/python/pyspark/sql/readwriter.pyc in parquet(self, *path)
>     154         [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
>     155         """
> --> 156         return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path)))
>     157
>     158     @since(1.4)
>
> /srv/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
>     536         answer = self.gateway_client.send_command(command)
>     537         return_value = get_return_value(answer, self.gateway_client,
> --> 538                 self.target_id, self.name)
>     539
>     540         for temp_arg in temp_args:
>
> /srv/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     298                 raise Py4JJavaError(
>     299                     'An error occurred while calling {0}{1}{2}.\n'.
> --> 300                     format(target_id, '.', name), value)
>     301             else:
>     302                 raise Py4JError(
>
> Py4JJavaError: An error occurred while calling o53840.parquet.
> : java.lang.AssertionError: assertion failed: No schema defined, and no
> Parquet data file or summary file found under
> file:/home/ubuntu/ipython/people.parquet2.
>   at scala.Predef$.assert(Predef.scala:179)
>   at 
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.org$apache$spark$sql$parquet$ParquetRelation2$MetadataCache$$readSchema(newParquet.scala:429)
>   at 
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$11.apply(newParquet.scala:369)
>   at 
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$11.apply(newParquet.scala:369)
>   at scala.Option.orElse(Option.scala:257)
>   at 
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:369)
>   at org.apache.spark.sql.parquet.ParquetRelation2.org 
> $apache$spark$sql$parquet$ParquetRelation2$$metadataCache$lzycompute(newParquet.scala:126)
>   at org.apache.spark.sql.parquet.ParquetRelation2.org 
> $apache$spark$sql$parquet$ParquetRelation2$$metadataCache(newParquet.scala:124)
>   at 
> org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$dataSchema$1.apply(newParquet.scala:165)
>   at 
> org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$dataSchema$1.apply(newParquet.scala:165)
>   at scala.Option.getOrElse(Option.scala:120)
>   at 
> org.apache.spark.sql.parquet.ParquetRelation2.dataSchema(newParquet.scala:165)
>   at 
> org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:506)
>   at 
> org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:505)
>   at 
> org.apache.spark.sql.sources.LogicalRelation.(LogicalRelation.scala:30)
>   at 
> org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:438)
>   at 
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:264)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> 

Re: Resource allocation in SPARK streaming

2015-09-02 Thread Akhil Das
Well, in Spark you can get the information that you need from the driver UI
running on port 4040: click on the active job, then click on the stages, and
inside the stages you will find the tasks and the address of the machine on
which each task is being executed; you can also check the CPU load on that
machine (by doing a top or htop). In Spark, a single core is allocated to a
single task.

Thanks
Best Regards

On Thu, Sep 3, 2015 at 4:49 AM, anshu shukla  wrote:

> I tried  to find but is *unable to get clear picture about the  resource
> allocation*  at the level of thread/CORE done in spark .
>
> Actually my problem is that  I am comparing CPU usage by spark and storm
> but in case of storm I know which bolt is running on which machine and over
> how many cores .How to do that in spark .
> I am searching for  explanation for   the high CPU Whiskers in plot
> attached . Every Box plot is a topology with CPU  usage .
>
> Thanks in advance !
>
> On Tue, Sep 1, 2015 at 11:25 PM, anshu shukla 
> wrote:
>
>> I am not much clear about  resource allocation (CPU/CORE/Thread  level
>> allocation)  as per the parallelism by  setting  number of cores in  spark
>>  standalone mode .
>>
>> Any guidelines for that .
>>
>> --
>> Thanks & Regards,
>> Anshu Shukla
>>
>
>
>
> --
> Thanks & Regards,
> Anshu Shukla
>


Re: spark-submit not using conf/spark-defaults.conf

2015-09-02 Thread Axel Dahl
So a bit more investigation shows that:

if I have configured spark-defaults.conf with:

"spark.files  library.py"

then if I call

"spark-submit.py -v test.py"

I see that my "spark.files" default option has been replaced with
"spark.files  test.py"; basically, spark-submit is overwriting
spark.files with the name of the script.

Is this a bug or is there another way to add default libraries without
having to specify them on the command line?

Thanks,

-Axel



On Wed, Sep 2, 2015 at 10:34 PM, Davies Liu  wrote:

> This should be a bug, could you create a JIRA for it?
>
> On Wed, Sep 2, 2015 at 4:38 PM, Axel Dahl  wrote:
> > in my spark-defaults.conf I have:
> > spark.files   file1.zip, file2.py
> > spark.master   spark://master.domain.com:7077
> >
> > If I execute:
> > bin/pyspark
> >
> > I can see it adding the files correctly.
> >
> > However if I execute
> >
> > bin/spark-submit test.py
> >
> > where test.py relies on the file1.zip, I get and error.
> >
> > If I i instead execute
> >
> > bin/spark-submit --py-files file1.zip test.py
> >
> > It works as expected.
> >
> > How do I get spark-submit to import the spark-defaults.conf file or what
> > should I start checking to figure out why one works and the other
> doesn't?
> >
> > Thanks,
> >
> > -Axel
>


Re: Understanding Batch Processing Time

2015-09-02 Thread Tathagata Das
Can you jstack into the driver and see what is process doing after job 352?

Also to confirm, the system is stuck after job 352 finishes, and before job
353 starts (shows up in the UI), right?

TD

On Wed, Sep 2, 2015 at 12:55 PM, Snehal Nagmote 
wrote:

> Hi All,
>
> I have spark job where I read data from Kafka every 5 seconds interval and
> query Cassandra based on Kafka data using spark Cassandra Connector ,
>
> I am using spark 1.4 , Often the batch gets stuck in processing after job
> Id 352 . Spark takes long time to spawn job 353 where it reads from
> Cassandra .
>
> I want to understand since job processing time are in seconds, what else
> contribute to processing time ?
>
> What could be the potential issues which is adding processing time here ?
> Any inputs would be helpful.
>
>
>
>
> [image: Inline images 2]
>
> Thanks,
> Snehal
>


Spark DataFrame saveAsTable with partitionBy creates no ORC file in HDFS

2015-09-02 Thread unk1102
Hi, I have a Spark dataframe which I want to save as a Hive table with
partitions. I tried the following two statements, but they don't work: I don't
see any ORC files in the HDFS directory, it is empty. I can see baseTable is
there in the Hive console, but obviously it is empty because there are no files
inside HDFS. The following two lines, saveAsTable() and insertInto(), do not
work. The registerDataFrameAsTable() method works, but it creates an in-memory
table and causes OOM in my use case, as I have thousands of hive partitions to
process. Please guide me, I am new to Spark. Thanks in advance.

dataFrame.write().mode(SaveMode.Append).partitionBy("entity","date").format("orc").saveAsTable("baseTable");
 

dataFrame.write().mode(SaveMode.Append).format("orc").partitionBy("entity","date").insertInto("baseTable");

//the following works but creates in memory table and seems to be reason for
OOM in my case

hiveContext.registerDataFrameAsTable(dataFrame, "baseTable");



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DataFrame-saveAsTable-with-partitionBy-creates-no-ORC-file-in-HDFS-tp24562.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Save dataframe into hbase

2015-09-02 Thread Ted Yu
The following JIRA is close to integration:
HBASE-14181 Add Spark DataFrame DataSource to HBase-Spark Module

after which hbase would provide better support for DataFrame interaction.

On Wed, Sep 2, 2015 at 1:21 PM, ALEX K  wrote:

> you can use Phoenix-Spark plugin:
> https://phoenix.apache.org/phoenix_spark.html
>
> On Wed, Sep 2, 2015 at 4:04 AM, Hafiz Mujadid 
> wrote:
>
>> Hi
>>
>> What is the efficient way to save Dataframe into hbase?
>>
>> Thanks
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Save-dataframe-into-hbase-tp24552.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


wild cards in spark sql

2015-09-02 Thread Hafiz Mujadid
Hi

Does Spark SQL support wildcards to filter data in SQL queries, just like we
can filter data in SQL queries in an RDBMS with different wildcards like % and
? etc.? In other words, how can I write the following query in Spark SQL?

select * from employee where ename like 'a%d'

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/wild-cards-in-spark-sql-tp24563.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



ERROR WHILE REPARTITION

2015-09-02 Thread shahid ashraf
Hi Guys

I am running standalone Spark 1.4.1 on a local machine.

I have 10 partitions with data skew on partitions 1 and 4: [(0,
0), (1, 15593259), (2, 0), (3, 0), (4, 20695601), (5, 0), (6, 0), (7,
0), (8, 0), (9, 0)] and elements: >>

Now I try rdd.repartition(10) and get errors like "ERROR Executor:
Exception in task 1.0 in stage 10.0 (TID 61)
java.lang.OutOfMemoryError: Java heap space" and "ERROR
DiskBlockObjectWriter" in the stack trace.


I tried increasing "spark.executor.memory" to "4g" as well, but I get the same
errors again.

15/09/02 21:12:43 ERROR Executor: Exception in task 1.0 in stage 10.0 (TID
61)
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:113)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:101)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:97)
at
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at
org.apache.spark.util.collection.WritablePartitionedIterator$$anon$3.writeNext(WritablePartitionedPairCollection.scala:105)
at
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/09/02 21:12:43 WARN TaskSetManager: Lost task 1.0 in stage 10.0 (TID 61,
localhost): java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:113)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:101)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:97)
at
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at
org.apache.spark.util.collection.WritablePartitionedIterator$$anon$3.writeNext(WritablePartitionedPairCollection.scala:105)
at
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

15/09/02 21:12:43 ERROR TaskSetManager: Task 1 in stage 10.0 failed 1
times; aborting job
15/09/02 21:12:43 ERROR SparkUncaughtExceptionHandler: Uncaught exception
in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:113)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:101)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:97)
at
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at
org.apache.spark.util.collection.WritablePartitionedIterator$$anon$3.writeNext(WritablePartitionedPairCollection.scala:105)
at
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at 

Re: spark 1.5 sort slow

2015-09-02 Thread Michael Armbrust
Can you include the output of `explain()` for each of the runs?
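
For reference, a sketch of getting the plans to compare (the query is the one
from the quoted mail, with its truncated upper bound on z left out):

val q = sql("select date, u, v, z from fino3_hr3 where zone == 2 and z >= 2 order by date, z")
q.explain(true)   // true also prints the logical and optimized plans
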

On Tue, Sep 1, 2015 at 1:06 AM, patcharee  wrote:

> Hi,
>
> I found spark 1.5 sorting is very slow compared to spark 1.4. Below is my
> code snippet
>
> val sqlRDD = sql("select date, u, v, z from fino3_hr3 where zone == 2
> and z >= 2 and z <= order by date, z")
> println("sqlRDD " + sqlRDD.count())
>
> The fino3_hr3 (in the sql command) is a hive table in orc format,
> partitioned by zone and z.
>
> Spark 1.5 takes 4.5 mins to execute this sql, while spark 1.4 takes 1.5
> mins. I noticed that dissimilar to spark 1.4 when spark 1.5 sorted, data
> was shuffled into few tasks, not divided for all tasks. Do I need to set
> any configuration explicitly? Any suggestions?
>
> BR,
> Patcharee
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Save dataframe into hbase

2015-09-02 Thread ALEX K
you can use Phoenix-Spark plugin:
https://phoenix.apache.org/phoenix_spark.html

On Wed, Sep 2, 2015 at 4:04 AM, Hafiz Mujadid 
wrote:

> Hi
>
> What is the efficient way to save Dataframe into hbase?
>
> Thanks
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Save-dataframe-into-hbase-tp24552.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Is it required to remove checkpoint when submitting a code change?

2015-09-02 Thread Ricardo Luis Silva Paiva
Hi,

Is there a way to submit an app code change, keeping the checkpoint data or
do I need to erase the checkpoint folder every time I re-submit the spark
app with a new jar?

I have an app that counts pageviews streamed from Kafka and delivers a file
every hour covering the past 24 hours. I'm using reduceByKeyAndWindow with the
reduce and inverse functions set.

I'm doing some code improvements and would like to keep the data from the
past hours, so that when I re-submit a code change I keep delivering the
pageviews aggregation without needing to wait for 24 hours of new data.
Sometimes I'm just changing the submission parameters, like number of
executors, memory and cores.

Many thanks,

Ricardo

-- 
Ricardo Paiva
Big Data / Semântica
*globo.com* 


Re: wild cards in spark sql

2015-09-02 Thread Michael Armbrust
That query should work.
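
A quick spark-shell check (the table registration here is made up; only the
query itself comes from the thread):

case class Emp(ename: String)
val employee = sqlContext.createDataFrame(Seq(Emp("arnold"), Emp("alice"), Emp("bob")))
employee.registerTempTable("employee")
sqlContext.sql("select * from employee where ename like 'a%d'").show()
// +------+
// | ename|
// +------+
// |arnold|
// +------+
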

On Wed, Sep 2, 2015 at 1:50 PM, Hafiz Mujadid 
wrote:

> Hi
>
> does spark sql support wild cards to filter data in sql queries just like
> we
> can filter data in sql queries in RDBMS with different wild cards like %
> and
> ? etc. In other words how can i write following query in spar sql
>
> select * from employee where ename like 'a%d'
>
> thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/wild-cards-in-spark-sql-tp24563.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: wild cards in spark sql

2015-09-02 Thread Anas Sherwani
Yes, SparkSQL does support wildcards. The query you have written should work
as is, if the type of ename is string. You can find all the keywords and a
few supported functions at 
http://docs.datastax.com/en/datastax_enterprise/4.6/datastax_enterprise/spark/sparkSqlSupportedSyntax.html

  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/wild-cards-in-spark-sql-tp24563p24565.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



large number of import-related function calls in PySpark profile

2015-09-02 Thread Priedhorsky, Reid
Hello,

I have a PySpark computation that relies on Pandas and NumPy. Currently, my 
inner loop iterates 2,000 times. I’m seeing the following show up in my 
profiling:

74804/29102    0.204    0.000    2.173    0.000 <frozen importlib._bootstrap>:2234(_find_and_load)
74804/29102    0.145    0.000    1.867    0.000 <frozen importlib._bootstrap>:2207(_find_and_load_unlocked)
45704/29102    0.021    0.000    1.820    0.000 <frozen importlib._bootstrap>:313(_call_with_frames_removed)
45702/29100    0.048    0.000    1.793    0.000 {built-in method __import__}

That is, there are over 10 apparently import-related calls for each iteration 
of my inner loop. Commenting out the content of my loop removes most of the 
calls, and the number of them seems to scale with the number of inner loop 
iterations, so I’m pretty sure these calls are indeed coming from there.

Further examination of the profile shows that the callers of these functions 
are inside Pandas, e.g. tseries.period.__getitem__(), which reads as follows:

def __getitem__(self, key):
getitem = self._data.__getitem__
if np.isscalar(key):
val = getitem(key)
return Period(ordinal=val, freq=self.freq)
else:
if com.is_bool_indexer(key):
key = np.asarray(key)

result = getitem(key)
if result.ndim > 1:
# MPL kludge
# values = np.asarray(list(values), dtype=object)
# return values.reshape(result.shape)

return PeriodIndex(result, name=self.name, freq=self.freq)

return PeriodIndex(result, name=self.name, freq=self.freq)

Note that there are no import statements here or calls to the functions above. 
My guess is that somehow PySpark’s pickle stuff is inserting them, e.g., around 
the self._data access.

This is single-node testing currently. At this scale, about 1/3 of the time is 
spent in these import functions.

Pandas and other modules are available on all workers either via the virtualenv 
or PYTHONPATH. I am not using --py-files.

Since the inner loop is performance-critical, I can’t have imports happening 
there. My question is, why are these import functions being called and how can 
I avoid them?

Thanks for any help.

Reid



Re: Understanding Batch Processing Time

2015-09-02 Thread Snehal Nagmote
Hi ,

Thank you for your reply ,

Yes , I confirm , the system is stuck after job 352 finishes and before job
353 starts (shows up in the UI)

I will start job again and will take jstack if I can reproduce the problem

Thanks,
Snehal


On 2 September 2015 at 13:34, Tathagata Das  wrote:

> Can you jstack into the driver and see what is process doing after job 352?
>
> Also to confirm, the system is stuck after job 352 finishes, and before
> job 353 starts (shows up in the UI), right?
>
> TD
>
> On Wed, Sep 2, 2015 at 12:55 PM, Snehal Nagmote 
> wrote:
>
>> Hi All,
>>
>> I have spark job where I read data from Kafka every 5 seconds interval
>> and query Cassandra based on Kafka data using spark Cassandra Connector ,
>>
>> I am using spark 1.4 , Often the batch gets stuck in processing after job
>> Id 352 . Spark takes long time to spawn job 353 where it reads from
>> Cassandra .
>>
>> I want to understand since job processing time are in seconds, what else
>> contribute to processing time ?
>>
>> What could be the potential issues which is adding processing time here ?
>> Any inputs would be helpful.
>>
>>
>>
>>
>> [image: Inline images 2]
>>
>> Thanks,
>> Snehal
>>
>
>


Re: Is it required to remove checkpoint when submitting a code change?

2015-09-02 Thread Cody Koeninger
Yeah, in general if you're changing the jar you can't recover the
checkpoint.

If you're just changing parameters, why not externalize those in a
configuration file so your jar doesn't change?  I tend to stick even my
app-specific parameters in an external spark config so everything is in one
place.
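
A hedged sketch of that approach (all of the "spark.myapp.*" keys are made up):
keys that start with "spark." in the file passed via --properties-file end up
in the SparkConf, so tuning them never requires rebuilding the jar.

# myapp.conf, used as: spark-submit --properties-file myapp.conf ... app.jar
#   spark.executor.memory     4g
#   spark.myapp.windowHours   24
#   spark.myapp.outputPath    hdfs:///pageviews/hourly

// in the application:
val conf = new SparkConf()
val windowHours = conf.getInt("spark.myapp.windowHours", 24)                     // made-up key
val outputPath  = conf.get("spark.myapp.outputPath", "hdfs:///pageviews/hourly") // made-up key
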

On Wed, Sep 2, 2015 at 4:48 PM, Ricardo Luis Silva Paiva <
ricardo.pa...@corp.globo.com> wrote:

> Hi,
>
> Is there a way to submit an app code change, keeping the checkpoint data
> or do I need to erase the checkpoint folder every time I re-submit the
> spark app with a new jar?
>
> I have an app that count pageviews streaming from Kafka, and deliver a
> file every hour from the past 24 hours. I'm using reduceByKeyAndWindow with
> the reduce and inverse functions set.
>
> I'm doing some code improvements and would like to keep the data from the
> past hours, so when I re-submit a code change, I would keep delivering the
> pageviews aggregation without need to wait for 24 hours of new data.
> Sometimes I'm just changing the submission parameters, like number of
> executors, memory and cores.
>
> Many thanks,
>
> Ricardo
>
> --
> Ricardo Paiva
> Big Data / Semântica
> *globo.com* 
>


Hbase Lookup

2015-09-02 Thread ayan guha
Hello group

I am trying to use pig or spark in order to achieve following:

1. Write a batch process which will read from a file
2. Lookup hbase to see if the record exists. If so then need to compare
incoming values with hbase and update fields which do not match. Else
create a new record.

My questions:
1. Is this a good use case for pig or spark?
2. Is there any way to read hbase for each incoming record in pig without
writing map reduce code?
3. In case of spark I think we have to connect to hbase for every record.
Is thr any other way?
4. What is the best connector for hbase which gives this functionality?

Best
Ayan


Re: Spark DataFrame saveAsTable with partitionBy creates no ORC file in HDFS

2015-09-02 Thread Michael Armbrust
Before Spark 1.5, tables created using saveAsTable cannot be queried by
Hive because we only store Spark SQL metadata.  In Spark 1.5 for parquet
and ORC we store both, but this will not work with partitioned tables
because hive does not support dynamic partition discovery.
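
One commonly used workaround (a sketch, not from this thread; the output path is
an assumption): write the partitioned ORC data to a plain path instead of
saveAsTable, then map an external Hive table onto that location.

dataFrame.write
  .mode(SaveMode.Append)
  .format("orc")
  .partitionBy("entity", "date")
  .save("/warehouse/baseTable")     // writes Hive-style entity=.../date=... directories

On the Hive side one would then create an external table STORED AS ORC with
LOCATION '/warehouse/baseTable' and register the partitions (MSCK REPAIR TABLE,
or ALTER TABLE ... ADD PARTITION) so Hive can see them.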

On Wed, Sep 2, 2015 at 1:34 PM, unk1102  wrote:

> Hi I have a Spark dataframe which I want to save as hive table with
> partitions. I tried the following two statements but they dont work I dont
> see any ORC files in HDFS directory its empty. I can see baseTable is there
> in Hive console but obviously its empty because of no files inside HDFS.
> The
> following two lines saveAsTable() and insertInto()do not work.
> registerDataFrameAsTable() method works but it creates in memory table and
> causing OOM in my use case as I have thousands of hive partitions to
> prcoess. Please guide I am new to Spark. Thanks in advance.
>
>
> dataFrame.write().mode(SaveMode.Append).partitionBy("entity","date").format("orc").saveAsTable("baseTable");
>
>
> dataFrame.write().mode(SaveMode.Append).format("orc").partitionBy("entity","date").insertInto("baseTable");
>
> //the following works but creates in memory table and seems to be reason
> for
> OOM in my case
>
> hiveContext.registerDataFrameAsTable(dataFrame, "baseTable");
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DataFrame-saveAsTable-with-partitionBy-creates-no-ORC-file-in-HDFS-tp24562.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark 1.4.1 saveAsTextFile is slow on emr-4.0.0

2015-09-02 Thread Alexander Pivovarov
Hi Neil

Yes! it helps!!! I do  not see _temporary in console output anymore.
saveAsTextFile
is fast now.

2015-09-02 23:07:00,022 INFO  [task-result-getter-0]
scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 18.0
in stage 0.0 (TID 18) in 4398 ms on ip-10-0-24-103.ec2.internal (1/24)

2015-09-02 23:07:01,887 INFO  [task-result-getter-2]
scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 5.0 in
stage 0.0 (TID 5) in 6282 ms on ip-10-0-26-14.ec2.internal (24/24)

2015-09-02 23:07:01,888 INFO  [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 0
(saveAsTextFile at :22) finished in 6.319 s

2015-09-02 23:07:02,123 INFO  [main] s3n.Jets3tNativeFileSystemStore
(Jets3tNativeFileSystemStore.java:storeFile(141)) - s3.putObject foo-bar
tmp/test40_141_24_406/_SUCCESS 0


Thank you!

On Wed, Sep 2, 2015 at 12:54 AM, Neil Jonkers  wrote:

> Hi,
>
> Can you set the following parameters in your mapred-site.xml file please:
>
>
> mapred.output.direct.EmrFileSystemtrue
>
> mapred.output.direct.NativeS3FileSystemtrue
>
> You can also config this at cluster launch time with the following
> Classification via EMR console:
>
>
> classification=mapred-site,properties=[mapred.output.direct.EmrFileSystem=true,mapred.output.direct.NativeS3FileSystem=true]
>
>
> Thank you
>
> On Wed, Sep 2, 2015 at 6:02 AM, Alexander Pivovarov 
> wrote:
>
>> I checked previous emr config (emr-3.8)
>> mapred-site.xml has the following setting
>> 
>> mapred.output.committer.classorg.apache.hadoop.mapred.DirectFileOutputCommitter
>> 
>>
>>
>> On Tue, Sep 1, 2015 at 7:33 PM, Alexander Pivovarov > > wrote:
>>
>>> Should I use DirectOutputCommitter?
>>> spark.hadoop.mapred.output.committer.class
>>>  com.appsflyer.spark.DirectOutputCommitter
>>>
>>>
>>>
>>> On Tue, Sep 1, 2015 at 4:01 PM, Alexander Pivovarov <
>>> apivova...@gmail.com> wrote:
>>>
 I run spark 1.4.1 in amazom aws emr 4.0.0

 For some reason spark saveAsTextFile is very slow on emr 4.0.0 in
 comparison to emr 3.8  (was 5 sec, now 95 sec)

 Actually saveAsTextFile says that it's done in 4.356 sec but after that
 I see lots of INFO messages with 404 error from com.amazonaws.latency
 logger for next 90 sec

 spark> sc.parallelize(List.range(0, 160),160).map(x => x + "\t" +
 "A"*100).saveAsTextFile("s3n://foo-bar/tmp/test40_20")

 2015-09-01 21:16:17,637 INFO  [dag-scheduler-event-loop]
 scheduler.DAGScheduler (Logging.scala:logInfo(59)) - ResultStage 5
 (saveAsTextFile at :22) finished in 4.356 s
 2015-09-01 21:16:17,637 INFO  [task-result-getter-2]
 cluster.YarnScheduler (Logging.scala:logInfo(59)) - Removed TaskSet 5.0,
 whose tasks have all completed, from pool
 2015-09-01 21:16:17,637 INFO  [main] scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Job 5 finished: saveAsTextFile at
 :22, took 4.547829 s
 2015-09-01 21:16:17,638 INFO  [main] s3n.S3NativeFileSystem
 (S3NativeFileSystem.java:listStatus(896)) - listStatus
 s3n://foo-bar/tmp/test40_20/_temporary/0 with recursive false
 2015-09-01 21:16:17,651 INFO  [main] amazonaws.latency
 (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
 Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
 (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
 ID: 3B2F06FD11682D22), S3 Extended Request ID:
 C8T3rXVSEIk3swlwkUWJJX3gWuQx3QKC3Yyfxuhs7y0HXn3sEI9+c1a0f7/QK8BZ],
 ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
 AWSRequestID=[3B2F06FD11682D22], ServiceEndpoint=[
 https://foo-bar.s3.amazonaws.com], Exception=1,
 HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
 HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.923],
 HttpRequestTime=[11.388], HttpClientReceiveResponseTime=[9.544],
 RequestSigningTime=[0.274], HttpClientSendRequestTime=[0.129],
 2015-09-01 21:16:17,723 INFO  [main] amazonaws.latency
 (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[200],
 ServiceName=[Amazon S3], AWSRequestID=[E5D513E52B20FF17], ServiceEndpoint=[
 https://foo-bar.s3.amazonaws.com], HttpClientPoolLeasedCount=0,
 RequestCount=1, HttpClientPoolPendingCount=0,
 HttpClientPoolAvailableCount=1, ClientExecuteTime=[71.927],
 HttpRequestTime=[53.517], HttpClientReceiveResponseTime=[51.81],
 RequestSigningTime=[0.209], ResponseProcessingTime=[17.97],
 HttpClientSendRequestTime=[0.089],
 2015-09-01 21:16:17,756 INFO  [main] amazonaws.latency
 (AWSRequestMetricsFullSupport.java:log(203)) - StatusCode=[404],
 Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
 (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
 ID: 62C6B413965447FD), S3 Extended Request ID:
 

How to Serialize and Reconstruct JavaRDD later?

2015-09-02 Thread Raja Reddy
Hi All,

*Context:*
I am exploring topic modelling with LDA in Spark MLlib. However, I need
my model to improve as more batches of documents come in.

As of now I see no way of doing something like this, which gensim
 does:

lda.update(other_corpus)

The only way I can enhance my model is essentially to recompute the
LDAModel over all the documents accumulated after a new batch arrives.

*Question:*
One of the time-consuming steps before performing topic modelling is
constructing the corpus as a JavaRDD object while reading through the actual
documents.

The capability to serialize a JavaRDD instance and reconstruct the JavaRDD from
the serialized snapshot would be helpful in this case. Suppose I
construct and serialize a JavaRDD after reading Batch-1 of documents. When
Batch-2 arrives, I would like to deserialize the previously serialized
RDD and extend it with the contents of the new batch of documents. Could someone
please let me know if serialization and deserialization of a JavaRDD
instance is possible? I will have more questions if serialization is
possible, mostly to do with changing the Spark configuration between the
serialization and deserialization operations.

Thanks and Regards,
Raja.


Re: Too many open files issue

2015-09-02 Thread Saisai Shao
Here is the code in which NewHadoopRDD register close handler and be called
when the task is completed (
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L136
).

From my understanding, the reason is possibly that this `foreach` code in
your implementation is not executed as one Spark job at a time in the loop as
expected; on the contrary, all the jobs are submitted to the DAGScheduler
simultaneously. Since each job has no dependency on the others, Spark's
scheduler will unroll the loop and submit the jobs in parallel, so maybe
several map stages are running and pending, and this makes your node run out
of file handles.

You could check the Spark web portal to see whether several map stages are
running simultaneously, or whether some of them are running while others are
pending.

Thanks
Jerry


On Wed, Sep 2, 2015 at 9:09 PM, Sigurd Knippenberg 
wrote:

> Yep. I know. It's was set to 32K when I ran this test. If I bump it to 64K
> the issue goes away. It still doesn't make sense to me that the Spark job
> doesn't release its file handles until the end of the job instead of doing
> that while my loop iterates.
>
> Sigurd
>
> On Wed, Sep 2, 2015 at 4:33 AM, Steve Loughran 
> wrote:
>
>>
>> On 31 Aug 2015, at 19:49, Sigurd Knippenberg 
>> wrote:
>>
>> I know I can adjust the max open files allowed by the OS but I'd rather
>> fix the underlaying issue.
>>
>>
>>
>> bumping up the OS handle limits is step #1 of installing a hadoop cluster
>>
>> https://wiki.apache.org/hadoop/TooManyOpenFiles
>>
>
>


Re: Problems with Tungsten in Spark 1.5.0-rc2

2015-09-02 Thread Anders Arpteg
I haven't done a comparative benchmarking between the two, and it would
involve some work to do so. A single run with each shuffle manager would probably
not say that much since we have a rather busy cluster and the performance
heavily depends on what's currently running in the cluster. I have seen
less problems/exceptions though, and possibilities to decrease memory
requirements (or increase cores), which is of great help.

BTW, is it possible (or will it be) to use Tungsten with dynamic allocation
and the external shuffle manager?

Best,
Anders

On Tue, Sep 1, 2015 at 7:07 PM Davies Liu  wrote:

> Thanks for the confirmation. The tungsten-sort is not the default
> ShuffleManager, this fix will not block 1.5 release, it may be in
> 1.5.1.
>
> BTW, How is the difference between sort and tungsten-sort
> ShuffleManager for this large job?
>
> On Tue, Sep 1, 2015 at 8:03 AM, Anders Arpteg  wrote:
> > A fix submitted less than one hour after my mail, very impressive Davies!
> > I've compiled your PR and tested it with the large job that failed
> before,
> > and it seems to work fine now without any exceptions. Awesome, thanks!
> >
> > Best,
> > Anders
> >
> > On Tue, Sep 1, 2015 at 1:38 AM Davies Liu  wrote:
> >>
> >> I had sent out a PR [1] to fix 2), could you help to test that?
> >>
> >> [1]  https://github.com/apache/spark/pull/8543
> >>
> >> On Mon, Aug 31, 2015 at 12:34 PM, Anders Arpteg 
> >> wrote:
> >> > Was trying out 1.5 rc2 and noticed some issues with the Tungsten
> shuffle
> >> > manager. One problem was when using the com.databricks.spark.avro
> reader
> >> > and
> >> > the error(1) was received, see stack trace below. The problem does not
> >> > occur
> >> > with the "sort" shuffle manager.
> >> >
> >> > Another problem was in a large complex job with lots of
> transformations
> >> > occurring simultaneously, i.e. 50+ or more maps each shuffling data.
> >> > Received error(2) about inability to acquire memory which seems to
> also
> >> > have
> >> > to do with Tungsten. Possibly some setting available to increase that
> >> > memory, because there's lots of heap memory available.
> >> >
> >> > Am running on Yarn 2.2 with about 400 executors. Hoping this will give
> >> > some
> >> > hints for improving the upcoming release, or for me to get some hints
> to
> >> > fix
> >> > the problems.
> >> >
> >> > Thanks,
> >> > Anders
> >> >
> >> > Error(1)
> >> >
> >> > 15/08/31 18:30:57 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
> >> > 3387,
> >> > lon4-hadoopslave-c245.lon4.spotify.net): java.io.EOFException
> >> >
> >> >at java.io.DataInputStream.readInt(DataInputStream.java:392)
> >> >
> >> >at
> >> >
> >> >
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:121)
> >> >
> >> >at
> >> >
> >> >
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:109)
> >> >
> >> >at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
> >> >
> >> >at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> >> >
> >> >at
> >> >
> >> >
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
> >> >
> >> >at
> >> >
> >> >
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
> >> >
> >> >at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> >> >
> >> >at
> >> >
> >> >
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:366)
> >> >
> >> >at
> >> >
> >> >
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
> >> >
> >> >at
> >> >
> >> >
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$
> 1.org$apache$spark$sql$execution$aggregate$Tung
> >> >
> >> >
> stenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
> >> >
> >> >at
> >> >
> >> >
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
> >> >
> >> >at
> >> >
> >> >
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
> >> >
> >> >at
> >> >
> >> >
> org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:47)
> >> >
> >> >at
> >> > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> >> >
> >> >at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> >> >
> >> >at
> >> >
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> >> >
> >> >at
> >> > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> >> >
> >> >at 

Multiple spark-submits vs akka-actors

2015-09-02 Thread srungarapu vamsi
Hi,

I am using a mesos cluster to run my spark jobs.
I have one Mesos master and two Mesos slaves set up on 2 machines.
On one machine the master and a slave are set up, and on the second machine
only a Mesos slave is set up.
I run these on m3.large EC2 instances.

1. When I try to submit two jobs using spark-submit in parallel, one job
hangs with the message: "Initial job has not accepted any resources; check
your cluster UI to ensure that workers are registered and have sufficient
resources". But when I check the Mesos cluster UI, which runs on port 5050,
I can see idle memory which could be used by the hanging job, but the
number of idle cores is 1.
So, does this mean that cores are pinned to a spark-submit and no other
spark-submit can get a core till the running spark-submit completes?

2. Assumption : "submitting multiple spark-jobs using spark-submit has the
above mentioned problem ".
Now my task is to run a spark-streaming job which reads from kafka and does
some precomputation.
The nature of my pre-computation jobs is such that each pre-compute job has a
few mutually exclusive tasks to complete, and the tasks have an inherent tree
structure: a task initiates a few other tasks, which in turn initiate further
tasks.
I already have Spark jobs which run as batch jobs to perform the
pre-computations mentioned above. Now, is it a good idea to convert these
pre-computation jobs into Akka actors?

3. If at all running multiple spark-submit jobs with shared CPU is
possible, for the scenario explained in Point.2, which approach is better :
"precomputation jobs as actors" vs "multiple spark-submits" ?

Any pointers to clear my above doubts is highly appreciated.
-- 
/Vamsi


Re: Multiple spark-submits vs akka-actors

2015-09-02 Thread Akhil Das
"Initial job has not accepted any resources; check your cluster UI to
ensure that workers are registered and have sufficient resources".

I'm assuming you are submitting the job in coarse-grained mode, in that
case make sure you are asking for the available resources.

If you want to submit multiple applications and run them side-by-side then
you can submit the application in fine-grained mode.

Read more over here
http://spark.apache.org/docs/latest/running-on-mesos.html#mesos-run-modes
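
For reference, a hedged sketch of opting into fine-grained mode from the
application itself (the app name and master URL are assumptions):

val conf = new SparkConf()
  .setAppName("precompute-job")                             // assumed app name
  .setMaster("mesos://zk://master.domain.com:2181/mesos")   // assumed master URL
  .set("spark.mesos.coarse", "false")                       // fine-grained: cores are released per task
val sc = new SparkContext(conf)
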

Thanks
Best Regards

On Wed, Sep 2, 2015 at 4:02 PM, srungarapu vamsi 
wrote:

> Hi,
>
> I am using a mesos cluster to run my spark jobs.
> I have one mesos-master and two mesos-slaves setup on 2 machines.
> On one machine, master and slave are setup and on the second machine
> mesos-slave is setup
> I run these on  m3-large ec2 instances.
>
> 1. When i try to submit two jobs using spark-submit in parallel, one job
> hangs with the message : "Initial job has not accepted any resources; check
> your cluster UI to ensure that workers are registered and have sufficient
> resources". But when i check on the mesos cluster UI which runs at 5050
> port, i can see idle memory which can be used by the hanging job. But
> number of idle cores is 1.
> So, does this mean that cores are pinned to spark-submit and no other
> spark-submit can get the core till the running spark-submit completes ?
>
> 2. Assumption : "submitting multiple spark-jobs using spark-submit has the
> above mentioned problem ".
> Now my task is to run a spark-streaming job which reads from kafka and
> does some precomputation.
> The nature of my pre-computation jobs are in such a way that, each
> pre-compute jobs has few mutually exclusive tasks to complete where all the
> tasks have inherent tree structure in them. i.e A task initiates few other
> tasks and they initiate further more tasks.
> I already have spark jobs which run as a batch job to perform the
> pre-computations  mentioned above. Now, is it a good idea to convert these
> precompuations jobs into akka actors ?
>
> 3. If at all running multiple spark-submit jobs with shared CPU is
> possible, for the scenario explained in Point.2, which approach is better :
> "precomputation jobs as actors" vs "multiple spark-submits" ?
>
> Any pointers to clear my above doubts is highly appreciated.
> --
> /Vamsi
>


Re: Memory-efficient successive calls to repartition()

2015-09-02 Thread alexis GILLAIN
Aurélien,

From what you're saying, I can think of a couple of things, considering I
don't know what you are doing in the rest of the code:

- There are a lot of non-HDFS writes; they come from the rest of your code
and/or repartition(). Repartition involves a shuffle and the creation of
files on disk. I would have said that the problem comes from that, but I just
checked and checkpoint() is supposed to delete shuffle files:
https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
(it looks exactly like your problem, so you could maybe try the other
workarounds there).
Still, you may do a lot of shuffling in the rest of the code (you should see
the amount of shuffle files written in the web UI) and consider increasing
the disk space available... if you can do that.

- On the HDFS side, the class I pointed to has an update function
which "automatically
handles persisting and (optionally) checkpointing, as well as unpersisting
and removing checkpoint files". I am not sure your method for checkpointing
removes previous checkpoint files.

In the end, does the disk space error come from hdfs growing or local disk
growing ?

You should check the webui to identify which tasks spill data on disk and
verify if the shuffle files are properly deleted when you checkpoint your
rdd.


Regards,


2015-09-01 22:48 GMT+08:00 Aurélien Bellet <
aurelien.bel...@telecom-paristech.fr>:

> Dear Alexis,
>
> Thanks again for your reply. After reading about checkpointing I have
> modified my sample code as follows:
>
> for i in range(1000):
> print i
> data2=data.repartition(50).cache()
> if (i+1) % 10 == 0:
> data2.checkpoint()
> data2.first() # materialize rdd
> data.unpersist() # unpersist previous version
> data=data2
>
> The data is checkpointed every 10 iterations to a directory that I
> specified. While this seems to improve things a little bit, there is still
> a lot of writing on disk (appcache directory, shown as "non HDFS files" in
> Cloudera Manager) *besides* the checkpoint files (which are regular HDFS
> files), and the application eventually runs out of disk space. The same is
> true even if I checkpoint at every iteration.
>
> What am I doing wrong? Maybe some garbage collector setting?
>
> Thanks a lot for the help,
>
> Aurelien
>
> Le 24/08/2015 10:39, alexis GILLAIN a écrit :
>
>> Hi Aurelien,
>>
>> The first code should create a new RDD in memory at each iteration
>> (check the webui).
>> The second code will unpersist the RDD but that's not the main problem.
>>
>> I think you have trouble due to long lineage as .cache() keep track of
>> lineage for recovery.
>> You should have a look at checkpointing :
>>
>> https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
>>
>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala
>>
>> You can also have a look at the code of others iterative algorithms in
>> mlllib for best practices.
>>
>> 2015-08-20 17:26 GMT+08:00 abellet > >:
>>
>> Hello,
>>
>> For the need of my application, I need to periodically "shuffle" the
>> data
>> across nodes/partitions of a reasonably-large dataset. This is an
>> expensive
>> operation but I only need to do it every now and then. However it
>> seems that
>> I am doing something wrong because as the iterations go the memory
>> usage
>> increases, causing the job to spill onto HDFS, which eventually gets
>> full. I
>> am also getting some "Lost executor" errors that I don't get if I
>> don't
>> repartition.
>>
>> Here's a basic piece of code which reproduces the problem:
>>
>> data =
>> sc.textFile("ImageNet_gist_train.txt",50).map(parseLine).cache()
>> data.count()
>> for i in range(1000):
>>  data=data.repartition(50).persist()
>>  # below several operations are done on data
>>
>>
>> What am I doing wrong? I tried the following but it doesn't solve
>> the issue:
>>
>> for i in range(1000):
>>  data2=data.repartition(50).persist()
>>  data2.count() # materialize rdd
>>  data.unpersist() # unpersist previous version
>>  data=data2
>>
>>
>> Help and suggestions on this would be greatly appreciated! Thanks a
>> lot!
>>
>>
>>
>>
>> --
>> View this message in context:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Memory-efficient-successive-calls-to-repartition-tp24358.html
>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> 
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 

Re: Too many open files issue

2015-09-02 Thread Steve Loughran
ah, now that does sound suspicious...

On 2 Sep 2015, at 14:09, Sigurd Knippenberg 
> wrote:

Yep. I know. It's was set to 32K when I ran this test. If I bump it to 64K the 
issue goes away. It still doesn't make sense to me that the Spark job doesn't 
release its file handles until the end of the job instead of doing that while 
my loop iterates.

Sigurd

On Wed, Sep 2, 2015 at 4:33 AM, Steve Loughran 
> wrote:

On 31 Aug 2015, at 19:49, Sigurd Knippenberg 
> wrote:

I know I can adjust the max open files allowed by the OS but I'd rather fix the 
underlaying issue.


bumping up the OS handle limits is step #1 of installing a hadoop cluster

https://wiki.apache.org/hadoop/TooManyOpenFiles




Re: How to Serialize and Reconstruct JavaRDD later?

2015-09-02 Thread Hemant Bhanawat
You want to persist state between the execution of two RDDs. So, I
believe what you need is serialization of your model, not of the JavaRDD. If
you can serialize your model, you can persist it in HDFS or some other
datastore and use it when processing the next RDDs.

If you are using Spark Streaming, doing this would be easy.
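
That said, if you really do want to snapshot the corpus RDD itself, here is a
minimal, hedged PySpark sketch (paths and the parse_doc helper are assumptions;
JavaRDD exposes the analogous saveAsObjectFile/objectFile methods):

# Sketch only: snapshot the Batch-1 corpus to HDFS, then rebuild and extend it
# when Batch-2 arrives (possibly from a brand-new SparkContext).
corpus_batch1 = sc.textFile("hdfs:///data/batch1").map(parse_doc)  # parse_doc is hypothetical
corpus_batch1.saveAsPickleFile("hdfs:///snapshots/corpus_batch1")

# Later:
corpus_so_far = sc.pickleFile("hdfs:///snapshots/corpus_batch1")
corpus_all = corpus_so_far.union(sc.textFile("hdfs:///data/batch2").map(parse_doc))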

On Wed, Sep 2, 2015 at 4:54 PM, Raja Reddy  wrote:

> Hi All,
>
> *Context:*
> I am exploring topic modelling with LDA in Spark MLlib. However, I need
> my model to be enhanced as more batches of documents come in.
>
> As of now I see no way of doing something like this, which gensim
>  does:
>
> lda.update(other_corpus)
>
> The only way I can enhance my model is essentially to recompute the
> LDAModel over all the documents accumulated after a new batch arrives.
>
> *Question:*
> One of the time-consuming steps before performing topic modelling is
> constructing the corpus as a JavaRDD object while reading through the actual
> documents.
>
> Capability to serialize a JavaRDD instance and reconstructing JavaRDD from
> the serialized snapshot would be helpful in this case. Suppose say I
> construct and serialize JavaRDD after reading Batch-1 of documents. When
> the Batch-2 arrives, I would like to deserialize the previously serialized
> RDD and mutate it with contents of new batch of documents. Could someone
> please let me know if serialization and deserialization of a JavaRDD
> instance is possible? I will have more questions if serialization is
> possible, mostly to do with changing spark configuration in between a
> serialization operation and deserialization operation.
>
> Thanks and Regards,
> Raja.
>


Re: Question about Google Books Ngrams with pyspark (1.4.1)

2015-09-02 Thread Bertrand
Looking at another forum, I tried:

files = sc.newAPIHadoopFile(
    "s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram",
    "com.hadoop.mapreduce.LzoTextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text")
Traceback (most recent call last):
  File "", line 1, in 
  File "/root/spark/python/pyspark/context.py", line 574, in newAPIHadoopFile
    jconf, batchSize)
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile.
: java.lang.ClassNotFoundException: com.hadoop.mapreduce.LzoTextInputFormat
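
For what it's worth, this ClassNotFoundException seems to indicate that the
hadoop-lzo jar is simply not on the driver/executor classpath. A hedged sketch
of how the jar could be attached (the jar location is an assumption, not
something confirmed in this thread):

# Assumption: hadoop-lzo.jar ships with the AMI somewhere like
# /usr/lib/hadoop/lib/ -- adjust the path to wherever the jar actually lives.
#
#   pyspark --jars /usr/lib/hadoop/lib/hadoop-lzo.jar \
#           --driver-class-path /usr/lib/hadoop/lib/hadoop-lzo.jar
#
# With the jar on the classpath, the same call should be able to resolve the
# input format class:
files = sc.newAPIHadoopFile(
    "s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram",
    "com.hadoop.mapreduce.LzoTextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text")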

Thanks for your help,

Bertrand



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Question-about-Google-Books-Ngrams-with-pyspark-1-4-1-tp24542p24556.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Memory-efficient successive calls to repartition()

2015-09-02 Thread Aurélien Bellet

Thanks a lot for the useful link and comments Alexis!

First of all, the problem occurs without doing anything else in the code
(except of course loading my data from HDFS at the beginning), so it
definitely comes from the shuffling. You're right, in the current
version the checkpoint files are not removed and take up some space in HDFS
(this is easy to fix). But this is negligible compared to the non-HDFS
files, which keep growing as the iterations go. So I agree with you that
this must come from the shuffling operations: it seems that the shuffle
files are not removed during execution (they are only removed if I
stop/kill the application), despite the use of checkpoint.


The class you mentioned is very interesting but I did not find a way to 
use it from pyspark. I will try to implement my own version, looking at 
the source code. But besides the queueing and removing of checkpoint 
files, I do not really see anything special there that could solve my issue.
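
For reference, here is the kind of helper loop I have in mind: a minimal,
hedged PySpark sketch of a PeriodicRDDCheckpointer-style pattern (the
checkpoint directory, interval and variable names are my own assumptions, not
the MLlib implementation):

sc.setCheckpointDir("hdfs:///tmp/checkpoints")   # directory is an assumption
checkpoint_interval = 10

data = sc.textFile("ImageNet_gist_train.txt", 50).map(parseLine).cache()
data.count()

for i in range(1000):
    data2 = data.repartition(50).persist()
    if (i + 1) % checkpoint_interval == 0:
        data2.checkpoint()
    data2.count()        # materialize the new RDD (and write its checkpoint)
    data.unpersist()     # drop the previous cached copy
    # data.getCheckpointFile(), if set, gives the old checkpoint path, which
    # could then be removed externally (e.g. hdfs dfs -rm -r) once unused
    data = data2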


I will continue to investigate this. I just found out I can use a command-line
browser to look at the webui (I cannot access the server in graphical display
mode); this should help me understand what's going on. I will also try the
workarounds mentioned in the link. I'll keep you posted.


Again, thanks a lot!

Best,

Aurelien

On 02/09/2015 14:15, alexis GILLAIN wrote:

Aurélien,

From what you're saying, I can think of a couple of things, considering
I don't know what you are doing in the rest of the code:

- There are a lot of non-HDFS writes; they come from the rest of your code
and/or repartition(). Repartition involves a shuffle and the creation of
files on disk. I would have said that the problem comes from that, but I
just checked and checkpoint() is supposed to delete shuffle files:
https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
(this looks exactly like your problem, so you could maybe try the other
workarounds there).
Still, you may do a lot of shuffling in the rest of the code (you should
see the amount of shuffle files written in the webui) and consider
increasing the disk space available... if you can do that.

- On the HDFS side, the class I pointed to has an update function which
"automatically handles persisting and (optionally) checkpointing, as
well as unpersisting and removing checkpoint files". I am not sure your
method for checkpointing removes previous checkpoint files.

In the end, does the disk space error come from HDFS growing or the local
disk growing?

You should check the webui to identify which tasks spill data on disk
and verify if the shuffle files are properly deleted when you checkpoint
your rdd.


Regards,


2015-09-01 22:48 GMT+08:00 Aurélien Bellet
>:

Dear Alexis,

Thanks again for your reply. After reading about checkpointing I
have modified my sample code as follows:

for i in range(1000):
    print i
    data2 = data.repartition(50).cache()
    if (i+1) % 10 == 0:
        data2.checkpoint()
    data2.first()      # materialize rdd
    data.unpersist()   # unpersist previous version
    data = data2

The data is checkpointed every 10 iterations to a directory that I
specified. While this seems to improve things a little bit, there is
still a lot of writing on disk (appcache directory, shown as "non
HDFS files" in Cloudera Manager) *besides* the checkpoint files
(which are regular HDFS files), and the application eventually runs
out of disk space. The same is true even if I checkpoint at every
iteration.

What am I doing wrong? Maybe some garbage collector setting?

Thanks a lot for the help,

Aurelien

On 24/08/2015 10:39, alexis GILLAIN wrote:

Hi Aurelien,

The first code should create a new RDD in memory at each iteration
(check the webui).
The second code will unpersist the RDD but that's not the main
problem.

I think you have trouble due to long lineage, as .cache() keeps track of
the lineage for recovery.
You should have a look at checkpointing :

https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala

You can also have a look at the code of other iterative algorithms in
mllib for best practices.

2015-08-20 17:26 GMT+08:00 abellet

>>:

 Hello,

 For the need of my application, I need to periodically
"shuffle" the
 data
 across nodes/partitions of a reasonably-large dataset. 

Unable to understand error “SparkListenerBus has already stopped! Dropping event …”

2015-09-02 Thread Adrien Mogenet
Hi there,

I'd like to know if anyone has a magic method to avoid such messages in
Spark logs:

2015-08-30 19:30:44 ERROR LiveListenerBus:75 - SparkListenerBus has already
stopped! Dropping event
SparkListenerExecutorMetricsUpdate(41,WrappedArray())

After further investigation, I understand that LiveListenerBus extends
AsynchronousListenerBus, and thus, at some point, its .stop() method is called.
Then, messages that might still be sent/received will be dropped and remain
unprocessed. Basically, some SparkListenerExecutorMetricsUpdate messages have
unfortunately not been received yet, and once they arrive, they are simply
dropped.

This doesn't look critical since SparkListenerExecutorMetricsUpdate only
corresponds to Periodic updates from executors.

What is embarrassing is that I absolutely don't understand why this happens
and nothing refers to this issue. Note that this is totally
non-deterministic and I can't reproduce it, probably due to the
asynchronous nature and my lack of understanding of how/when stop() is
supposed to be called.
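
For completeness, here is a hedged sketch of the kind of shutdown pattern I am
double-checking on my side (run_job is a placeholder for whatever the
application actually does; the idea is simply that sc.stop() runs exactly once,
after every job has finished):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("listener-bus-check")   # app name is a placeholder
sc = SparkContext(conf=conf)
try:
    run_job(sc)   # hypothetical: all jobs/actions happen in here
finally:
    sc.stop()     # stop exactly once, only after all work has completed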

Any idea?

Best,

-- 

*Adrien Mogenet*
Head of Backend/Infrastructure
adrien.moge...@contentsquare.com
(+33)6.59.16.64.22
http://www.contentsquare.com
50, avenue Montaigne - 75008 Paris


Re: Too many open files issue

2015-09-02 Thread Sigurd Knippenberg
Yep, I know. It was set to 32K when I ran this test. If I bump it to 64K
the issue goes away. It still doesn't make sense to me that the Spark job
doesn't release its file handles until the end of the job instead of doing
that while my loop iterates.

Sigurd

On Wed, Sep 2, 2015 at 4:33 AM, Steve Loughran 
wrote:

>
> On 31 Aug 2015, at 19:49, Sigurd Knippenberg 
> wrote:
>
> I know I can adjust the max open files allowed by the OS but I'd rather
> fix the underlying issue.
>
>
>
> bumping up the OS handle limits is step #1 of installing a hadoop cluster
>
> https://wiki.apache.org/hadoop/TooManyOpenFiles
>


Error using SQLContext in spark

2015-09-02 Thread rakesh sharma
Error: application failed with exception
java.lang.NoSuchMethodError: org.apache.spark.sql.SQLContext.<init>(Lorg/apache/spark/api/java/JavaSparkContext;)V
        at examples.PersonRecordReader.getPersonRecords(PersonRecordReader.java:35)
        at examples.PersonRecordReader.main(PersonRecordReader.java:17)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:367)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:77)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Hi All,
I am getting the above exception when I use SQLContext in Spark jobs. The
error occurs only when these statements are added. The RDD itself is fine and
prints everything correctly; the error occurs when creating the DataFrame. I am
using Maven dependencies version 1.3.1.
public static void getPersonRecords(String... args) {
    SparkConf sparkConf = new SparkConf().setAppName("SQLContext");
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    JavaRDD<String> lines = javaSparkContext.textFile(args[0], 1);
    JavaRDD<Person> personRecords = lines.map(new Function<String, Person>() {
        public Person call(String line) throws Exception {
            System.out.println(line);
            String[] rec = line.split(",");
            return new Person(Integer.parseInt(rec[1].trim()), rec[0]);
        }
    });
    for (Person p : personRecords.collect()) {
        System.out.println(p.getName());
    }
    SQLContext sqlContext = new SQLContext(javaSparkContext);
    DataFrame dataFrame = sqlContext.createDataFrame(personRecords, Person.class);
}

Please help me, I have been stuck with this since morning.
Thanks,
Rakesh