Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-12 Thread Cheng Lian

Hi Philip,

What do you mean by saying "still partitioned the same way"? If you are 
trying to save the partition columns encoded in partition directories 
directly into Parquet files, and put all Parquet part-files into a 
single directory without creating any intermediate sub-directories, then 
I'd expect it to be much faster. This is at least true for file systems 
like S3, though I haven't tested listing the contents of super wide and 
flat directories (i.e. those containing no sub-directories but a lot of files).


And, as Hao suggested, sorting the records by the columns you are 
interested in can give better performance on the read path, thanks to 
Parquet-specific optimizations like filter push-down. However, I think 
in your case the time spent reading Parquet files is not the 
bottleneck, according to our previous discussion.
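The benefit of sorting before writing can be sketched without Parquet itself: when records are sorted and then split into row groups, each group's min/max statistics let a reader skip whole groups that cannot contain matching rows. A minimal Python illustration of that skipping logic (not Parquet's actual implementation, just the idea):

```python
def split_into_groups(rows, size):
    """Split a list of rows into fixed-size 'row groups'."""
    return [rows[i:i + size] for i in range(0, len(rows), size)]

def scan_with_stats(groups, lo_bound, hi_bound):
    """Read only groups whose [min, max] range overlaps the predicate;
    skip the rest wholesale, like Parquet filter push-down."""
    read, skipped = [], 0
    for g in groups:
        lo, hi = min(g), max(g)
        if hi < lo_bound or lo > hi_bound:
            skipped += 1          # whole group pruned by statistics
        else:
            read.extend(v for v in g if lo_bound <= v <= hi_bound)
    return read, skipped

# Sorted data clusters matching keys together, so most groups are skipped.
rows = sorted(range(100))
groups = split_into_groups(rows, 10)
matches, skipped = scan_with_stats(groups, 42, 45)
# With unsorted data the same keys would be scattered across many groups,
# and far fewer groups could be skipped.
```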


Cheng

On 8/12/15 8:56 AM, Cheng, Hao wrote:


Definitely worth a try. You can sort the records before writing them 
out, and then you will get Parquet files without overlapping keys.


Let us know if that helps.

Hao

*From:*Philip Weaver [mailto:philip.wea...@gmail.com]
*Sent:* Wednesday, August 12, 2015 4:05 AM
*To:* Cheng Lian
*Cc:* user
*Subject:* Re: Very high latency to initialize a DataFrame from 
partitioned parquet database.


Do you think it might be faster to put all the files in one directory 
but still partitioned the same way? I don't actually need to filter on 
the values of the partition keys, but I need to rely on there being no 
overlap in the values of the keys between any two Parquet files.


On Fri, Aug 7, 2015 at 8:23 AM, Philip Weaver wrote:


Thanks, I also confirmed that the partition discovery is slow by
writing a non-Spark application that uses the parquet library
directly to load the partitions.

It's so slow that my colleague's Python application can read the
entire contents of all the parquet data files faster than my
application can even discover the partitions!

On Fri, Aug 7, 2015 at 2:09 AM, Cheng Lian wrote:

However, it's weird that the partition discovery job only
spawns 2 tasks. It should use the default parallelism, which
is probably 8 according to the logs of the next Parquet
reading job. Partition discovery is already done in a
distributed manner via a Spark job. But the parallelism is
mysteriously low...

Cheng

On 8/7/15 3:32 PM, Cheng Lian wrote:

Hi Philip,

Thanks for providing the log file. It seems that most of
the time is spent on partition discovery. The code
snippet you provided actually issues two jobs. The first
one is for listing the input directories to find out all
leaf directories (and this actually requires listing all
leaf files, because we can only assert that a directory is
a leaf one when it contains no sub-directories). Then
partition information is extracted from the leaf directory
paths. This process starts at:

10:51:44 INFO sources.HadoopFsRelation: Listing leaf
files and directories in parallel under:
file:/home/pweaver/work/parquet/day=20150225, …

and ends at:

10:52:31 INFO scheduler.TaskSchedulerImpl: Removed
TaskSet 0.0, whose tasks have all completed, from pool

The actual task execution time is about 36s:

10:51:54 INFO scheduler.TaskSetManager: Starting task
0.0 in stage 0.0 (TID 0, lindevspark5, PROCESS_LOCAL,
3087 bytes)
…
10:52:30 INFO scheduler.TaskSetManager: Finished task
0.0 in stage 0.0 (TID 0) in 36107 ms on lindevspark5 (1/2)

You mentioned that your dataset has about 40,000+
partitions, so there are a lot of leaf directories and
files out there. My guess is that the local file system
spent lots of time listing FileStatus-es of all these files.
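The listing job described here can be mimicked outside Spark: recursively walk the directory tree, treat any directory with no sub-directories as a leaf, and parse Hive-style key=value segments from its path. A simplified Python sketch of that process (illustrative only — this is not Spark's HadoopFsRelation code):

```python
import os
import tempfile

def discover_partitions(root):
    """Find leaf directories (those with no sub-directories) and parse
    Hive-style key=value path segments into partition specs."""
    partitions = []
    for dirpath, dirnames, _ in os.walk(root):
        if not dirnames:  # a leaf directory: only data files below it
            rel = os.path.relpath(dirpath, root)
            spec = dict(seg.split("=", 1)
                        for seg in rel.split(os.sep) if "=" in seg)
            partitions.append(spec)
    return partitions

# Build a tiny day=/hour= tree with one part-file per leaf.
root = tempfile.mkdtemp()
for day in ("20150225", "20150226"):
    for hour in ("00", "01"):
        leaf = os.path.join(root, "day=%s" % day, "hour=%s" % hour)
        os.makedirs(leaf)
        open(os.path.join(leaf, "part-00000.parquet"), "w").close()

parts = discover_partitions(root)
```

With 40,000+ leaf directories the dominant cost is the file system returning a FileStatus per file, which is why flattening the layout helps.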

I also noticed that Mesos job scheduling takes more time
than expected. It is probably because this is the first
Spark job executed in the application, and the system is
not warmed up yet. For example, there's a 6s gap between
these two adjacent lines:

10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task
set 0.0 with 2 tasks
10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos
task 0 is now TASK_RUNNING

The 2nd Spark job is the real Parquet reading job, and
this one actually finishes pretty quickly, only 3s (note
that the Mesos job scheduling latency is also included):

10:52:32 INFO scheduler.DAGScheduler: Got job 1
(parquet at App.scala:182) with 8 output par

Re: Parquet without hadoop: Possible?

2015-08-12 Thread Cheng Lian
One thing to note: it would be good to add an explicit file system 
scheme to the output path (i.e. "file:///var/..." instead of 
"/var/..."), especially when you do have HDFS running, because in that 
case the data might be written to HDFS rather than your local file 
system if Spark found Hadoop configuration files when starting the 
application.
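The resolution behaviour Cheng warns about can be illustrated in a few lines of Python (here default_fs stands in for Hadoop's fs.defaultFS setting; this is a sketch of the rule, not Hadoop's actual code):

```python
from urllib.parse import urlparse

def effective_filesystem(path, default_fs="hdfs"):
    """A path with an explicit scheme keeps it; a bare path silently
    falls back to the configured default file system."""
    scheme = urlparse(path).scheme
    return scheme or default_fs

fs_explicit = effective_filesystem("file:///var/data/out.parquet")
fs_bare = effective_filesystem("/var/data/out.parquet")
```

So on a machine whose Hadoop configuration points at HDFS, the bare path quietly writes to HDFS, while the `file://` form always stays local.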


Cheng

On 8/11/15 11:12 PM, saif.a.ell...@wellsfargo.com wrote:


I confirm that it works,

I was just having this issue: 
https://issues.apache.org/jira/browse/SPARK-8450


Saif

*From:*Ellafi, Saif A.
*Sent:* Tuesday, August 11, 2015 12:01 PM
*To:* Ellafi, Saif A.; deanwamp...@gmail.com
*Cc:* user@spark.apache.org
*Subject:* RE: Parquet without hadoop: Possible?

Sorry, I provided bad information. This example worked fine with 
reduced parallelism.


It seems my problem has to do with something specific about the real 
data frame at the point where it is read.


Saif
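A rough way to see why reduced parallelism avoided the OutOfMemoryError below: every concurrently open Parquet writer allocates an output buffer per column (the CapacityByteArrayOutputStream slabs visible in the stack trace), so heap demand scales with concurrent tasks × columns. The numbers in this sketch are purely illustrative, not measured Spark values:

```python
def writer_memory_bytes(concurrent_tasks, columns, slab_bytes):
    """Rough lower bound on heap needed by simultaneously open Parquet
    writers: one column buffer per column per open writer."""
    return concurrent_tasks * columns * slab_bytes

# Illustrative only: 32 concurrent write tasks, 50 columns, 1 MiB slabs.
need = writer_memory_bytes(32, 50, 1 << 20)          # ~1600 MiB
need_reduced = writer_memory_bytes(4, 50, 1 << 20)   # ~200 MiB
```

Cutting the number of concurrent writers (lower parallelism, or repartitioning before the write) shrinks the peak linearly, which matches Saif's observation that the example worked with reduced parallelism.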

*From:*saif.a.ell...@wellsfargo.com 
 
[mailto:saif.a.ell...@wellsfargo.com]

*Sent:* Tuesday, August 11, 2015 11:49 AM
*To:* deanwamp...@gmail.com 
*Cc:* user@spark.apache.org 
*Subject:* RE: Parquet without hadoop: Possible?

I am launching my spark-shell

spark-1.4.1-bin-hadoop2.6/bin/spark-shell

15/08/11 09:43:32 INFO SparkILoop: Created sql context (with Hive 
support)..


SQL context available as sqlContext.

scala> val data = sc.parallelize(Array(2,3,5,7,2,3,6,1)).toDF

scala> data.write.parquet("/var/data/Saif/pq")

Then I get a million errors:

15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz]

15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz]

15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz]

15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task.

java.lang.OutOfMemoryError: Java heap space

15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task.

java.lang.OutOfMemoryError: Java heap space

15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.

java.lang.OutOfMemoryError: Java heap space

15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.

java.lang.OutOfMemoryError: Java heap space

15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task.

java.lang.OutOfMemoryError: Java heap space

15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task.

java.lang.OutOfMemoryError: Java heap space

15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.

java.lang.OutOfMemoryError: Java heap space

15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task.

java.lang.OutOfMemoryError: Java heap space

15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task.

java.lang.OutOfMemoryError: Java heap space

at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
at parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:83)
at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229)
at org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:470)
at org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:360)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:172)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent

Re: Spark DataFrames uses too many partition

2015-08-12 Thread Alasdair McBride
Thanks Silvio!
On 11 Aug 2015 17:44, "Silvio Fiorito" wrote:

> You need to configure the spark.sql.shuffle.partitions parameter to a
> different value. It defaults to 200.
>
>
>
>
> On 8/11/15, 11:31 AM, "Al M"  wrote:
>
> >I am using DataFrames with Spark 1.4.1.  I really like DataFrames but the
> >partitioning makes no sense to me.
> >
> >I am loading lots of very small files and joining them together.  Every
> file
> >is loaded by Spark with just one partition.  Each time I join two small
> >files the partition count increases to 200.  This makes my application
> take
> >10x as long as if I coalesce everything to 1 partition after each join.
> >
> >With normal RDDs it would not expand out the partitions to 200 after
> joining
> >two files with one partition each.  It would either keep it at one or
> expand
> >it to two.
> >
> >Why do DataFrames expand out the partitions so much?
> >
> >
> >
> >--
> >View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DataFrames-uses-too-many-partition-tp24214.html
> >Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> >-
> >To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >For additional commands, e-mail: user-h...@spark.apache.org
> >
>


RE: Spark DataFrames uses too many partition

2015-08-12 Thread Alasdair McBride
Thank you Hao; that was a fantastic response. I have raised SPARK-9782 for
this.

I also would love to have dynamic partitioning. I mentioned it in the Jira.
On 12 Aug 2015 02:19, "Cheng, Hao"  wrote:

> That's a good question. We don't support reading small files in a single
> partition yet, but it's definitely an issue we need to optimize. Do you
> mind creating a JIRA issue for this? Hopefully we can merge that in the
> 1.6 release.
>
> 200 is the default partition number for parallel tasks after the data
> shuffle, and we have to change that value according to the file size,
> cluster size, etc.
>
> Ideally, this number would be set dynamically and automatically; however,
> Spark SQL doesn't support a complex cost-based model yet, particularly
> for multi-stage jobs. (
> https://issues.apache.org/jira/browse/SPARK-4630)
>
> Hao
>
> -Original Message-
> From: Al M [mailto:alasdair.mcbr...@gmail.com]
> Sent: Tuesday, August 11, 2015 11:31 PM
> To: user@spark.apache.org
> Subject: Spark DataFrames uses too many partition
>
> I am using DataFrames with Spark 1.4.1.  I really like DataFrames but the
> partitioning makes no sense to me.
>
> I am loading lots of very small files and joining them together.  Every
> file is loaded by Spark with just one partition.  Each time I join two
> small files the partition count increases to 200.  This makes my
> application take 10x as long as if I coalesce everything to 1 partition
> after each join.
>
> With normal RDDs it would not expand out the partitions to 200 after
> joining two files with one partition each.  It would either keep it at one
> or expand it to two.
>
> Why do DataFrames expand out the partitions so much?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DataFrames-uses-too-many-partition-tp24214.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
> commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to minimize shuffling on Spark dataframe Join?

2015-08-12 Thread Abdullah Anwar
Hi Hemant,

Thank you for your replay.

I think the source of my dataframe is not partitioned on the key; it's an
Avro file where 'id' is a field, but I don't know how to read a file and
configure the partition key at the same time. I couldn't find anything on
SQLContext.read.load, or on DataFrame, where you can set a partition key.
If it could partition on the specified key, would Spark put the same
partition range on the same machine for two different dataframes?

   What are the overall tips for joining faster?

Best Regards,
Abdullah




On Wed, Aug 12, 2015 at 11:02 AM, Hemant Bhanawat 
wrote:

> Is the source of your dataframe partitioned on key? As per your mail, it
> looks like it is not. If that is the case,  for partitioning the data, you
> will have to shuffle the data anyway.
>
> Another part of your question is - how to co-group data from two
> dataframes based on a key? I think for RDDs, cogroup in PairRDDFunctions
> is a way. I am not sure if something similar is available for DataFrames.
>
> Hemant
>
>
>
>
>
> On Tue, Aug 11, 2015 at 2:14 PM, Abdullah Anwar <
> abdullah.ibn.an...@gmail.com> wrote:
>
>>
>>
>> I have two dataframes like this
>>
>>   student_rdf = (studentid, name, ...)
>>   student_result_rdf = (studentid, gpa, ...)
>>
>> We need to join these two dataframes. We are now doing it like this:
>>
>> student_rdf.join(student_result_rdf, student_result_rdf["studentid"] == 
>> student_rdf["studentid"])
>>
>> So it is simple. But it creates lots of data shuffling across worker
>> nodes. Since the joining key is the same, if each dataframe could be
>> partitioned using that key (studentid), there should be no shuffling at
>> all, because similar data (based on the partition key) would reside on
>> the same node. Is it possible to hint spark to do this?
>>
>> So, I am looking for a way to partition data based on a column while I
>> read a dataframe from input. And if it is possible for Spark to understand
>> that the partition keys of two dataframes are the same, then how?
>>
>>
>>
>>
>> --
>> Abdullah
>>
>
>


-- 
Abdullah
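Hemant's point about co-partitioning can be shown without Spark: if both sides are hash-partitioned by the join key with the same partitioner, partition i of one side only ever needs partition i of the other, so no cross-partition (i.e. cross-node) data movement is required. A plain-Python sketch of the idea (not Spark's shuffle machinery, just the co-location principle):

```python
def hash_partition(rows, key, n):
    """Assign each row to one of n partitions by hashing its join key."""
    parts = [[] for _ in range(n)]
    for row in rows:
        parts[hash(row[key]) % n].append(row)
    return parts

def copartitioned_join(left, right, key, n):
    """Join partition i of one side only against partition i of the other.
    With identical partitioning, no cross-partition traffic is needed."""
    lp, rp = hash_partition(left, key, n), hash_partition(right, key, n)
    out = []
    for lpart, rpart in zip(lp, rp):
        index = {}
        for r in rpart:
            index.setdefault(r[key], []).append(r)
        for l in lpart:
            for r in index.get(l[key], []):
                out.append({**l, **r})
    return out

students = [{"studentid": i, "name": "s%d" % i} for i in range(4)]
results = [{"studentid": i, "gpa": 3.0 + i / 10} for i in range(4)]
joined = copartitioned_join(students, results, "studentid", 2)
```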


Re: Controlling number of executors on Mesos vs YARN

2015-08-12 Thread Tim Chen
Yes, the options are not that configurable yet, but I think it's not hard to
change that.

I actually have a patch out specifically to make the number of CPUs per
executor configurable in coarse-grain mode, which will hopefully be merged
in the next release.

I think the open question now is whether, for fine-grain mode, we can limit
the maximum number of concurrent executors; I think we can definitely just
add a new option like spark.mesos.executor.max to cap it.

I'll file a jira and hopefully get this change in soon too.

Tim



On Tue, Aug 11, 2015 at 6:21 AM, Haripriya Ayyalasomayajula <
aharipriy...@gmail.com> wrote:

> Spark evolved as an example framework for Mesos - that's how I know it. It
> is surprising to see that the options provided by Mesos in this case are
> fewer. Tweaking the source code - I haven't done it yet, but I would love
> to see what options could be there!
>
> On Tue, Aug 11, 2015 at 5:42 AM, Jerry Lam  wrote:
>
>> My experience with Mesos + Spark is not great. I saw one executor with 30
>> CPU and the other executor with 6. So I don't think you can easily
>> configure it without some tweaking at the source code.
>>
>> Sent from my iPad
>>
>> On 2015-08-11, at 2:38, Haripriya Ayyalasomayajula <
>> aharipriy...@gmail.com> wrote:
>>
>> Hi Tim,
>>
>> Spark on YARN allows us to do it using the --num-executors and
>> --executor-cores command-line arguments. I just got a chance to look at a
>> similar spark user list mail, but no answer yet. So does Mesos allow
>> setting the number of executors and cores? Is there a default number it
>> assumes?
>>
>> On Mon, Jan 5, 2015 at 5:07 PM, Tim Chen  wrote:
>>
>>> Forgot to hit reply-all.
>>>
>>> -- Forwarded message --
>>> From: Tim Chen 
>>> Date: Sun, Jan 4, 2015 at 10:46 PM
>>> Subject: Re: Controlling number of executors on Mesos vs YARN
>>> To: mvle 
>>>
>>>
>>> Hi Mike,
>>>
>>> You're correct there is no such setting in for Mesos coarse grain mode,
>>> since the assumption is that each node is launched with one container and
>>> Spark is launching multiple tasks in that container.
>>>
>>> In fine-grain mode there isn't a setting like that, as it currently will
>>> launch an executor as long as it satisfies the minimum container resource
>>> requirement.
>>>
>>> I've created a JIRA earlier about capping the number of executors or
>>> better distribute the # of executors launched in each node. Since the
>>> decision of choosing what node to launch containers is all in the Spark
>>> scheduler side, it's very easy to modify it.
>>>
>>> Btw, what's the configuration to set the # of executors on YARN side?
>>>
>>> Thanks,
>>>
>>> Tim
>>>
>>>
>>>
>>> On Sun, Jan 4, 2015 at 9:37 PM, mvle  wrote:
>>>
 I'm trying to compare the performance of Spark running on Mesos vs YARN.
 However, I am having problems being able to configure the Spark
 workload to
 run in a similar way on Mesos and YARN.

 When running Spark on YARN, you can specify the number of executors per
 node. So if I have a node with 4 CPUs, I can specify 6 executors on that
 node. When running Spark on Mesos, there doesn't seem to be an
 equivalent
 way to specify this. In Mesos, you can somewhat force this by
 specifying the
 number of CPU resources to be 6 when running the slave daemon. However,
 this seems to be a static configuration of the Mesos cluster rather than
 something that can be configured in the Spark framework.

 So here is my question:

 For Spark on Mesos, am I correct that there is no way to control the
 number
 of executors per node (assuming an idle cluster)? For Spark on Mesos
 coarse-grained mode, there is a way to specify max_cores but that is
 still
 not equivalent to specifying the number of executors per node as when
 Spark
 is run on YARN.

 If I am correct, then it seems Spark might be at a disadvantage running
 on
 Mesos compared to YARN (since it lacks the fine tuning ability provided
 by
 YARN).

 Thanks,
 Mike



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Controlling-number-of-executors-on-Mesos-vs-YARN-tp20966.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com
 .

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>>
>>
>>
>> --
>> Regards,
>> Haripriya Ayyalasomayajula
>>
>>
>
>
> --
> Regards,
> Haripriya Ayyalasomayajula
>
>


How to Handle Update Operation from Spark to MongoDB

2015-08-12 Thread Deepesh Maheshwari
Hi,

I am using MongoDb -Hadoop connector to insert RDD into mongodb.

rdd.saveAsNewAPIHadoopFile("file:///notapplicable",
Object.class, BSONObject.class,
MongoOutputFormat.class, outputConfig);

But some operations require the RDD data to be written to Mongo as an
update instead of the insert operation above.

Please suggest how to accomplish this task.

Regards,
Deepesh


Re: stopping spark stream app

2015-08-12 Thread Shushant Arora
Calling jssc.stop(false/true, false/true) from a StreamingListener causes a
deadlock, so I created another thread and called jssc.stop from that, but
that too deadlocked when onBatchCompleted had not completed before
jssc.stop().

So is it safe if I call System.exit(1) from another thread without calling
jssc.stop() - since that leads to deadlock?


On Tue, Aug 11, 2015 at 9:54 PM, Shushant Arora 
wrote:

> Does stopping the streaming context in the onBatchCompleted event
> of StreamingListener not kill the app?
>
> I have below code in streaming listener
>
> public void onBatchCompleted(StreamingListenerBatchCompleted arg0) {
> //check stop condition
> System.out.println("stopping gracefully");
> jssc.stop(false,false);
> System.out.println("stopped gracefully");
> }
>
> stopped gracefully is never printed.
>
> On the UI no more batches are processed, but the application is never
> killed/stopped. What's the best way to kill the app after stopping the
> context?
>
>
>
> On Tue, Aug 11, 2015 at 2:55 AM, Shushant Arora  > wrote:
>
>> Thanks!
>>
>>
>>
>> On Tue, Aug 11, 2015 at 1:34 AM, Tathagata Das 
>> wrote:
>>
>>> 1. RPC can be done in many ways, and a web service is one of many ways.
>>> An even more hacky version can be the app polling a file in a file
>>> system: if the file exists, start shutting down.
>>> 2. No need to set a flag. When you get the signal from RPC, you can just
>>> call context.stop(stopGracefully = true). Though note that this is
>>> blocking, so you have to be careful about doing blocking calls on the
>>> RPC thread.
>>>
>>> On Mon, Aug 10, 2015 at 12:24 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 By RPC you mean a web service exposed on the driver which listens and
 sets some flag, and the driver checks that flag at the end of each batch
 and, if set, gracefully stops the application?

 On Tue, Aug 11, 2015 at 12:43 AM, Tathagata Das 
 wrote:

> In general, it is a little risky to put long running stuff in a
> shutdown hook as it may delay shutdown of the process which may delay 
> other
> things. That said, you could try it out.
>
> A better way to explicitly shut down gracefully is to use an RPC to
> signal the driver process to start shutting down; the process will then
> gracefully stop the context and terminate. This is more robust than
> leveraging shutdown hooks.
>
> On Mon, Aug 10, 2015 at 11:56 AM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> Any recommendation for the best way to gracefully shut down a spark
>> stream application? I am running it on yarn and need a way to signal it
>> externally, either via the yarn application -kill command or some other
>> way, but the current batch needs to be processed completely and the
>> checkpoint saved before shutting down.
>>
>> Runtime.getRuntime().addShutdownHook does not seem to be working. Yarn
>> kills the application immediately and does not call the shutdown hook
>> callback.
>>
>>
>> On Sun, Aug 9, 2015 at 12:45 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> How do I ensure in spark streaming 1.3 with kafka that when an
>>> application is killed, the last running batch is fully processed and
>>> offsets are written to the checkpointing dir?
>>>
>>> On Fri, Aug 7, 2015 at 8:56 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 Hi

 I am using spark stream 1.3 and using custom checkpoint to save
 kafka offsets.

 1. Is doing

 Runtime.getRuntime().addShutdownHook(new Thread() {
   @Override
   public void run() {
     jssc.stop(true, true);
     System.out.println("Inside Add Shutdown Hook");
   }
 });

 to handle stop safe?

 2. And do I need to handle saving the checkpoint in the shutdown hook as
 well, or will the driver handle it automatically, since it gracefully
 stops the stream and handles completion of the foreachRDD function on the
 stream?
 directKafkaStream.foreachRDD(new Function,
 Void>() {
 }

 Thanks


>>>
>>
>

>>>
>>
>
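Tathagata's "poll a file" suggestion can be sketched in a few lines: the driver checks for a marker file only between batches, so the in-flight batch always completes before shutdown begins. A plain-Python sketch with illustrative names (no Spark involved):

```python
import os
import tempfile

def run_batches(marker_path, max_batches=100):
    """Process batches until a stop-marker file appears, then finish the
    current batch and return -- the hacky 'polling a file' approach,
    sketched without Spark."""
    completed = 0
    for _ in range(max_batches):
        completed += 1                  # stand-in for one batch of work
        if os.path.exists(marker_path): # checked only between batches
            break                       # current batch already finished
    return completed

marker = os.path.join(tempfile.mkdtemp(), "stop")
open(marker, "w").close()               # request shutdown before starting
batches = run_batches(marker)
```

In a real streaming app the check would live in onBatchCompleted (or a loop on the driver) and trigger context.stop(stopGracefully = true) from a separate thread, per the discussion above.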


Re:Re: Possible issue for Spark SQL/DataFrame

2015-08-12 Thread Netwaver
It is space-separated data, just as below.


 And what is your thought about the second issue?
Thank you.




At 2015-08-10 15:20:39, "Akhil Das"  wrote:

Isn't it space-separated data? It is neither comma (,) separated nor pipe (|) 
separated data.


Thanks
Best Regards


On Mon, Aug 10, 2015 at 12:06 PM, Netwaver  wrote:

Hi Spark experts,
 I am now using Spark 1.4.1 and trying Spark SQL/DataFrame API 
with text file in below format
id gender height
1  M  180
2  F   167
... ...
 But I meet issues as described below:
 1.  In my test program, I specify the schema programmatically, 
but when I use "|" as the separator in schema string, the code run into below 
exception when being executed on the cluster(Standalone)
  
   When I use "," as the separator, everything works fine.
  2.  In the code, when I use the DataFrame.agg() function, the 
same column name is used for different statistics functions (max, min, avg):

  val peopleDF = sqlCtx.createDataFrame(rowRDD, schema)
  peopleDF.filter(peopleDF("gender").equalTo("M")).agg(Map("height" -> "avg", "height" -> "max", "height" -> "min")).show()

I find that only the last function's computation result is 
shown (as below). Does this work as designed in Spark?
 
 Hopefully I have described the "issue" clearly. Please 
feel free to correct me if I have done something wrong, thanks a lot.










Re: Spark DataFrames uses too many partition

2015-08-12 Thread Al M
The DataFrames parallelism is currently controlled through the configuration
option spark.sql.shuffle.partitions. The default value is 200.

I have raised an Improvement Jira to make it possible to specify the number
of partitions in https://issues.apache.org/jira/browse/SPARK-9872
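Why 200 post-shuffle partitions hurt tiny inputs can be seen with a toy cost model: each task carries a fixed scheduling overhead, so splitting a small join result into 200 pieces pays mostly overhead. All constants below are illustrative, not measured Spark numbers:

```python
def job_time(rows, partitions, per_row=0.001, per_task=0.05):
    """Toy model: total time = per-task scheduling overhead plus per-row
    work, summed over all tasks of the stage."""
    rows_per_task = rows / partitions
    return partitions * (per_task + rows_per_task * per_row)

small_default = job_time(1000, 200)   # default 200 shuffle partitions
small_coalesced = job_time(1000, 1)   # coalesced to 1 after the join
```

Under this model the 200-partition plan spends roughly 10x longer on the same 1000 rows, which matches the ~10x slowdown reported in the thread; lowering spark.sql.shuffle.partitions (or coalescing) removes the overhead.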



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DataFrames-uses-too-many-partition-tp24214p24223.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Parquet file organisation for 100GB+ dataframes

2015-08-12 Thread Ewan Leith
Hi all,

Can anyone share their experiences working with storing and organising larger 
datasets with Spark?

I've got a dataframe stored in Parquet on Amazon S3 (using EMRFS) which has a 
fairly complex nested schema (based on JSON files), which I can query in Spark, 
but the initial setup takes a few minutes, as we've got roughly 5000 partitions 
and 150GB of compressed parquet part files.

Generally things work, but we seem to be hitting various limitations now that 
we're working with 100+GB of data, such as the 2GB block size limit in Spark, 
which means we need a large number of partitions, slow startup due to partition 
discovery, etc.

Storing data in one big dataframe has worked well so far, but do we need to 
look at breaking it out into multiple dataframes?

Has anyone got any advice on how to structure this?

Thanks,
Ewan



What is the Effect of Serialization within Stages?

2015-08-12 Thread Mark Heimann
Hello everyone,

I am wondering what the effect of serialization is within a stage.

My understanding of Spark as an execution engine is that the data flow
graph is divided into stages and a new stage always starts after an
operation/transformation that cannot be pipelined (such as groupBy or join)
because it can only be completed after the whole data set has "been taken
care of". At the end of a stage shuffle files are written, and at the
beginning of the next stage they are read from.

Within a stage my understanding is that pipelining is used, therefore I
wonder whether there is any serialization overhead involved when there is
no shuffling taking place. I am also assuming that my data set fits into
memory and must not be spilled to disk.

So if I would chain multiple *map* or *flatMap* operations and they end up
in the same stage, will there be any serialization overhead for piping the
result of the first *map* operation as a parameter into the following *map*
operation?

Any ideas and feedback appreciated, thanks a lot.

Best regards,
Mark
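The pipelining Mark describes can be imitated with generators: chained narrow transformations are applied element by element in a single pass, with no intermediate collection to materialize or serialize between them. A minimal Python sketch (the call log demonstrates the element-at-a-time fusion; this models the concept, not Spark's internals):

```python
def pipelined(iterable, *funcs):
    """Apply a chain of map functions one element at a time -- the way
    narrow transformations are fused inside a single stage, with no
    intermediate collection (and hence nothing to serialize)."""
    for x in iterable:
        for f in funcs:
            x = f(x)
        yield x

calls = []
def f(x):
    calls.append(("f", x)); return x + 1
def g(x):
    calls.append(("g", x)); return x * 2

out = list(pipelined([1, 2], f, g))
# Each element flows through f then g before the next element starts,
# so the call order is f(1), g(2), f(2), g(3).
```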


Is there any tool that i can prove to customer that spark is faster then hive ?

2015-08-12 Thread Ladle
Hi ,

I have built the machine learning features and model using Apache Spark.

And the same features I have built using Hive and Java, and used Mahout to
run the model.

Now how can I show the customer that Apache Spark is faster than Hive?

Is there any tool that shows the time?

Regards,
Ladle



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-any-tool-that-i-can-prove-to-customer-that-spark-is-faster-then-hive-tp24224.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Possible issue for Spark SQL/DataFrame

2015-08-12 Thread Eugene Morozov
Cannot tell anything specific about the separator, as it’s not clear how you 
create the schema from schemaString.

Regarding the second issue - that’s expected, because there is a Map there, 
and you cannot provide more than one value for the same key. That’s why you 
see only the last “min” value.

This is a javadoc for the agg() function, you can try it this way.
/**
 * Aggregates on the entire [[DataFrame]] without groups.
 * {{
 *   // df.agg(...) is a shorthand for df.groupBy().agg(...)
 *   df.agg(max($"age"), avg($"salary"))
 *   df.groupBy().agg(max($"age"), avg($"salary"))
 * }}
 * @group dfops
 */
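The Map behaviour Eugene describes is easy to see in isolation: duplicate keys collapse, so only the last (column -> function) entry survives, whereas the expression form df.agg(max(...), avg(...), min(...)) keeps all three because it is a list of expressions rather than a map. In plain Python:

```python
# Three aggregations requested for the same column...
pairs = [("height", "avg"), ("height", "max"), ("height", "min")]

# ...but a map holds one value per key, so only the last entry survives,
# mirroring Map("height" -> "avg", "height" -> "max", "height" -> "min").
as_map = dict(pairs)
```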


On 10 Aug 2015, at 09:36, Netwaver  wrote:

> Hi Spark experts,
>  I am now using Spark 1.4.1 and trying Spark SQL/DataFrame 
> API with text file in below format
> id gender height
> 1  M  180
> 2  F   167
> ... ...
>  But I meet issues as described below:
>  1.  In my test program, I specify the schema 
> programmatically, but when I use "|" as the separator in schema string, the 
> code run into below exception when being executed on the cluster(Standalone)
>
> When I use "," as the separator, everything works fine.
>   2.  In the code, when I use DataFrame.agg() function with 
> same column name is used for different statistics functions(max,min,avg)
>   val peopleDF = sqlCtx.createDataFrame(rowRDD, schema)
>   
> peopleDF.filter(peopleDF("gender").equalTo("M")).agg(Map("height" -> 
> "avg","height" -> "max","height" -> "min")).show()  
> I just find only the last function's computation result 
> is shown(as below), Does this work as design in Spark?
>
>  Hopefully I have described the "issue" clearly, and please 
> feel free to correct me if have done something wrong, thanks a lot.
> 
> 

Eugene Morozov
fathers...@list.ru






Re: Is there any tool that i can prove to customer that spark is faster then hive ?

2015-08-12 Thread Nick Pentreath
Perhaps you could time the end-to-end runtime for each pipeline, and each stage?




Though I'd be fairly confident that Spark will outperform Hive/Mahout on MR, 
that's not the only consideration - having everything on a single platform and 
the Spark / DataFrame API is a huge win just by itself.









—
Sent from Mailbox

On Wed, Aug 12, 2015 at 1:45 PM, Ladle  wrote:

> Hi ,
> I have build the the machine learning features and model using Apache spark.
> And the same features i have i build using hive,java and used mahout to run
> model.
> Now how can i show to customer that Apache Spark is more faster then hive.
> Is there any tool that shows the time ?
> Regards,
> Ladle
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-any-tool-that-i-can-prove-to-customer-that-spark-is-faster-then-hive-tp24224.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org

Spark 1.4.1 py4j.Py4JException: Method read([]) does not exist

2015-08-12 Thread resonance
Hello everybody,

I am programming with Pyspark in the Eclipse IDE and have been trying to
transition to Spark 1.4.1 so that I may finally program using Python 3.
(Good work with the progress!) The following program works in Spark 1.3.1
but throws an exception in Spark 1.4.1:

"from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
from pyspark.sql import SQLContext

if __name__ == '__main__':
conf = SparkConf().setAppName("MyApp").setMaster("local")

global sc
sc = SparkContext(conf=conf)

global sqlc
sqlc = SQLContext(sc)

symbolsPath = 'SP500Industry.json'
symbolsRDD = sqlc.read.json(symbolsPath) 

print "Done""

The traceback I'm getting is as follows:

"Traceback (most recent call last):
  File "/media/gavin/20A6-76BF/Current Projects Luna/PySpark Test/Test.py",
line 21, in 
symbolsRDD = sqlc.read.json(symbolsPath) #rdd with all symbols (and
their industries
  File
"/home/gavin/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/context.py", line
582, in read
return DataFrameReader(self)
  File
"/home/gavin/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/readwriter.py",
line 39, in __init__
self._jreader = sqlContext._ssql_ctx.read()
  File
"/home/gavin/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
line 538, in __call__
  File
"/home/gavin/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
line 304, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o18.read. Trace:
py4j.Py4JException: Method read([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)"

The external libraries I have for the project are
... spark-1.4.1-bin-hadoop2.6/python
... spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip
... spark-1.4.1-bin-hadoop2.6/python/lib/pyspark.zip (tried both including
and not including this)

Can anybody help me out with what I'm doing wrong?
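For what it's worth, "Method read([]) does not exist" typically means the Python-side wrappers and the JVM jars come from different Spark versions: SQLContext.read only appeared on the Scala side in 1.4.0, so 1.4.1 Python files driving older jars fail exactly like this. A purely illustrative sanity check is to confirm that every configured library path carries the same embedded version (the paths below are examples, not a prescription):

```python
import re

def spark_versions(paths):
    """Collect every spark-x.y.z version string embedded in the given paths."""
    found = set()
    for p in paths:
        m = re.search(r"spark-(\d+\.\d+\.\d+)", p)
        if m:
            found.add(m.group(1))
    return found

# Illustrative entries; a real check would walk sys.path / the IDE's classpath.
libs = [
    "/home/gavin/spark-1.4.1-bin-hadoop2.6/python",
    "/home/gavin/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip",
]
print(spark_versions(libs))  # a healthy setup yields exactly one version
```

If the set contains more than one version (or the IDE's launch configuration still points at an old install), that mismatch is the first thing to fix.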




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-1-py4j-Py4JException-Method-read-does-not-exist-tp24227.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is there any tool that i can prove to customer that spark is faster then hive ?

2015-08-12 Thread Gourav Sengupta
You might also need to consider the maturity of SPARKSQL vs HIVEQL.

Besides that, please read the following (which will soon be available as a
part of the standard Amazon stack, in case it's not already):
https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started.


All you need to do is run the following (in case the environment is
already set up in AWS) and Hive will use Spark to execute the
queries:
hive> set hive.execution.engine=spark;


HIVE can use Map Reduce, Tez and now SPARK in order to execute its queries.

Please do read the details in the above link.


Regards,
Gourav Sengupta

On Wed, Aug 12, 2015 at 1:01 PM, Nick Pentreath 
wrote:

> Perhaps you could time the end-to-end runtime for each pipeline, and each
> stage?
>
> Though I'd be fairly confident that Spark will outperform Hive/Mahout on
> MR, that's not the only consideration - having everything on a single
> platform with the Spark / DataFrame API is a huge win by itself
>
>
>
> —
> Sent from Mailbox 
>
>
> On Wed, Aug 12, 2015 at 1:45 PM, Ladle  wrote:
>
>> Hi ,
>>
>> I have built the machine learning features and model using Apache
>> Spark.
>>
>> I have built the same features using Hive/Java and used Mahout to
>> run the
>> model.
>>
>> Now, how can I show the customer that Apache Spark is faster than
>> Hive?
>>
>> Is there any tool that shows the time?
>>
>> Regards,
>> Ladle
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-any-tool-that-i-can-prove-to-customer-that-spark-is-faster-then-hive-tp24224.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Does Spark optimization might miss to run transformation?

2015-08-12 Thread Eugene Morozov
Hi!

I'd like to perform an action (store / print something) inside a transformation (map 
or mapPartitions). This approach has some flaws, but here is the question: might 
Spark optimise the (RDD or DataFrame) processing so that my 
mapPartitions simply won't run?
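Transformations are lazy, so a side effect buried in map/mapPartitions runs only if some action forces that partition to be computed (and may run more than once on retries). The same hazard can be sketched with a plain Python generator, which is lazy in the same way (an analogy, not Spark itself):

```python
side_effects = []

def log_and_double(x):
    side_effects.append(x)  # side effect hidden inside a "transformation"
    return x * 2

lazy = (log_and_double(x) for x in range(3))  # nothing has run yet
print(side_effects)        # [] -- the side effect never fired
result = list(lazy)        # the "action" forces evaluation
print(side_effects)        # [0, 1, 2]
```

In Spark terms: if no action depends on the mapPartitions output, the side effect may never happen at all.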

--
Eugene Morozov
fathers...@list.ru






Re: make-distribution.sh failing at spark/R/lib/sparkr.zip

2015-08-12 Thread Ted Yu
I ran your command on Linux which passed.

Are you going to use SparkR ?
If so, consider including the following:

-Psparkr

Cheers

On Wed, Aug 12, 2015 at 3:31 AM, MEETHU MATHEW 
wrote:

> Hi,
>  I am trying to create a package using the make-distribution.sh script
> from the GitHub master branch, but it's not completing successfully.
> The last statement printed is:
>
> *+ cp /home/meethu/git/FlytxtRnD/spark/R/lib/sparkr.zip
> /home/meethu/git/FlytxtRnD/spark/dist/R/lib*
> *cp: cannot stat `/home/meethu/git/FlytxtRnD/spark/R/lib/sparkr.zip': No
> such file or directory*
>
> My build succeeds, and I am trying to execute the following command:
>   *  ./make-distribution.sh --tgz -Pyarn -Dyarn.version=2.6.0
> -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive*
>
> Please help.
>
> Thanks & Regards,
>  Meethu M
>


Spark 1.2.2 build problem with Hive 0.12, bringing in wrong version of avro-mapred

2015-08-12 Thread java8964
Hi, this email is sent to both the dev and user lists; just want to see if someone 
familiar with the Spark/Maven build procedure can provide any help.
I am building Spark 1.2.2 with the following command:
mvn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Phive -Phive-0.12.0
The spark-assembly-1.2.2-hadoop2.2.0.jar contains avro and avro-ipc at 
version 1.7.6, but avro-mapred at version 1.7.1, which caused a weird 
runtime exception when I tried to read an Avro file in Spark 1.2.2, as 
follows:
NullPointerException
    at java.io.StringReader.<init>(StringReader.java:50)
    at org.apache.avro.Schema$Parser.parse(Schema.java:943)
    at org.apache.avro.Schema.parse(Schema.java:992)
    at org.apache.avro.mapred.AvroJob.getInputSchema(AvroJob.java:65)
    at org.apache.avro.mapred.AvroRecordReader.<init>(AvroRecordReader.java:43)
    at org.apache.avro.mapred.AvroInputFormat.getRecordReader(AvroInputFormat.java:52)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:233)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:210)
So I ran the following command and found that avro-mapred 1.7.1 is brought 
in by the Hive 0.12 profile:
mvn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Phive -Phive-0.12.0 dependency:tree 
-Dverbose -Dincludes=org.apache.avro
[INFO] ------------------------------------------------------------------------
[INFO] Building Spark Project Hive 1.2.2
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.4:tree (default-cli) @ spark-hive_2.10 ---
[INFO] org.apache.spark:spark-hive_2.10:jar:1.2.2
[INFO] +- org.apache.spark:spark-core_2.10:jar:1.2.2:compile
[INFO] |  \- org.apache.hadoop:hadoop-client:jar:2.2.0:compile (version managed from 1.0.4)
[INFO] |     \- org.apache.hadoop:hadoop-common:jar:2.2.0:compile
[INFO] |        \- (org.apache.avro:avro:jar:1.7.6:compile - version managed from 1.7.1; omitted for duplicate)
[INFO] +- org.spark-project.hive:hive-serde:jar:0.12.0-protobuf-2.5:compile
[INFO] |  +- (org.apache.avro:avro:jar:1.7.6:compile - version managed from 1.7.1; omitted for duplicate)
[INFO] |  \- org.apache.avro:avro-mapred:jar:1.7.1:compile
[INFO] |     \- (org.apache.avro:avro-ipc:jar:1.7.6:compile - version managed from 1.7.1; omitted for duplicate)
[INFO] +- org.apache.avro:avro:jar:1.7.6:compile
[INFO] \- org.apache.avro:avro-mapred:jar:hadoop2:1.7.6:compile
[INFO]    +- org.apache.avro:avro-ipc:jar:1.7.6:compile
[INFO]    |  \- (org.apache.avro:avro:jar:1.7.6:compile - version managed from 1.7.1; omitted for duplicate)
[INFO]    \- org.apache.avro:avro-ipc:jar:tests:1.7.6:compile
[INFO]       \- (org.apache.avro:avro:jar:1.7.6:compile - version managed from 1.7.1; omitted for duplicate)
In this case, I could manually fix all the classes in the final jar, changing 
from avro-mapred 1.7.1 to 1.7.6, but I wonder if there is any other solution, 
as this approach is very error-prone.
Also, from the above output I can see that the avro-mapred:jar:hadoop2:1.7.6 
dependency is there, but it looks like it is being omitted. I am not sure why 
Maven chose the lower version, as I am not a Maven guru.
My question: in the above situation, is there an easy way to build with 
avro-mapred 1.7.6 instead of 1.7.1?
Thanks
Yong  
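One conventional Maven answer to Yong's question is to exclude the stale transitive artifact and declare the wanted one explicitly. A hedged sketch of what the relevant pom.xml fragment could look like (the coordinates come from the dependency tree above; the surrounding pom structure is assumed, and this has not been verified against the Spark build):

```xml
<dependency>
  <groupId>org.spark-project.hive</groupId>
  <artifactId>hive-serde</artifactId>
  <version>0.12.0-protobuf-2.5</version>
  <exclusions>
    <!-- drop the avro-mapred 1.7.1 that hive-serde drags in -->
    <exclusion>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-mapred</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<!-- then depend on the hadoop2 classifier explicitly -->
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.7.6</version>
  <classifier>hadoop2</classifier>
</dependency>
```

Alternatively, pinning avro-mapred in a dependencyManagement section achieves the same mediation without touching each dependency.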

Re: avoid duplicate due to executor failure in spark stream

2015-08-12 Thread Cody Koeninger
Accumulators aren't going to work to communicate state changes between
executors.  You need external storage.
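The external-storage pattern Cody describes - durably record the highest offset handled, and skip anything at or below it on retry - can be sketched with a plain dict standing in for the external store (ZooKeeper, HBase, an RDBMS, etc.):

```python
# A dict stands in for durable external storage (HBase, an RDBMS, ...).
offset_store = {}
posted = []

def process_partition(partition_id, events):
    """Post each event at most once by tracking the last handled offset."""
    last_done = offset_store.get(partition_id, -1)
    for offset, event in events:
        if offset <= last_done:
            continue                         # already handled by a failed attempt
        posted.append(event)                 # "post to the external server"
        offset_store[partition_id] = offset  # commit progress durably

events = [(0, "a"), (1, "b"), (2, "c")]
process_partition(0, events)
process_partition(0, events)  # a retry of the same task is a no-op
print(posted)  # ['a', 'b', 'c'] -- no duplicates
```

Note the post and the offset commit are still two separate steps here; true exactly-once needs the write itself to be idempotent or transactional, as the linked documentation explains.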

On Tue, Aug 11, 2015 at 11:28 AM, Shushant Arora 
wrote:

> What if the processing is neither idempotent nor in a transaction, say I am
> posting events to some external server after processing.
>
> Is it possible to get the accumulator of a failed task in the retry task? Is
> there any way to detect whether a task is a retried task or the original?
>
> I was trying to achieve something like incrementing a counter after each
> event processed and if task fails- retry task will just ignore already
> processed events by accessing counter of failed task. Is it directly
> possible to access accumulator per task basis without writing to hdfs or
> hbase.
>
>
>
>
> On Tue, Aug 11, 2015 at 3:15 AM, Cody Koeninger 
> wrote:
>
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>
>>
>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations
>>
>> https://www.youtube.com/watch?v=fXnNEq1v3VA
>>
>>
>> On Mon, Aug 10, 2015 at 4:32 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> How can I avoid duplicate processing of kafka messages in spark stream
>>> 1.3 because of executor failure.
>>>
>>> 1.Can I some how access accumulators of failed task in retry  task to
>>> skip those many events which are already processed by failed task on this
>>> partition ?
>>>
>>> 2.Or I ll have to persist each msg processed and then check before
>>> processing each msg whether its already processed by failure task and
>>> delete this perisited information at each batch end?
>>>
>>
>>
>


Re: grouping by a partitioned key

2015-08-12 Thread Philip Weaver
Yes, I am partitoning using DataFrameWriter.partitionBy, which produces the
keyed directory structure that you referenced in that link.

On Tue, Aug 11, 2015 at 11:54 PM, Hemant Bhanawat 
wrote:

> As far as I know, Spark SQL cannot process data on a per-partition-basis.
> DataFrame.foreachPartition is the way.
>
> I haven't tried it, but, following looks like a not-so-sophisticated way
> of making spark sql partition aware.
>
>
> http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery
>
>
> On Wed, Aug 12, 2015 at 5:00 AM, Philip Weaver 
> wrote:
>
>> Thanks.
>>
>> In my particular case, I am calculating a distinct count on a key that is
>> unique to each partition, so I want to calculate the distinct count within
>> each partition, and then sum those. This approach will avoid moving the
>> sets of that key around between nodes, which would be very expensive.
>>
>> Currently, to accomplish this we are manually reading in the parquet
>> files (not through Spark SQL), using a bitset to calculate the unique count
>> within each partition, and accumulating that sum. Doing this through Spark
>> SQL would be nice, but the naive "SELECT distinct(count(...))" approach
>> takes 60 times as long :). The approach I mentioned above might be an
>> acceptable hybrid solution.
>>
>> - Philip
>>
>>
>> On Tue, Aug 11, 2015 at 3:27 PM, Eugene Morozov 
>> wrote:
>>
>>> Philip,
>>>
>>> If all data per key are inside just one partition, then Spark will
>>> figure that out. Can you guarantee that’s the case?
>>> What is it you try to achieve? There might be another way for it, when
>>> you might be 100% sure what’s happening.
>>>
>>> You can print debugString or explain (for DataFrame) to see what’s
>>> happening under the hood.
>>>
>>>
>>> On 12 Aug 2015, at 01:19, Philip Weaver  wrote:
>>>
>>> If I have an RDD that happens to already be partitioned by a key, how
>>> efficient can I expect a groupBy operation to be? I would expect that Spark
>>> shouldn't have to move data around between nodes, and simply will have a
>>> small amount of work just checking the partitions to discover that it
>>> doesn't need to move anything around.
>>>
>>> Now, what if we're talking about a parquet database created by using
>>> DataFrameWriter.partitionBy(...), then will Spark SQL be smart when I group
>>> by a key that I'm already partitioned by?
>>>
>>> - Philip
>>>
>>>
>>> Eugene Morozov
>>> fathers...@list.ru
>>>
>>>
>>>
>>>
>>>
>>
>
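Philip's hybrid - a distinct count inside each partition, then a sum across partitions - can be sketched without Spark, using plain lists as partitions and set size standing in for the bitset:

```python
# Each inner list models one partition; a key never spans two partitions.
partitions = [
    ["k1", "k2", "k2"],       # partition 0
    ["k3", "k3", "k4", "k5"]  # partition 1
]

# Distinct count within each partition (Spark: mapPartitions), then sum.
per_partition = [len(set(p)) for p in partitions]
total_distinct = sum(per_partition)
print(total_distinct)  # valid only because keys don't overlap across partitions
```

The no-overlap guarantee is what makes the sum equal the global distinct count; without it, a shuffle is unavoidable.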


Error writing to cassandra table using spark application

2015-08-12 Thread Nupur Kumar (BLOOMBERG/ 731 LEX)
Hello,

I am doing this for the first time so feel free to let me know/forward this to 
where it needs to be if not here.

I have a spark application that does some computations and writes the results 
to Cassandra. I had this set up working (and Cass table populating) on dev 
machines but when I do the same in prod machines I get the attached error. (I 
have not included the entire dump since it was too long, I have included things 
that I thought would be relevant. Let me know if you need more)

It says "failed to write statements.." and gives no more information than that. 
As you can see it was able to connect to Cassandra successfully. 

I have checked the ports, memory availability, providing it with all the right 
jars, version compatibility between Spark/Cass/Spark-Cass connector and none of 
these solved the problem.

I am able to cqlsh into Cassandra and insert data into keyspacename.tablename 
and that works fine so I don't think the error lies on Cassandra's side.

I am running out of ideas as to why this might be failing so any help would be 
appreciated!

Thanks,
Nupur

Error Trace
Description: Binary data

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Spark runs into an Infinite loop even if the tasks are completed successfully

2015-08-12 Thread Imran Rashid
yikes.

Was this a one-time thing?  Or does it happen consistently?  can you turn
on debug logging for o.a.s.scheduler (dunno if it will help, but maybe ...)

On Tue, Aug 11, 2015 at 8:59 AM, Akhil Das 
wrote:

> Hi
>
> My Spark job (running in local[*] with Spark 1.4.1) reads data from a
> thrift server (I created an RDD; it computes the partitions in the
> getPartitions() call, and in compute(), hasNext returns records from these
> partitions). count() and foreach() work fine and return the correct
> number of records. But whenever there is shuffleMap stage (like reduceByKey
> etc.) then all the tasks are executing properly but it enters in an
> infinite loop saying :
>
>
>1. 15/08/11 13:05:54 INFO DAGScheduler: Resubmitting ShuffleMapStage 1
>(map at FilterMain.scala:59) because some of its tasks had failed: 0, 3
>
>
> Here's the complete stack-trace http://pastebin.com/hyK7cG8S
>
> What could be the root cause of this problem? I looked up and bumped into
> this closed JIRA  (which
> is very very old)
>
>
>
>
> Thanks
> Best Regards
>


Re: Controlling number of executors on Mesos vs YARN

2015-08-12 Thread Jerry Lam
Great stuff Tim. This definitely will make Mesos users life easier

Sent from my iPad

On 2015-08-12, at 11:52, Haripriya Ayyalasomayajula  
wrote:

> Thanks Tim, Jerry.
> 
> On Wed, Aug 12, 2015 at 1:18 AM, Tim Chen  wrote:
> Yes the options are not that configurable yet but I think it's not hard to 
> change it.
> 
> I have a patch out actually specifically able to configure amount of cpus per 
> executor in coarse grain mode, and hopefully merged next release.
> 
> I think the open question now is for fine grain mode can we limit the number 
> of maximum concurrent executors, and I think we can definitely just add a new 
> option like spark.mesos.executor.max to cap it. 
> 
> I'll file a jira and hopefully to get this change in soon too.
> 
> Tim
> 
> 
> 
> On Tue, Aug 11, 2015 at 6:21 AM, Haripriya Ayyalasomayajula 
>  wrote:
> Spark evolved as an example framework for Mesos - that's how I know it. It is 
> surprising to see that the options provided by Mesos in this case are fewer. 
> I haven't tried tweaking the source code yet, but I would love to see what 
> options could be there! 
> 
> On Tue, Aug 11, 2015 at 5:42 AM, Jerry Lam  wrote:
> My experience with Mesos + Spark is not great. I saw one executor with 30 CPU 
> and the other executor with 6. So I don't think you can easily configure it 
> without some tweaking at the source code.
> 
> Sent from my iPad
> 
> On 2015-08-11, at 2:38, Haripriya Ayyalasomayajula  
> wrote:
> 
>> Hi Tim,
>> 
>> Spark on YARN allows us to do it using the --num-executors and --executor-cores 
>> command-line arguments. I just got a chance to look at a similar spark user 
>> list mail, but no answer yet. So does mesos allow setting the number of 
>> executors and cores? Is there a default number it assumes?
>> 
>> On Mon, Jan 5, 2015 at 5:07 PM, Tim Chen  wrote:
>> Forgot to hit reply-all.
>> 
>> -- Forwarded message --
>> From: Tim Chen 
>> Date: Sun, Jan 4, 2015 at 10:46 PM
>> Subject: Re: Controlling number of executors on Mesos vs YARN
>> To: mvle 
>> 
>> 
>> Hi Mike,
>> 
>> You're correct there is no such setting in for Mesos coarse grain mode, 
>> since the assumption is that each node is launched with one container and 
>> Spark is launching multiple tasks in that container.
>> 
>> In fine-grain mode there isn't a setting like that, as it currently will 
>> launch an executor as long as it satisfies the minimum container resource 
>> requirement.
>> 
>> I've created a JIRA earlier about capping the number of executors or better 
>> distribute the # of executors launched in each node. Since the decision of 
>> choosing what node to launch containers is all in the Spark scheduler side, 
>> it's very easy to modify it.
>> 
>> Btw, what's the configuration to set the # of executors on YARN side?
>> 
>> Thanks,
>> 
>> Tim
>> 
>> 
>> 
>> On Sun, Jan 4, 2015 at 9:37 PM, mvle  wrote:
>> I'm trying to compare the performance of Spark running on Mesos vs YARN.
>> However, I am having problems being able to configure the Spark workload to
>> run in a similar way on Mesos and YARN.
>> 
>> When running Spark on YARN, you can specify the number of executors per
>> node. So if I have a node with 4 CPUs, I can specify 6 executors on that
>> node. When running Spark on Mesos, there doesn't seem to be an equivalent
>> way to specify this. In Mesos, you can somewhat force this by specifying the
>> number of CPU resources to be 6 when running the slave daemon. However, this
>> seems to be a static configuration of the Mesos cluster rather something
>> that can be configured in the Spark framework.
>> 
>> So here is my question:
>> 
>> For Spark on Mesos, am I correct that there is no way to control the number
>> of executors per node (assuming an idle cluster)? For Spark on Mesos
>> coarse-grained mode, there is a way to specify max_cores but that is still
>> not equivalent to specifying the number of executors per node as when Spark
>> is run on YARN.
>> 
>> If I am correct, then it seems Spark might be at a disadvantage running on
>> Mesos compared to YARN (since it lacks the fine tuning ability provided by
>> YARN).
>> 
>> Thanks,
>> Mike
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Controlling-number-of-executors-on-Mesos-vs-YARN-tp20966.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
>> 
>> 
>> 
>> 
>> 
>> -- 
>> Regards,
>> Haripriya Ayyalasomayajula 
>> 
> 
> 
> 
> -- 
> Regards,
> Haripriya Ayyalasomayajula 
> 
> 
> 
> 
> 
> -- 
> Regards,
> Haripriya Ayyalasomayajula 
> 


Re: stopping spark stream app

2015-08-12 Thread Tathagata Das
Well, system.exit will not ensure all data was processed before shutdown.
There should not be a deadlock if onBatchCompleted just starts the thread
(that runs stop()) and completes.

On Wed, Aug 12, 2015 at 1:50 AM, Shushant Arora 
wrote:

> calling jssc.stop(false/true,false/true) from streamingListener causes
> deadlock , So I created another thread and called jssc.stop from  that but
> that too caused deadlock if onBatchCompleted is not completed before
> jssc.stop().
>
> So is it safe If I call System.exit(1) from another thread without calling
> jssc.stop()- since that leads to deadlock.
>
>
> On Tue, Aug 11, 2015 at 9:54 PM, Shushant Arora  > wrote:
>
>> Does stopping the streaming context in the onBatchCompleted event
>> of StreamingListener not kill the app?
>>
>> I have below code in streaming listener
>>
>> public void onBatchCompleted(StreamingListenerBatchCompleted arg0) {
>> //check stop condition
>> System.out.println("stopping gracefully");
>> jssc.stop(false,false);
>> System.out.println("stopped gracefully");
>> }
>>
>> stopped gracefully is never printed.
>>
>> On the UI no more batches are processed, but the application is never
>> killed/stopped. What's the best way to kill the app after stopping the context?
>>
>>
>>
>> On Tue, Aug 11, 2015 at 2:55 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Thanks!
>>>
>>>
>>>
>>> On Tue, Aug 11, 2015 at 1:34 AM, Tathagata Das 
>>> wrote:
>>>
 1. RPC can be done in many ways, and a web service is one of many ways.
 A even more hacky version can be the app polling a file in a file system,
 if the file exists start shutting down.
 2. No need to set a flag. When you get the signal from RPC, you can
 just call context.stop(stopGracefully = true) . Though note that this is
>> blocking, so you gotta be careful about doing blocking calls on the RPC
 thread.

 On Mon, Aug 10, 2015 at 12:24 PM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> By RPC you mean web service exposed on driver which listens and set
> some flag and driver checks that flag at end of each batch and if set then
> gracefully stop the application ?
>
> On Tue, Aug 11, 2015 at 12:43 AM, Tathagata Das 
> wrote:
>
>> In general, it is a little risky to put long running stuff in a
>> shutdown hook as it may delay shutdown of the process which may delay 
>> other
>> things. That said, you could try it out.
>>
>> A better way to explicitly shutdown gracefully is to use an RPC to
>> signal the driver process to start shutting down, and then the process 
>> will
>> gracefully stop the context and terminate. This is more robust than
>> leveraging shutdown hooks.
>>
>> On Mon, Aug 10, 2015 at 11:56 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Any help in best recommendation for gracefully shutting down a spark
>>> stream application ?
>>> I am running it on yarn and a way to tell from externally either
>>> yarn application -kill command or some other way but need current batch 
>>> to
>>> be processed completely and checkpoint to be saved before shutting down.
>>>
>>> Runtime.getRuntime().addShutdownHook does not seem to be working.
>>> Yarn kills the application immediately and dooes not call shutdown hook
>>> call back .
>>>
>>>
>>> On Sun, Aug 9, 2015 at 12:45 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 Hi

 How to ensure in spark streaming 1.3 with kafka that when an
 application is killed , last running batch is fully processed and 
 offsets
 are written to checkpointing dir.

 On Fri, Aug 7, 2015 at 8:56 AM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> Hi
>
> I am using spark stream 1.3 and using custom checkpoint to save
> kafka offsets.
>
> 1.Is doing
> Runtime.getRuntime().addShutdownHook(new Thread() {
>   @Override
>   public void run() {
>   jssc.stop(true, true);
>System.out.println("Inside Add Shutdown Hook");
>   }
>  });
>
> to handle stop is safe ?
>
> 2.And I need to handle saving checkoinnt in shutdown hook also or
> driver will handle it automatically since it grcaefully stops stream 
> and
> handle
> completion of foreachRDD function on stream ?
> directKafkaStream.foreachRDD(new Function,
> Void>() {
> }
>
> Thanks
>
>

>>>
>>
>

>>>
>>
>


Re: Partitioning in spark streaming

2015-08-12 Thread Tathagata Das
Yes.

On Wed, Aug 12, 2015 at 12:12 PM, Mohit Anchlia 
wrote:

> Thanks! To write to hdfs I do need to use saveAs method?
>
> On Wed, Aug 12, 2015 at 12:01 PM, Tathagata Das 
> wrote:
>
>> This is how Spark does it. It writes the task output to a uniquely-named
>> temporary file, and then (after the task successfully completes) atomically
>> renames the temp file to the expected file name.
>>
>>
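The write-to-temp-then-rename pattern described above relies on rename being atomic on the target filesystem. A small sketch of the same pattern in plain Python (os.replace is the atomic rename; the file names are illustrative, not Spark's actual naming scheme):

```python
import os
import tempfile

def atomic_write(path, data):
    """Write to a uniquely-named temp file, then atomically rename into place."""
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    with os.fdopen(fd, "w") as f:
        f.write(data)  # a crash here leaves only an unreferenced temp file
    os.replace(tmp, path)  # atomic on POSIX: readers never see a partial file

out_dir = tempfile.mkdtemp()
part = os.path.join(out_dir, "part-00000")
atomic_write(part, "task output")
with open(part) as f:
    print(f.read())
```

Because mkstemp names are unique per writer, concurrent executors never collide, which answers the filename-uniqueness question above.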
>> On Tue, Aug 11, 2015 at 9:53 PM, Mohit Anchlia 
>> wrote:
>>
>>> Thanks for the info. When data is written in hdfs how does spark keeps
>>> the filenames written by multiple executors unique
>>>
>>> On Tue, Aug 11, 2015 at 9:35 PM, Hemant Bhanawat 
>>> wrote:
>>>
 Posting a comment from my previous mail post:

 When data is received from a stream source, receiver creates blocks of
 data.  A new block of data is generated every blockInterval milliseconds. N
 blocks of data are created during the batchInterval where N =
 batchInterval/blockInterval. A RDD is created on the driver for the blocks
 created during the batchInterval. The blocks generated during the
 batchInterval are partitions of the RDD.
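Hemant's N = batchInterval / blockInterval relationship in numbers (the batch interval below is illustrative; 200 ms is Spark's default spark.streaming.blockInterval):

```python
batch_interval_ms = 2000   # illustrative batch interval
block_interval_ms = 200    # default spark.streaming.blockInterval
partitions_per_rdd = batch_interval_ms // block_interval_ms
print(partitions_per_rdd)  # blocks created per batch = partitions per RDD
```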

 Now if you want to repartition based on a key, a shuffle is needed.

 On Wed, Aug 12, 2015 at 4:36 AM, Mohit Anchlia 
 wrote:

> How does partitioning in spark work when it comes to streaming? What's
> the best way to partition a time series data grouped by a certain tag like
> categories of product video, music etc.
>


>>>
>>
>


Re: stopping spark stream app

2015-08-12 Thread Tathagata Das
stop() is a blocking method when stopGraceful is set to true. In that case,
it obviously waits for all batches with data to complete processing.
Why are you joining on the thread in streaming listener? The listener is
just a callback listener and is NOT supposed to do any long running
blocking stuff.
If your intention is to call stop() just in
listener.onBatchCompleted to prevent the next batch from starting, that is
WRONG. The listener is issued callbacks asynchronously to the processing loop
of the context.
As I said earlier, the ssc.stop() does not need to be (and in fact, most
cases, should not be) called from the listener. It should be called from
some other thread. If you have to make sure that the main program waits for
stop to complete (especially in the case of graceful stop), then make the
main program thread wait for stopping-thread.join(). Under no circumstances
should you do blocking calls in the listener events.
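The recipe above - the listener only spawns a stopper thread, and the main thread joins that thread - can be sketched with plain Python threading (stop_context stands in for the blocking ssc.stop(stopGracefully = true)):

```python
import threading

stopped = threading.Event()

def stop_context():
    # Stands in for ssc.stop(stopGracefully=True): blocking, long-running.
    stopped.set()

stopper = threading.Thread(target=stop_context)

def on_batch_completed():
    """Listener callback: start the stopper and return immediately."""
    stopper.start()    # no blocking work in the callback itself

on_batch_completed()   # the callback returns right away
stopper.join()         # the *main* thread waits for shutdown to finish
print(stopped.is_set())
```

Joining inside the listener instead of in the main thread recreates exactly the deadlock reported earlier in the thread.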

On Wed, Aug 12, 2015 at 12:13 PM, Shushant Arora 
wrote:

> Is streamingContext.stop() a blocking method? I mean, does it wait for
> the completion of all batches and of all streaming listeners? It may happen
> that, in the new thread, by the time sc.stop() is called a new batch has
> already started because of a race condition, so it will also wait for that
> new batch to complete.
>
> I was actually joining the streaming listener to the new thread, which caused
> the deadlock - since sc.stop() is blocking and waits for all streaming
> listeners to complete as well - right?
>
> On Thu, Aug 13, 2015 at 12:33 AM, Tathagata Das 
> wrote:
>
>> Well, system.exit will not ensure all data was processed before shutdown.
>> There should not be a deadlock if onBatchCompleted just starts the thread
>> (that runs stop()) and completes.
>>
>> On Wed, Aug 12, 2015 at 1:50 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> calling jssc.stop(false/true,false/true) from streamingListener causes
>>> deadlock , So I created another thread and called jssc.stop from  that but
>>> that too caused deadlock if onBatchCompleted is not completed before
>>> jssc.stop().
>>>
>>> So is it safe If I call System.exit(1) from another thread without
>>> calling jssc.stop()- since that leads to deadlock.
>>>
>>>
>>> On Tue, Aug 11, 2015 at 9:54 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 Does stopping the streaming context in the onBatchCompleted event
 of StreamingListener not kill the app?

 I have below code in streaming listener

 public void onBatchCompleted(StreamingListenerBatchCompleted arg0) {
 //check stop condition
 System.out.println("stopping gracefully");
 jssc.stop(false,false);
 System.out.println("stopped gracefully");
 }

 stopped gracefully is never printed.

 On the UI no more batches are processed, but the application is never
 killed/stopped. What's the best way to kill the app after stopping the context?



 On Tue, Aug 11, 2015 at 2:55 AM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> Thanks!
>
>
>
> On Tue, Aug 11, 2015 at 1:34 AM, Tathagata Das 
> wrote:
>
>> 1. RPC can be done in many ways, and a web service is one of many
>> ways. A even more hacky version can be the app polling a file in a file
>> system, if the file exists start shutting down.
>> 2. No need to set a flag. When you get the signal from RPC, you can
>> just call context.stop(stopGracefully = true) . Though note that this is
>> blocking, so you gotta be careful about doing blocking calls on the RPC
>> thread.
>>
>> On Mon, Aug 10, 2015 at 12:24 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> By RPC you mean web service exposed on driver which listens and set
>>> some flag and driver checks that flag at end of each batch and if set 
>>> then
>>> gracefully stop the application ?
>>>
>>> On Tue, Aug 11, 2015 at 12:43 AM, Tathagata Das >> > wrote:
>>>
 In general, it is a little risky to put long running stuff in a
 shutdown hook as it may delay shutdown of the process which may delay 
 other
 things. That said, you could try it out.

 A better way to explicitly shutdown gracefully is to use an RPC to
 signal the driver process to start shutting down, and then the process 
 will
 gracefully stop the context and terminate. This is more robust than
 leveraging shutdown hooks.

 On Mon, Aug 10, 2015 at 11:56 AM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> Any help in best recommendation for gracefully shutting down a
> spark stream application ?
> I am running it on yarn and a way to tell from externally either
> yarn application -kill command or some other way but need current 
> batch to
> be processed completely 

Spark - Standalone Vs YARN Vs Mesos

2015-08-12 Thread ๏̯͡๏
Do we have any comparisons in terms of resource utilization, scheduling of
running Spark in the below three modes
1) Standalone
2) over YARN
3) over Mesos

Can some one share resources (thoughts/URLs) on this area.


-- 
Deepak


Re: Controlling number of executors on Mesos vs YARN

2015-08-12 Thread Tim Chen
You're referring to both fine grain and coarse grain?

A desirable number of executors per node could be interesting, but it can't be
guaranteed (or we could try, and abort the job when it fails).

How would you imagine this new option to actually work?


Tim

On Wed, Aug 12, 2015 at 11:48 AM, Ajay Singal  wrote:

> Hi Tim,
>
> An option like spark.mesos.executor.max to cap the number of executors
> per node/application would be very useful.  However, an option like
> spark.mesos.executor.num
> to specify the desired number of executors per node would provide even
> better control.
>
> Thanks,
> Ajay
>
> On Wed, Aug 12, 2015 at 4:18 AM, Tim Chen  wrote:
>
>> Yes, the options are not that configurable yet, but I think it's not hard
>> to change.
>>
>> I actually have a patch out specifically to allow configuring the number of
>> CPUs per executor in coarse-grain mode; hopefully it will be merged in the
>> next release.
>>
>> I think the open question now is for fine grain mode can we limit the
>> number of maximum concurrent executors, and I think we can definitely just
>> add a new option like spark.mesos.executor.max to cap it.
>>
>> I'll file a JIRA and hopefully get this change in soon too.
>>
>> Tim
>>
>>
>>
>> On Tue, Aug 11, 2015 at 6:21 AM, Haripriya Ayyalasomayajula <
>> aharipriy...@gmail.com> wrote:
>>
>>> Spark evolved as an example framework for Mesos - that's how I know it.
>>> It is surprising to see that the options provided by Mesos in this case are
>>> fewer. As for tweaking the source code, I haven't done it yet, but I would
>>> love to see what options could be there!
>>>
>>> On Tue, Aug 11, 2015 at 5:42 AM, Jerry Lam  wrote:
>>>
 My experience with Mesos + Spark is not great. I saw one executor with
 30 CPUs and the other executor with 6. So I don't think you can easily
 configure it without some tweaking at the source code.

 Sent from my iPad

 On 2015-08-11, at 2:38, Haripriya Ayyalasomayajula <
 aharipriy...@gmail.com> wrote:

 Hi Tim,

 Spark on YARN allows us to do it using the --num-executors and
 --executor-cores command-line arguments. I just got a chance to look at a
 similar Spark user list mail, but no answer yet. So does Mesos allow
 setting the number of executors and cores? Is there a default number it
 assumes?

 On Mon, Jan 5, 2015 at 5:07 PM, Tim Chen  wrote:

> Forgot to hit reply-all.
>
> -- Forwarded message --
> From: Tim Chen 
> Date: Sun, Jan 4, 2015 at 10:46 PM
> Subject: Re: Controlling number of executors on Mesos vs YARN
> To: mvle 
>
>
> Hi Mike,
>
> You're correct that there is no such setting for Mesos coarse-grain
> mode, since the assumption is that each node is launched with one
> container and Spark launches multiple tasks in that container.
>
> In fine-grain mode there isn't a setting like that, as it currently
> will launch an executor as long as it satisfies the minimum container
> resource requirement.
>
> I created a JIRA earlier about capping the number of executors or better
> distributing the # of executors launched on each node. Since the decision
> of choosing which node to launch containers on is all on the Spark
> scheduler side, it's very easy to modify.
>
> Btw, what's the configuration to set the # of executors on YARN side?
>
> Thanks,
>
> Tim
>
>
>
> On Sun, Jan 4, 2015 at 9:37 PM, mvle  wrote:
>
>> I'm trying to compare the performance of Spark running on Mesos vs
>> YARN.
>> However, I am having problems being able to configure the Spark
>> workload to
>> run in a similar way on Mesos and YARN.
>>
>> When running Spark on YARN, you can specify the number of executors
>> per
>> node. So if I have a node with 4 CPUs, I can specify 6 executors on
>> that
>> node. When running Spark on Mesos, there doesn't seem to be an
>> equivalent
>> way to specify this. In Mesos, you can somewhat force this by
>> specifying the
>> number of CPU resources to be 6 when running the slave daemon.
>> However, this
>> seems to be a static configuration of the Mesos cluster rather than
>> something
>> that can be configured in the Spark framework.
>>
>> So here is my question:
>>
>> For Spark on Mesos, am I correct that there is no way to control the
>> number
>> of executors per node (assuming an idle cluster)? For Spark on Mesos
>> coarse-grained mode, there is a way to specify max_cores but that is
>> still
>> not equivalent to specifying the number of executors per node as when
>> Spark
>> is run on YARN.
>>
>> If I am correct, then it seems Spark might be at a disadvantage
>> running on
>> Mesos compared to YARN (since it lacks the fine tuning ability
>> provided by
>> YARN).
>>
>> Thanks,
>> Mike
>>>
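For reference, the YARN-side knobs Mike asks about are the `--num-executors` and `--executor-cores` flags to spark-submit, while on Mesos coarse-grained mode (as of this thread) only the total core count can be capped via `spark.cores.max`. A small sketch composing the two command lines (plain Python, argument composition only; the master URL is an illustrative placeholder):

```python
def yarn_submit_args(num_executors, executor_cores, executor_memory="4g"):
    """Compose the spark-submit flags that control executor count on YARN."""
    return [
        "--master", "yarn",
        "--num-executors", str(num_executors),
        "--executor-cores", str(executor_cores),
        "--executor-memory", executor_memory,
    ]

def mesos_coarse_submit_args(max_cores):
    """On Mesos coarse-grained mode there is no per-node executor setting;
    only the total number of cores can be capped."""
    return [
        "--master", "mesos://host:5050",  # placeholder master URL
        "--conf", "spark.cores.max=%d" % max_cores,
    ]
```

The asymmetry between the two lists is exactly the gap the thread is about: YARN lets you shape executors per node, Mesos (at this point) does not.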

spark's behavior about failed tasks

2015-08-12 Thread freedafeng
Hello there,

I have a Spark job running on a 20-node cluster. The job is logically simple:
just a mapPartitions and then a sum. The return value of the mapPartitions is
an integer for each partition. The tasks hit some random failures (which
could be caused by 3rd-party key-value store connections; the cause is
irrelevant to my question). In more detail:

Description:
1. Spark 1.1.1
2. 4096 tasks total.
3. 66 failed tasks.

Issue:
Spark seems to be rerunning all 4096 tasks instead of just the 66 failed
tasks. It is currently at 469/4096 (stage 2).

Is this behavior normal? 

Thanks for your help!






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-s-behavior-about-failed-tasks-tp24232.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.3 + Parquet: "Skipping data using statistics"

2015-08-12 Thread YaoPau
I've seen this feature referenced in a couple of places: first in a forum
post, and in a talk by Michael Armbrust, around the 42nd minute.

As I understand it, if you create a Parquet file using Spark, Spark will
then have access to min/max vals for each column.  If a query asks for a
value outside that range (like a timestamp), Spark will know to skip that
file entirely.

Michael says this feature is turned off by default in 1.3.  How can I turn
this on?

I don't see much about this feature online.  A couple other questions:

- Does this only work for Parquet files that were created in Spark?  For
example, if I create the Parquet file using Hive + MapReduce, or Impala,
would Spark still have access to min/max values?

- Does this feature work at the row-group level, or just at the file level?
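The switch involved here appears to be Parquet filter push-down, which was off by default in Spark 1.3/1.4. The flag name below should be verified against the configuration docs for your Spark version:

```
# spark-defaults.conf (flag name assumed; verify against your version's docs)
spark.sql.parquet.filterPushdown   true
```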



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-Parquet-Skipping-data-using-statistics-tp24233.html



what is cause of, and how to recover from, unresponsive nodes w/ spark-ec2 script

2015-08-12 Thread AlexG
I'm using the spark-ec2 script to launch a 30 node r3.8xlarge cluster.
Occasionally several nodes will become unresponsive: I will notice that hdfs
complains it can't find some blocks, then when I go to restart hadoop, the
messages indicate that the connection to some nodes timed out, then when I
check, I can't ssh into those nodes at all.

Is this a problem others have experienced? What is causing this random
failure (or where can I look for relevant logs), and how can I recover from
it other than destroying the cluster and starting anew (time-consuming,
tedious, and requiring that I pull my large dataset down from S3 to HDFS
once again, but this is what I've been doing so far)?






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-is-cause-of-and-how-to-recover-from-unresponsive-nodes-w-spark-ec2-script-tp24235.html



Unit Testing

2015-08-12 Thread Mohit Anchlia
Is there a way to run spark streaming methods in standalone eclipse
environment to test out the functionality?


Re: Sorted Multiple Outputs

2015-08-12 Thread Eugene Morozov
Yiannis, 

sorry for late response, 
It is indeed not possible to create a new RDD inside of foreachPartition, so you 
have to write the data manually. I haven’t tried that and haven’t hit such an 
exception, but I’d suggest you try writing locally and then uploading it 
into HDFS. FileSystem has a specific method for that: “copyFromLocalFile”.

Another approach would be to split the RDD into multiple RDDs by key. You 
can get the distinct keys, collect them on the driver, loop over the keys, 
and filter a new RDD out of the original one by each key:

for (key <- keys) {
  rdd.filter(record => record._1 == key).saveAsTextFile(s"output/$key")
}

It might help to cache original rdd.
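Spark aside, the per-key loop amounts to the following sketch (plain Python, with an in-memory list of pairs standing in for the RDD; the file naming is illustrative):

```python
import os

def save_by_key(records, out_dir):
    """Write one sorted file per key, mimicking the filter-per-key loop.

    records: iterable of (key, value) pairs (a stand-in for the RDD).
    Returns the sorted list of keys, one output file per key.
    """
    keys = sorted({k for k, _ in records})   # "collect distinct keys on the driver"
    for key in keys:                          # loop over the keys ...
        # "filter by key", then sort the values before writing them out
        values = sorted(v for k, v in records if k == key)
        with open(os.path.join(out_dir, "%s.txt" % key), "w") as f:
            f.write("\n".join(str(v) for v in values))
    return keys
```

Sorting before writing, as Hao suggested earlier in the digest, is what gives each output file its customized value order.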

On 16 Jul 2015, at 12:21, Yiannis Gkoufas  wrote:

> Hi Eugene,
> 
> thanks for your response!
> Your recommendation makes sense, that's what I more or less tried.
> The problem that I am facing is that inside foreachPartition() I cannot 
> create a new rdd and use saveAsTextFile.
> It would probably make sense to write directly to HDFS using the Java API.
> When I tried that I was getting errors similar to this:
> 
> Failed on local exception: java.io.InterruptedIOException: Interruped while 
> waiting for IO on channel java.nio.channels.SocketChannel
> 
> Probably it's hitting a race condition.
> 
> Has anyone else faced this situation? Any suggestions?
> 
> Thanks a lot! 
> 
> On 15 July 2015 at 14:04, Eugene Morozov  wrote:
> Yiannis ,
> 
> It looks like you might explore other approach.
> 
> sc.textFile("input/path")
> .map() // your own implementation
> .partitionBy(new HashPartitioner(num))
> .groupBy() //your own implementation, as a result - PairRDD of key vs 
> Iterable of values
> .foreachPartition()
> 
> On the last step you could sort all values for the key and store them into 
> separate file even into the same directory of all other files for other keys. 
> HashPartitioner must guarantee that all values for a specific key will reside 
> in just one partition, but one partition might contain more than one key 
> (with its values). Of this I’m not sure, but it shouldn’t be a big deal, as 
> you would iterate over each (key, Iterable of values) tuple and store one 
> key to a specific file.
> 
> On 15 Jul 2015, at 03:23, Yiannis Gkoufas  wrote:
> 
>> Hi there,
>> 
>> I have been using the approach described here:
>> 
>> http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job
>> 
>> In addition to that, I was wondering if there is a way to customize 
>> the order of the values contained in each file.
>> 
>> Thanks a lot!
> 
> Eugene Morozov
> fathers...@list.ru
> 
> 
> 
> 
> 

Eugene Morozov
fathers...@list.ru






Re: PySpark order-only window function issue

2015-08-12 Thread Davies Liu
This should be a bug; go ahead and open a JIRA for it, thanks!

On Tue, Aug 11, 2015 at 6:41 AM, Maciej Szymkiewicz
 wrote:
> Hello everyone,
>
> I am trying to use PySpark API with window functions without specifying
> partition clause. I mean something equivalent to this
>
> SELECT v, row_number() OVER (ORDER BY v) AS rn FROM df
>
> in SQL. I am not sure if I am doing something wrong or if it is a bug, but
> the results are far from what I expect. Let's assume we have data as follows:
>
> from pyspark.sql.window import Window
> from pyspark.sql import functions as f
>
> df = sqlContext.createDataFrame(
> zip(["foo"] * 5 + ["bar"] * 5, range(1, 6) + range(6, 11)),
> ("k", "v")
> ).withColumn("dummy", f.lit(1))
>
> df.registerTempTable("df")
> df.show()
>
> +---+--+-----+
> |  k| v|dummy|
> +---+--+-----+
> |foo| 1|    1|
> |foo| 2|    1|
> |foo| 3|    1|
> |foo| 4|    1|
> |foo| 5|    1|
> |bar| 6|    1|
> |bar| 7|    1|
> |bar| 8|    1|
> |bar| 9|    1|
> |bar|10|    1|
> +---+--+-----+
>
> When I use following SQL query
>
> sql_ord = """SELECT k, v, row_number() OVER (
> ORDER BY v
> ) AS rn FROM df"""
>
> sqlContext.sql(sql_ord).show()
>
> I get expected results:
>
> +---+--+--+
> |  k| v|rn|
> +---+--+--+
> |foo| 1| 1|
> |foo| 2| 2|
> |foo| 3| 3|
> |foo| 4| 4|
> |foo| 5| 5|
> |bar| 6| 6|
> |bar| 7| 7|
> |bar| 8| 8|
> |bar| 9| 9|
> |bar|10|10|
> +---+--+--+
>
> but when I try to define a similar thing using Python API
>
> w_ord = Window.orderBy("v")
> df.select("k", "v", f.rowNumber().over(w_ord).alias("avg")).show()
>
> I get results like this:
>
> +---+--+---+
> |  k| v|avg|
> +---+--+---+
> |foo| 1|  1|
> |foo| 2|  1|
> |foo| 3|  1|
> |foo| 4|  1|
> |foo| 5|  1|
> |bar| 6|  1|
> |bar| 7|  1|
> |bar| 8|  1|
> |bar| 9|  1|
> |bar|10|  1|
> +---+--+---+
>
> When I specify both partition and order
>
> w_part_ord = Window.partitionBy("dummy").orderBy("v")
> df.select("k", "v", f.rowNumber().over(w_part_ord).alias("avg")).show()
>
> everything works as I expect:
>
> +---+--+---+
> |  k| v|avg|
> +---+--+---+
> |foo| 1|  1|
> |foo| 2|  2|
> |foo| 3|  3|
> |foo| 4|  4|
> |foo| 5|  5|
> |bar| 6|  6|
> |bar| 7|  7|
> |bar| 8|  8|
> |bar| 9|  9|
> |bar|10| 10|
> +---+--+---+
>
> Another example of similar behavior with correct SQL result:
>
> sql_ord_rng = """SELECT k, v, avg(v) OVER (
> ORDER BY v
> ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
> ) AS avg FROM df"""
> sqlContext.sql(sql_ord_rng).show()
>
> +---+--+---+
> |  k| v|avg|
> +---+--+---+
> |foo| 1|1.5|
> |foo| 2|2.0|
> |foo| 3|3.0|
> |foo| 4|4.0|
> |foo| 5|5.0|
> |bar| 6|6.0|
> |bar| 7|7.0|
> |bar| 8|8.0|
> |bar| 9|9.0|
> |bar|10|9.5|
> +---+--+---+
>
> and the incorrect PySpark result:
>
> w_ord_rng = Window.orderBy("v").rowsBetween(-1, 1)
> df.select("k", "v", f.avg("v").over(w_ord_rng).alias("avg")).show()
>
> +---+--+----+
> |  k| v| avg|
> +---+--+----+
> |foo| 1| 1.0|
> |foo| 2| 2.0|
> |foo| 3| 3.0|
> |foo| 4| 4.0|
> |foo| 5| 5.0|
> |bar| 6| 6.0|
> |bar| 7| 7.0|
> |bar| 8| 8.0|
> |bar| 9| 9.0|
> |bar|10|10.0|
> +---+--+----+
>
> As before, adding a dummy partition solves the problem:
>
> w_part_ord_rng = Window.partitionBy("dummy").orderBy("v").rowsBetween(-1, 1)
> df.select("k", "v", f.avg("v").over(w_part_ord_rng).alias("avg")).show()
>
> +---+--+---+
> |  k| v|avg|
> +---+--+---+
> |foo| 1|1.5|
> |foo| 2|2.0|
> |foo| 3|3.0|
> |foo| 4|4.0|
> |foo| 5|5.0|
> |bar| 6|6.0|
> |bar| 7|7.0|
> |bar| 8|8.0|
> |bar| 9|9.0|
> |bar|10|9.5|
> +---+--+---+
>
> I've checked window functions tests
> (https://github.com/apache/spark/blob/ac507a03c3371cd5404ca195ee0ba0306badfc23/python/pyspark/sql/tests.py#L1105)
> but these cover only partition + order case.
>
> Is there something wrong with my window definitions or should I open
> Jira issue?
>
> Environment:
>
> - Debian GNU/Linux
> - Spark 1.4.1
> - Python 2.7.9
> - OpenJDK Runtime Environment (IcedTea 2.5.5) (7u79-2.5.5-1~deb8u1)
>
> --
> Best,
> Maciej
>
>
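As a reference for what the ORDER BY-only window should produce, row_number semantics can be reproduced in plain Python (no Spark assumed) over the same data; this simply makes explicit the single global ordering that the SQL version returns and the Python-API version above does not:

```python
def row_number_over_order(rows, order_key):
    """ORDER BY-only row_number(): one global ordering, numbered 1..n."""
    ordered = sorted(rows, key=order_key)
    return [(row, i + 1) for i, row in enumerate(ordered)]

# The same (k, v) data as in the DataFrame above.
data = [("foo", v) for v in range(1, 6)] + [("bar", v) for v in range(6, 11)]
numbered = row_number_over_order(data, order_key=lambda r: r[1])
```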




Re: Spark - Standalone Vs YARN Vs Mesos

2015-08-12 Thread Tim Chen
I'm not sure what you're looking for, since you can't really compare
Standalone with YARN or Mesos, as Standalone is assuming the Spark
workers/master owns the cluster, and YARN/Mesos is trying to share the
cluster among different applications/frameworks.

And when you refer to resource utilization, what exactly does that mean to
you? Is it the ability to maximize the usage of your resources with
multiple applications in mind, or just how much configuration Spark allows
you in each mode?

Tim

On Wed, Aug 12, 2015 at 2:16 PM, ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:

> Do we have any comparisons in terms of resource utilization, scheduling of
> running Spark in the below three modes
> 1) Standalone
> 2) over YARN
> 3) over Mesos
>
> Can someone share resources (thoughts/URLs) on this area.
>
>
> --
> Deepak
>
>


Re: grouping by a partitioned key

2015-08-12 Thread Hemant Bhanawat
Inline..

On Thu, Aug 13, 2015 at 5:06 AM, Eugene Morozov  wrote:

> Hemant, William, pls see inlined.
>
> On 12 Aug 2015, at 18:18, Philip Weaver  wrote:
>
> Yes, I am partitioning using DataFrameWriter.partitionBy, which produces
> the keyed directory structure that you referenced in that link.
>
>
> Have you tried using the DataFrame API instead of SQL? I mean something like
> dataFrame.select(key).agg(count).distinct().agg(sum).
> Could you print explain for this way and for SQL you tried? I’m just
> curious of the difference.
>
>
> On Tue, Aug 11, 2015 at 11:54 PM, Hemant Bhanawat 
> wrote:
>
>> As far as I know, Spark SQL cannot process data on a per-partition basis.
>> DataFrame.foreachPartition is the way.
>>
>
> What do you mean by “cannot process on per-partition-basis”? DataFrame is
> an RDD on steroids.
>

I meant that Spark SQL cannot process the data of a single partition the way
you can with foreachPartition.

>
>
>> I haven't tried it, but the following looks like a not-so-sophisticated way
>> of making Spark SQL partition-aware.
>>
>>
>> http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery
>>
>>
>> On Wed, Aug 12, 2015 at 5:00 AM, Philip Weaver 
>> wrote:
>>
>>> Thanks.
>>>
>>> In my particular case, I am calculating a distinct count on a key that
>>> is unique to each partition, so I want to calculate the distinct count
>>> within each partition, and then sum those. This approach will avoid moving
>>> the sets of that key around between nodes, which would be very expensive.
>>>
>>> Currently, to accomplish this we are manually reading in the parquet
>>> files (not through Spark SQL), using a bitset to calculate the unique count
>>> within each partition, and accumulating that sum. Doing this through Spark
>>> SQL would be nice, but the naive "SELECT count(distinct ...)" approach
>>> takes 60 times as long :). The approach I mentioned above might be an
>>> acceptable hybrid solution.
>>>
>>> - Philip
>>>
>>>
>>> On Tue, Aug 11, 2015 at 3:27 PM, Eugene Morozov 
>>> wrote:
>>>
 Philip,

 If all data per key are inside just one partition, then Spark will
 figure that out. Can you guarantee that’s the case?
 What is it you try to achieve? There might be another way for it, when
 you might be 100% sure what’s happening.

 You can print debugString or explain (for DataFrame) to see what’s
 happening under the hood.


 On 12 Aug 2015, at 01:19, Philip Weaver 
 wrote:

 If I have an RDD that happens to already be partitioned by a key, how
 efficient can I expect a groupBy operation to be? I would expect that Spark
 shouldn't have to move data around between nodes, and simply will have a
 small amount of work just checking the partitions to discover that it
 doesn't need to move anything around.

 Now, what if we're talking about a parquet database created by using
 DataFrameWriter.partitionBy(...), then will Spark SQL be smart when I group
 by a key that I'm already partitioned by?

 - Philip


 Eugene Morozov
 fathers...@list.ru





>>>
>>
>
> Eugene Morozov
> fathers...@list.ru
>
>
>
>
>
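Philip's hybrid approach above, a distinct count inside each partition followed by a plain sum across partitions, can be sketched Spark-free; each inner list stands in for one partition, and the result equals the global distinct count only under his stated invariant that no key spans two partitions:

```python
def distinct_count_per_partition_sum(partitions, key_fn):
    """Sum of per-partition distinct key counts.

    Correct as a global distinct count only when each key lives in exactly
    one partition (Philip's invariant); otherwise it overcounts.
    """
    total = 0
    for part in partitions:          # mapPartitions analogue
        seen = set()                 # the bitset/set local to one partition
        for record in part:
            seen.add(key_fn(record))
        total += len(seen)           # one integer per partition ...
    return total                     # ... summed on the driver
```

Because only one integer per partition crosses partition boundaries, no sets of keys are shuffled between nodes, which is exactly what makes this cheaper than a global distinct count.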


Re: collect() works, take() returns "ImportError: No module named iter"

2015-08-12 Thread YaoPau
In case anyone runs into this issue in the future, we got it working: the
following variable must be set on the edge node:

export PYSPARK_PYTHON=/your/path/to/whatever/python/you/want/to/run/bin/python

I didn't realize that variable gets passed to every worker node.  All I saw
when searching for this issue was documentation for an older version of
Spark which mentions using SPARK_YARN_USER_ENV to set PYSPARK_PYTHON within
spark-env.sh, which didn't work for us on Spark 1.3.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/collect-works-take-returns-ImportError-No-module-named-iter-tp24199p24234.html