Re: Using existing distribution for join when subset of keys

2020-05-31 Thread Patrick Woody
Hey Terry,

Thanks for the response! I'm not sure that it ends up working though - the
bucketing still seems to require the exchange before the join. Both tables
below are saved bucketed by "x":
*(5) Project [x#29, y#30, z#31, z#37]
+- *(5) SortMergeJoin [x#29, y#30], [x#35, y#36], Inner
   :- *(2) Sort [x#29 ASC NULLS FIRST, y#30 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#29, y#30, 200)
   :     +- *(1) Project [x#29, y#30, z#31]
   :        +- *(1) Filter (isnotnull(x#29) && isnotnull(y#30))
   :           +- *(1) FileScan parquet default.ax[x#29,y#30,z#31] Batched: true, Format: Parquet,
   :                Location: InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/ax],
   :                PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)],
   :                ReadSchema: struct, SelectedBucketsCount: 200 out of 200
   +- *(4) Sort [x#35 ASC NULLS FIRST, y#36 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(x#35, y#36, 200)
         +- *(3) Project [x#35, y#36, z#37]
            +- *(3) Filter (isnotnull(x#35) && isnotnull(y#36))
               +- *(3) FileScan parquet default.bx[x#35,y#36,z#37] Batched: true, Format: Parquet,
                    Location: InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/bx],
                    PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)],
                    ReadSchema: struct, SelectedBucketsCount: 200 out of 200
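
For reference, roughly how I created and joined the two tables above in the spark-shell - a minimal
sketch with made-up rows; the 200 buckets match the plan and the real tables are larger:

  import spark.implicits._

  // Both sides bucketed by "x" only, while the join keys are ("x", "y").
  Seq((1, 2, 3)).toDF("x", "y", "z").write.bucketBy(200, "x").saveAsTable("ax")
  Seq((1, 2, 4)).toDF("x", "y", "z").write.bucketBy(200, "x").saveAsTable("bx")

  spark.table("ax").join(spark.table("bx"), Seq("x", "y")).explain()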

Best,
Pat



On Sun, May 31, 2020 at 3:15 PM Terry Kim  wrote:

> You can use bucketBy to avoid shuffling in your scenario. This test suite
> has some examples:
> https://github.com/apache/spark/blob/45cf5e99503b00a6bd83ea94d6d92761db1a00ab/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala#L343
>
> Thanks,
> Terry
>
> On Sun, May 31, 2020 at 7:43 AM Patrick Woody 
> wrote:
>
>> Hey all,
>>
>> I have one large table, A, and two medium sized tables, B & C, that I'm
>> trying to complete a join on efficiently. The result is multiplicative on A
>> join B, so I'd like to avoid shuffling that result. For this example, let's
>> just assume each table has three columns, x, y, z. The below is all being
>> tested on Spark 2.4.5 locally.
>>
>> I'd like to perform the following join:
>> A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
>> This outputs the following physical plan:
>> == Physical Plan ==
>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>    :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>    :  +- Exchange hashpartitioning(x#32, z#34, 200)
>>    :     +- *(3) Project [x#32, y#33, z#34, z#74]
>>    :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>    :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
>>    :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
>>    :           :     +- LocalTableScan [x#32, y#33, z#34]
>>    :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
>>    :              +- Exchange hashpartitioning(x#72, y#73, 200)
>>    :                 +- LocalTableScan [x#72, y#73, z#74]
>>    +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>>       +- Exchange hashpartitioning(x#52, z#54, 200)
>>          +- LocalTableScan [x#52, y#53, z#54]
>>
>>
>> I may be misremembering, but in the past I thought you had the ability to
>> pre-partition each table by "x" and it would satisfy the requirements of
>> the join since it is already clustered by the key on both sides using the
>> same hash function (this assumes numPartitions lines up obviously). However
>> it seems like it will insert another exchange:
>>
>> A.repartition($"x").join(B.repartition($"x"), Seq("x",
>> "y")).join(C.repartition($"x"), Seq("x", "z"))
>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>:- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>:  +- Exchange hashpartitioning(x#32, z#34, 200)
>>: +- *(3) Project [x#32, y#33, z#34, z#74]
>>:+- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>:   :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST],
>> false, 0
>>:   :  +- Exchange hashpartitioning(x#32, y#33, 200)
>>:   : +- Exchange hashpartitioning(x#32, 200)
>>:   :+- LocalTableScan [x#32, y#33, z#34]
>>:   +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST],
>> false, 0
&g

Using existing distribution for join when subset of keys

2020-05-31 Thread Patrick Woody
Hey all,

I have one large table, A, and two medium-sized tables, B & C, that I'm
trying to join efficiently. The result is multiplicative on A join B, so I'd
like to avoid shuffling that result. For this example, let's just assume each
table has three columns: x, y, z. The below is all being tested on Spark
2.4.5 locally.

I'd like to perform the following join:
A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
This outputs the following physical plan:
== Physical Plan ==
*(6) Project [x#32, z#34, y#33, z#74, y#53]
+- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
   :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#32, z#34, 200)
   :     +- *(3) Project [x#32, y#33, z#34, z#74]
   :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
   :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
   :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
   :           :     +- LocalTableScan [x#32, y#33, z#34]
   :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
   :              +- Exchange hashpartitioning(x#72, y#73, 200)
   :                 +- LocalTableScan [x#72, y#73, z#74]
   +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(x#52, z#54, 200)
         +- LocalTableScan [x#52, y#53, z#54]


I may be misremembering, but in the past I thought you had the ability to
pre-partition each table by "x" and it would satisfy the requirements of
the join since it is already clustered by the key on both sides using the
same hash function (this assumes numPartitions lines up, obviously). However,
it seems like it will insert another exchange:

A.repartition($"x").join(B.repartition($"x"), Seq("x",
"y")).join(C.repartition($"x"), Seq("x", "z"))
*(6) Project [x#32, z#34, y#33, z#74, y#53]
+- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
   :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#32, z#34, 200)
   : +- *(3) Project [x#32, y#33, z#34, z#74]
   :+- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
   :   :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST],
false, 0
   :   :  +- Exchange hashpartitioning(x#32, y#33, 200)
   :   : +- Exchange hashpartitioning(x#32, 200)
   :   :+- LocalTableScan [x#32, y#33, z#34]
   :   +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST],
false, 0
   :  +- Exchange hashpartitioning(x#72, y#73, 200)
   : +- Exchange hashpartitioning(x#72, 200)
   :+- LocalTableScan [x#72, y#73, z#74]
   +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(x#52, z#54, 200)
 +- ReusedExchange [x#52, y#53, z#54], Exchange
hashpartitioning(x#32, 200).

Note that using this "strategy" with groupBy("x", "y") works fine, though I
assume that is because it doesn't have to consider the other side of the
join.
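
For example, something along these lines (a sketch using the same assumed schema) keeps the single
repartition exchange and doesn't add another one for the aggregation:

  A.repartition($"x").groupBy("x", "y").count().explain()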

Did this used to work or am I simply confusing it with groupBy? Either way
- any thoughts on how I can avoid shuffling the bulk of the join result?

Thanks,
Pat


Spark 1.6.2 short circuit AND filter broken

2016-07-07 Thread Patrick Woody
Hey all,

I hit a pretty nasty bug on 1.6.2 that I can't reproduce on 2.0. Here is
the code/logical plan: http://pastebin.com/ULnHd1b6. I have filterPushdown
disabled, so when I call collect here it hits the exception in my UDF
before doing a null check on the input.
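
For anyone who doesn't want to open the pastebin, the shape of the code is roughly this - a
simplified sketch, not the exact snippet; the column name and data source are made up:

  import org.apache.spark.sql.functions._

  // UDF that throws on null input; the isNotNull guard is expected to short-circuit it.
  val len = udf((s: String) => s.length)
  val df = sqlContext.read.parquet("/some/path")  // with spark.sql.parquet.filterPushdown disabled
  df.filter(col("s").isNotNull && len(col("s")) > 0).collect()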

I believe it is a symptom of how DataSourceStrategy splits and recombines the
predicates, since it happens at physical planning, but I haven't gone deeper.
This doesn't reproduce if I simply use case classes and
sqlContext.createDataFrame.

Is there going to be a 1.6.3 release where this bug can be fixed? I'm happy
to dig further and send up a PR.

Thanks!
-Pat


Get Spark version before starting context

2015-07-04 Thread Patrick Woody
Hey all,

Is it possible to reliably get the version string of a Spark cluster prior
to trying to connect via the SparkContext on the client side? Most of the
errors I've seen on mismatched versions have been cryptic, so it would be
helpful if I could throw an exception earlier.

I know it is contained in the HTML of the master, but an API endpoint would
also be helpful. Does this exist?

Thanks!
-Pat


Re: Get Spark version before starting context

2015-07-04 Thread Patrick Woody
To somewhat answer my own question - it looks like an empty request to the
REST API will return an error that includes the version in JSON as well.
Still not ideal though. Would there be any objection to adding a simple
version endpoint to the API?
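
For reference, something like the following works as a probe - a rough sketch that assumes a
standalone master with the REST submission server on its default port (6066) and that its JSON
error responses carry a serverSparkVersion field, which may not hold for every deployment:

  import java.net.{HttpURLConnection, URL}
  import scala.io.Source

  def clusterSparkVersion(masterHost: String): Option[String] = {
    // Poke an arbitrary submissions endpoint and scrape the version out of the (error) body.
    val conn = new URL(s"http://$masterHost:6066/v1/submissions/status/nonexistent")
      .openConnection().asInstanceOf[HttpURLConnection]
    val stream = Option(conn.getErrorStream).getOrElse(conn.getInputStream)
    val body = Source.fromInputStream(stream).mkString
    "\"serverSparkVersion\"\\s*:\\s*\"([^\"]+)\"".r.findFirstMatchIn(body).map(_.group(1))
  }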

On Sat, Jul 4, 2015 at 4:00 PM, Patrick Woody patrick.woo...@gmail.com
wrote:

 Hey all,

 Is it possible to reliably get the version string of a Spark cluster prior
 to trying to connect via the SparkContext on the client side? Most of the
 errors I've seen on mismatched versions have been cryptic, so it would be
 helpful if I could throw an exception earlier.

 I know it is contained the HTML of the master, but an API point would also
 be helpful. Does this exist?

 Thanks!
 -Pat



Re: Dynamic allocator requests -1 executors

2015-06-13 Thread Patrick Woody
Hey Sandy,

I'll test it out on 1.4. Do you have a bug number or PR that I could reference 
as well?

Thanks!
-Pat

Sent from my iPhone

 On Jun 13, 2015, at 11:38 AM, Sandy Ryza sandy.r...@cloudera.com wrote:
 
 Hi Patrick,
 
 I'm noticing that you're using Spark 1.3.1.  We fixed a bug in dynamic 
 allocation in 1.4 that permitted requesting negative numbers of executors.  
 Any chance you'd be able to try with the newer version and see if the problem 
 persists?
 
 -Sandy
 
 On Fri, Jun 12, 2015 at 7:42 PM, Patrick Woody patrick.woo...@gmail.com 
 wrote:
 Hey all,
 
 I've recently run into an issue where spark dynamicAllocation has asked for 
 -1 executors from YARN. Unfortunately, this raises an exception that kills 
 the executor-allocation thread and the application can't request more 
 resources.
 
 Has anyone seen this before? It is spurious and the application usually 
 works, but when this gets hit it becomes unusable when getting stuck at 
 minimum YARN resources.
 
 Stacktrace below.
 
 Thanks!
 -Pat
 
 470 ERROR [2015-06-12 16:44:39,724] org.apache.spark.util.Utils: Uncaught exception in thread spark-dynamic-executor-allocation-0
 471 ! java.lang.IllegalArgumentException: Attempted to request a negative number of executor(s) -1 from the cluster manager. Please specify a positive number!
 472 ! at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:338) ~[spark-core_2.10-1.3.1.jar:1.
 473 ! at org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1137) ~[spark-core_2.10-1.3.1.jar:1.3.1]
 474 ! at org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294) ~[spark-core_2.10-1.3.1.jar:1.3.1]
 475 ! at org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263) ~[spark-core_2.10-1.3.1.jar:1.3.1]
 476 ! at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230) ~[spark-core_2.10-1.3.1.j
 477 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
 478 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
 479 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
 480 ! at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) ~[spark-core_2.10-1.3.1.jar:1.3.1]
 481 ! at org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189) [spark-core_2.10-1.3.1.jar:1.3.1]
 482 ! at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
 483 ! at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
 484 ! at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
 485 ! at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
 486 ! at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
 487 ! at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
 


Dynamic allocator requests -1 executors

2015-06-12 Thread Patrick Woody
Hey all,

I've recently run into an issue where Spark dynamic allocation has asked for
-1 executors from YARN. Unfortunately, this raises an exception that kills
the executor-allocation thread, and the application can't request more
resources.

Has anyone seen this before? It is spurious and the application usually
works, but when it does get hit the application becomes unusable, stuck at
the minimum YARN resources.
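
For context, the dynamic allocation setup in play is roughly the following - a sketch with
illustrative values, not our exact settings:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")      // external shuffle service, required for dynamic allocation on YARN
    .set("spark.dynamicAllocation.minExecutors", "2")
    .set("spark.dynamicAllocation.maxExecutors", "100")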

Stacktrace below.

Thanks!
-Pat

470 ERROR [2015-06-12 16:44:39,724] org.apache.spark.util.Utils: Uncaught exception in thread spark-dynamic-executor-allocation-0
471 ! java.lang.IllegalArgumentException: Attempted to request a negative number of executor(s) -1 from the cluster manager. Please specify a positive number!
472 ! at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:338) ~[spark-core_2.10-1.3.1.jar:1.
473 ! at org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1137) ~[spark-core_2.10-1.3.1.jar:1.3.1]
474 ! at org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294) ~[spark-core_2.10-1.3.1.jar:1.3.1]
475 ! at org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263) ~[spark-core_2.10-1.3.1.jar:1.3.1]
476 ! at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230) ~[spark-core_2.10-1.3.1.j
477 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
478 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
479 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
480 ! at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) ~[spark-core_2.10-1.3.1.jar:1.3.1]
481 ! at org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189) [spark-core_2.10-1.3.1.jar:1.3.1]
482 ! at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
483 ! at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
484 ! at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
485 ! at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
486 ! at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
487 ! at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]


Re: SparkSQL UDTs with Ordering

2015-03-24 Thread Patrick Woody
Awesome, yep - I have seen the warnings on UDTs and am happy to keep up with
the API changes :). Would this be a reasonable PR to toss up despite the API
instability, or would you prefer it to wait?

Thanks
-Pat

On Tue, Mar 24, 2015 at 7:44 PM, Michael Armbrust mich...@databricks.com
wrote:

 I'll caution that the UDTs are not a stable public interface yet.  We'd
 like to do this someday, but currently this feature is mostly for MLlib as
 we have not finalized the API.

 Having an ordering could be useful, but I'll add that currently UDTs
 actually exist in serialized form, so the ordering would have to be on the
 internal form, not the user-visible form.

 On Tue, Mar 24, 2015 at 12:25 PM, Patrick Woody patrick.woo...@gmail.com
 wrote:

 Hey all,

 Currently looking into UDTs and I was wondering if it is reasonable to
 add the ability to define an Ordering (or if this is possible, then how)?
 Currently it will throw an error when non-Native types are used.

 Thanks!
 -Pat





SparkSQL UDTs with Ordering

2015-03-24 Thread Patrick Woody
Hey all,

I'm currently looking into UDTs and was wondering whether it is reasonable to
add the ability to define an Ordering (or, if this is already possible, how)?
Currently it throws an error when non-native types are used.
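
To make the ask concrete, here is a sketch of what I have in mind - Point and the column name are
made up, and the UDT registration itself is omitted since that API is internal:

  // Hypothetical user class that would be backed by a UDT.
  case class Point(x: Double, y: Double)

  // The kind of Ordering I'd like to be able to attach to the UDT:
  val pointOrdering: Ordering[Point] = Ordering.by(p => (p.x, p.y))

  // Desired usage, which today throws because Point is not a native SQL type:
  // df.orderBy($"location")   // "location" being a Point (UDT) column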

Thanks!
-Pat