Re: Very slow complex type column reads from parquet

2018-06-18 Thread Ryan Blue
Jakub,

I'm moving the Spark list to bcc and adding the Parquet list, since you're
probably more interested in Parquet tuning.

It makes sense that you're getting better performance when you have more
matching rows distributed, especially if those rows have a huge column that
you need to project. You're just able to use more processing power at once.
Finding a good layout or sort to distribute the data is a good start, and
you're right that decreasing the row group size would help: more (matching)
row groups, more parallelism. Just be careful because decreasing the row
group size can inflate the overall size of the data.
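
As a rough illustration, a write along these lines keeps the row groups
sorted and caps their size (just a sketch: the paths, the "timestamp" sort
column, and the 64 MB row group size are placeholders, not a recommendation
for your data):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("parquet-layout-sketch").getOrCreate()

    // Parquet's row group ("block") size is a writer setting read from the Hadoop conf.
    spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 64 * 1024 * 1024)

    spark.read.parquet("/data/events")        // hypothetical input path
      .repartition(200)                       // spread matching rows across more files/tasks
      .sortWithinPartitions("timestamp")      // tight min/max stats per row group
      .write
      .mode("overwrite")
      .parquet("/data/events_sorted")         // hypothetical output path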

Parquet will also eliminate row groups in each task when possible. At the
start of the task, it uses the row group's column stats (min, max, num
nulls), and before the row group is read it will also attempt to read any
column dictionaries and use those to see if any values match.
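
On the read side (continuing the sketch above, with made-up column names),
keeping filter pushdown enabled lets a predicate on the sorted column skip
row groups whose stats don't overlap the filter:

    import org.apache.spark.sql.functions.col

    spark.conf.set("spark.sql.parquet.filterPushdown", "true")  // the default; shown for clarity

    val day = spark.read.parquet("/data/events_sorted")
      .filter(col("timestamp") >= "2018-06-01" && col("timestamp") < "2018-06-02")
      .select("deviceId", "hugeValueColumn")  // project only what you actually need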

If a row group can't be eliminated, all of the rows in that group will be
materialized. One thing that's probably really hurting you here is that
each row from Parquet is copied in memory twice for this read path. I
pointed this out recently and that's why I'm suggesting we clean up the
expectations of Spark SQL operators (see this comment and the dev list
thread).

There are patches out in the Parquet community to do page-level filtering
as well, which will help your use case quite a bit.

rb

On Fri, Jun 15, 2018 at 1:44 AM, Jakub Wozniak 
wrote:

> Hello,
>
> I’m sorry to bother you again but it is quite important for us to
> understand the problem better.
>
> One more finding in our problem is that the performance of queries on a
> timestamp-sorted file depends a lot on the predicate timestamp.
> If you are lucky and the matching records are near the start of the row
> group, it can be fast (seconds). If you search for something that is at the
> end of the row group, the query takes minutes.
> I guess this is because it has to scan all the previous records in the row
> group until it finds the right ones at the end of it...
>
> Now I have a couple of questions regarding the way the Parquet file is
> read.
>
> 1) Does it always decode the projected columns (from the select) even if
> the predicate column does not match (to me it looks like it does, but I
> might be wrong)?
> 2) Sorting the file will result in “indexed” row groups, so it will be
> easier to locate which row group to scan, but isn't that at the same time
> limiting parallelism? If the data is randomly placed in the row groups it
> will be searched with as many tasks as there are row groups, right (or at
> least more than one)? Is there any common rule we can formulate, or is it
> very data- and/or query-dependent?
> 3) Would making the row groups smaller (say, by half) help? Currently I can
> see that the row groups are about the size of the HDFS block (256MB), but
> sometimes smaller or even bigger.
> We have no explicit setting for the row group size, so I guess the default
> HDFS block size is used, right?
> Do you have any recommendation / experience with that?
>
> Thanks a lot for your help,
> Jakub
>
>
>
> On 14 Jun 2018, at 12:07, Jakub Wozniak  wrote:
>
> Dear Ryan,
>
> Thanks a lot for your answer.
> After having sent the e-mail we have investigated a bit more the data
> itself.
> It happened that for certain days the data was very skewed and one of the
> row groups had many more records than all the others.
> This was somehow related to the fact that we had sorted it by our object
> ids, and by chance the objects that went first were smaller (or compressed
> better).
> So the Parquet file had 6 row groups where the first one had 300k rows and
> the others only 30k rows.
> The search for a given object fell into the first row group and took a very
> long time.
> The data itself was very much compressed as it contained a lot of zeros.
> To give some numbers the 600MB parquet file expanded to 56GB in JSON.
>
> What we did was to sort the data not by object id but by the record
> timestamp, which resulted in a much more even data distribution among the
> row groups.
> This in fact helped a lot with the query time (using the timestamp & object
> id).
>
> I have to say that I haven't fully understood this phenomenon yet as I’m
> not a Parquet format & reader expert (at least not yet).
> Maybe it is just a simple function of how many records Spark has to scan
> and the level of parallelism (searching for a given object id when the data
> is sorted by time needs to scan all, or at least more, of the row groups
> for larger timestamps).
> One question here - does the Parquet reader read & decode the projection
> columns even if the predicate columns should filter the record out?
>
> Unfortunately we have to have those big columns in the query as people
> want to do analysis on them.
>
> We will continue to investigate…
>
> Cheers,
> 

Re: Jenkins build errors

2018-06-18 Thread shane knapp
i triggered another build against your PR, so let's see if this happens
again or was a transient failure.

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92038/

shane

On Mon, Jun 18, 2018 at 5:30 AM, Petar Zecevic 
wrote:

> Hi,
> Jenkins build for my PR (https://github.com/apache/spark/pull/21109 ;
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92023/testReport/org.apache.spark.sql.hive/HiveExternalCatalogVersionsSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)
> keeps failing.
> First it couldn't download Spark v.2.2.0 (indeed, it wasn't available at
> the mirror it selected), now it's failing with this exception below.
>
> Can someone explain these errors for me? Is anybody else experiencing
> similar problems?
>
> Thanks,
> Petar
>
>
> Error Message
>
> java.io.IOException: Cannot run program "./bin/spark-submit" (in directory
> "/tmp/test-spark/spark-2.2.1"): error=2, No such file or directory
>
> Stacktrace
>
> sbt.ForkMain$ForkError: java.io.IOException: Cannot run program
> "./bin/spark-submit" (in directory "/tmp/test-spark/spark-2.2.1"):
> error=2, No such file or directory
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
> at org.apache.spark.sql.hive.SparkSubmitTestUtils$class.runSparkSubmit(SparkSubmitTestUtils.scala:73)
> at org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite.runSparkSubmit(HiveExternalCatalogVersionsSuite.scala:43)
> at org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite$$anonfun$beforeAll$1.apply(HiveExternalCatalogVersionsSuite.scala:176)
> at org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite$$anonfun$beforeAll$1.apply(HiveExternalCatalogVersionsSuite.scala:161)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite.beforeAll(HiveExternalCatalogVersionsSuite.scala:161)
> at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:212)
> at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
> at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:52)
> at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
> at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
> at sbt.ForkMain$Run$2.call(ForkMain.java:296)
> at sbt.ForkMain$Run$2.call(ForkMain.java:286)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: sbt.ForkMain$ForkError: java.io.IOException: error=2, No such
> file or directory
> at java.lang.UNIXProcess.forkAndExec(Native Method)
> at java.lang.UNIXProcess.<init>(UNIXProcess.java:248)
> at java.lang.ProcessImpl.start(ProcessImpl.java:134)
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
> ... 17 more
>
>


-- 
Shane Knapp
UC Berkeley EECS Research / RISELab Staff Technical Lead
https://rise.cs.berkeley.edu


[SPARK-24579] SPIP: Standardize Optimized Data Exchange between Spark and DL/AI frameworks

2018-06-18 Thread Xiangrui Meng
Hi all,

I posted a new SPIP on optimized data exchange between Spark and DL/AI
frameworks at SPARK-24579. It incorporates input from offline conversations
with several Spark committers and contributors at the Spark+AI Summit
conference. Please take a look and let me know your thoughts in the JIRA
comments. Thanks!

Best,
Xiangrui
-- 

Xiangrui Meng

Software Engineer

Databricks Inc. (http://databricks.com)


Spark FAIR Scheduler vs FIFO Scheduler

2018-06-18 Thread Alessandro Liparoti
Good morning,

I have a conceptual question. In an application I am working on, when I
write some results to HDFS (*action 1*), I use only ~30 executors out of 200.
I would like to improve resource utilization in this case.
I am aware that repartitioning the df to 200 before action 1 would produce
200 tasks and full executor utilization, but for several reasons that is not
what I want to do.
What I would like to do is to use the other ~170 executors to work on the
actions (jobs) coming after action 1. The normal case would be that *action
2* starts after action 1 finishes (FIFO), but here I want them to start at
the same time, using the idle executors.

My question is: is this achievable with the FAIR scheduler approach, and if
so, how?

As I read it, the fair scheduler needs a pool of jobs and then schedules
their tasks in a round-robin fashion. If I submit action 1 and action 2 at
the same time (from multiple threads) to a fair pool, which of the following
happens?

   1. at every moment, all (or almost all) executors are used in parallel
   (30 for action 1, the rest for action 2)
   2. for a certain small amount of time X, 30 executors are used for
   action 1, then for another time X the other executors are used for action
   2, then again X unit of time for action 1 and so on...

Among the two, 1 would actually improve cluster utilization, while 2 would
only let both jobs advance at the same time. Can someone who has knowledge
of the FAIR scheduler help me understand how it works?
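
For concreteness, here is roughly what I have in mind (a simplified,
untested sketch; the pool names, paths, and allocation file location are
made up):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("fair-pools-sketch")
      .config("spark.scheduler.mode", "FAIR")
      .config("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml") // hypothetical pools file
      .getOrCreate()
    val sc = spark.sparkContext

    val df = spark.range(0L, 10000000L).toDF("id")  // stand-in for the real DataFrame

    // Action 1: the HDFS write, submitted from its own thread in its own pool.
    val writeThread = new Thread(new Runnable {
      override def run(): Unit = {
        sc.setLocalProperty("spark.scheduler.pool", "write")  // thread-local pool assignment
        df.write.mode("overwrite").parquet("/tmp/action1-output")  // hypothetical path
      }
    })

    // Action 2: a second job submitted concurrently from another thread.
    val analyticsThread = new Thread(new Runnable {
      override def run(): Unit = {
        sc.setLocalProperty("spark.scheduler.pool", "analytics")
        df.selectExpr("id % 10 AS bucket").groupBy("bucket").count().collect()
      }
    })

    writeThread.start(); analyticsThread.start()
    writeThread.join(); analyticsThread.join()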

Thanks,
*Alessandro Liparoti*


Jenkins build errors

2018-06-18 Thread Petar Zecevic

Hi,
Jenkins build for my PR (https://github.com/apache/spark/pull/21109 ; 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92023/testReport/org.apache.spark.sql.hive/HiveExternalCatalogVersionsSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/) 
keeps failing. First it couldn't download Spark v.2.2.0 (indeed, it 
wasn't available at the mirror it selected), now it's failing with this 
exception below.


Can someone explain these errors for me? Is anybody else experiencing 
similar problems?


Thanks,
Petar


Error Message

java.io.IOException: Cannot run program "./bin/spark-submit" (in 
directory "/tmp/test-spark/spark-2.2.1"): error=2, No such file or 
directory


Stacktrace

sbt.ForkMain$ForkError: java.io.IOException: Cannot run program 
"./bin/spark-submit" (in directory "/tmp/test-spark/spark-2.2.1"): 
error=2, No such file or directory

    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at 
org.apache.spark.sql.hive.SparkSubmitTestUtils$class.runSparkSubmit(SparkSubmitTestUtils.scala:73)
    at 
org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite.runSparkSubmit(HiveExternalCatalogVersionsSuite.scala:43)
    at 
org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite$$anonfun$beforeAll$1.apply(HiveExternalCatalogVersionsSuite.scala:176)
    at 
org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite$$anonfun$beforeAll$1.apply(HiveExternalCatalogVersionsSuite.scala:161)

    at scala.collection.immutable.List.foreach(List.scala:381)
    at 
org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite.beforeAll(HiveExternalCatalogVersionsSuite.scala:161)
    at 
org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:212)
    at 
org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)

    at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:52)
    at 
org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
    at 
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)

    at sbt.ForkMain$Run$2.call(ForkMain.java:296)
    at sbt.ForkMain$Run$2.call(ForkMain.java:286)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

    at java.lang.Thread.run(Thread.java:745)
Caused by: sbt.ForkMain$ForkError: java.io.IOException: error=2, No such 
file or directory

    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:248)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 17 more



Re: Unsubscribe

2018-06-18 Thread Hadrien Chicault
On Fri, Jun 15, 2018 at 11:17 PM, Mikhail Dubkov wrote:

> Unsubscribe
>
> On Thu, Jun 14, 2018 at 8:38 PM Kumar S, Sajive 
> wrote:
>
>> Unsubscribe
>>
>