Re: Using existing distribution for join when subset of keys
Hey Terry,

Thanks for the response! I'm not sure that it ends up working though - the bucketing still seems to require the exchange before the join. Both tables below are saved bucketed by "x":

*(5) Project [x#29, y#30, z#31, z#37]
+- *(5) SortMergeJoin [x#29, y#30], [x#35, y#36], Inner
   :- *(2) Sort [x#29 ASC NULLS FIRST, y#30 ASC NULLS FIRST], false, 0
   :  +- *Exchange hashpartitioning(x#29, y#30, 200)*
   :     +- *(1) Project [x#29, y#30, z#31]
   :        +- *(1) Filter (isnotnull(x#29) && isnotnull(y#30))
   :           +- *(1) FileScan parquet default.ax[x#29,y#30,z#31] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/ax], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200
   +- *(4) Sort [x#35 ASC NULLS FIRST, y#36 ASC NULLS FIRST], false, 0
      +- *Exchange hashpartitioning(x#35, y#36, 200)*
         +- *(3) Project [x#35, y#36, z#37]
            +- *(3) Filter (isnotnull(x#35) && isnotnull(y#36))
               +- *(3) FileScan parquet default.bx[x#35,y#36,z#37] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/bx], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct, SelectedBucketsCount: 200 out of 200

Best,
Pat

On Sun, May 31, 2020 at 3:15 PM Terry Kim wrote:
> You can use bucketBy to avoid shuffling in your scenario. This test suite
> has some examples:
> https://github.com/apache/spark/blob/45cf5e99503b00a6bd83ea94d6d92761db1a00ab/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala#L343
>
> Thanks,
> Terry
>
> On Sun, May 31, 2020 at 7:43 AM Patrick Woody wrote:
>
>> Hey all,
>>
>> I have one large table, A, and two medium sized tables, B & C, that I'm
>> trying to complete a join on efficiently. The result is multiplicative on A
>> join B, so I'd like to avoid shuffling that result. For this example, let's
>> just assume each table has three columns, x, y, z. The below is all being
>> tested on Spark 2.4.5 locally.
>>
>> I'd like to perform the following join:
>> A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
>>
>> This outputs the following physical plan:
>>
>> == Physical Plan ==
>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>    :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>    :  +- Exchange hashpartitioning(x#32, z#34, 200)
>>    :     +- *(3) Project [x#32, y#33, z#34, z#74]
>>    :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>    :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
>>    :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
>>    :           :     +- LocalTableScan [x#32, y#33, z#34]
>>    :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
>>    :              +- Exchange hashpartitioning(x#72, y#73, 200)
>>    :                 +- LocalTableScan [x#72, y#73, z#74]
>>    +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>>       +- Exchange hashpartitioning(x#52, z#54, 200)
>>          +- LocalTableScan [x#52, y#53, z#54]
>>
>> I may be misremembering, but in the past I thought you had the ability to
>> pre-partition each table by "x" and it would satisfy the requirements of
>> the join since it is already clustered by the key on both sides using the
>> same hash function (this assumes numPartitions lines up obviously). However
>> it seems like it will insert another exchange:
>>
>> A.repartition($"x").join(B.repartition($"x"), Seq("x", "y")).join(C.repartition($"x"), Seq("x", "z"))
>>
>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>    :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>    :  +- Exchange hashpartitioning(x#32, z#34, 200)
>>    :     +- *(3) Project [x#32, y#33, z#34, z#74]
>>    :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>    :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
>>    :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
>>    :           :     +- Exchange hashpartitioning(x#32, 200)
>>    :           :        +- LocalTableScan [x#32, y#33, z#34]
>>    :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
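For reference, a minimal sketch of roughly how the bucketed tables in the message above could have been produced. The exact commands aren't in the thread, so the table names, toy data, and 200-bucket count below are assumptions that simply mirror the plan output:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Toy data shaped like the tables in the plan above.
val a = Seq((1, 10, 100), (2, 20, 200)).toDF("x", "y", "z")
val b = Seq((1, 10, 300), (2, 20, 400)).toDF("x", "y", "z")

// Bucket by "x" only, into 200 buckets (matching SelectedBucketsCount above).
a.write.bucketBy(200, "x").sortBy("x").saveAsTable("ax")
b.write.bucketBy(200, "x").sortBy("x").saveAsTable("bx")

// Joining on ("x", "y"): the sort-merge join in 2.4 requires hash
// partitioning on the full join key set, so bucketing on the subset "x"
// alone still leaves the Exchange in the plan, as the message reports.
spark.table("ax").join(spark.table("bx"), Seq("x", "y")).explain()
```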
Using existing distribution for join when subset of keys
Hey all,

I have one large table, A, and two medium-sized tables, B & C, that I'm trying to complete a join on efficiently. The result is multiplicative on A join B, so I'd like to avoid shuffling that result. For this example, let's just assume each table has three columns: x, y, z. The below is all being tested on Spark 2.4.5 locally.

I'd like to perform the following join:

A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))

This outputs the following physical plan:

== Physical Plan ==
*(6) Project [x#32, z#34, y#33, z#74, y#53]
+- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
   :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#32, z#34, 200)
   :     +- *(3) Project [x#32, y#33, z#34, z#74]
   :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
   :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
   :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
   :           :     +- LocalTableScan [x#32, y#33, z#34]
   :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
   :              +- Exchange hashpartitioning(x#72, y#73, 200)
   :                 +- LocalTableScan [x#72, y#73, z#74]
   +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(x#52, z#54, 200)
         +- LocalTableScan [x#52, y#53, z#54]

I may be misremembering, but in the past I thought you had the ability to pre-partition each table by "x" and it would satisfy the requirements of the join, since it is already clustered by the key on both sides using the same hash function (this assumes numPartitions lines up, obviously). However, it seems like it will insert another exchange:

A.repartition($"x").join(B.repartition($"x"), Seq("x", "y")).join(C.repartition($"x"), Seq("x", "z"))

*(6) Project [x#32, z#34, y#33, z#74, y#53]
+- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
   :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#32, z#34, 200)
   :     +- *(3) Project [x#32, y#33, z#34, z#74]
   :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
   :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
   :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
   :           :     +- Exchange hashpartitioning(x#32, 200)
   :           :        +- LocalTableScan [x#32, y#33, z#34]
   :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
   :              +- Exchange hashpartitioning(x#72, y#73, 200)
   :                 +- Exchange hashpartitioning(x#72, 200)
   :                    +- LocalTableScan [x#72, y#73, z#74]
   +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(x#52, z#54, 200)
         +- ReusedExchange [x#52, y#53, z#54], Exchange hashpartitioning(x#32, 200)

Note that using this "strategy" with groupBy("x", "y") works fine, though I assume that is because it doesn't have to consider the other side of the join. Did this used to work, or am I simply confusing it with groupBy? Either way - any thoughts on how I can avoid shuffling the bulk of the join result?

Thanks,
Pat
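For anyone who wants to poke at this, a self-contained sketch of the scenario above (toy data and illustrative names; on a 2.4.x local session with default settings the printed plans should correspond to the ones pasted in the message):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Three toy tables, each with columns x, y, z.
val A = Seq((1, 1, 1), (2, 2, 2)).toDF("x", "y", "z")
val B = Seq((1, 1, 3), (2, 2, 4)).toDF("x", "y", "z")
val C = Seq((1, 5, 1), (2, 6, 2)).toDF("x", "y", "z")

// Plain joins: an exchange on (x, y) for the first join, then on (x, z).
A.join(B, Seq("x", "y")).join(C, Seq("x", "z")).explain()

// Pre-partitioning everything by "x" alone: the hope is that clustering
// by "x" satisfies both joins, but the planner still adds exchanges on
// the full join keys, stacked on top of the repartition exchanges.
A.repartition($"x")
  .join(B.repartition($"x"), Seq("x", "y"))
  .join(C.repartition($"x"), Seq("x", "z"))
  .explain()
```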
Spark 1.6.2 short circuit AND filter broken
Hey all,

I hit a pretty nasty bug on 1.6.2 that I can't reproduce on 2.0. Here is the code/logical plan: http://pastebin.com/ULnHd1b6. I have filterPushdown disabled, so when I call collect here it hits the exception in my UDF before the null check on the input is applied. I believe it is a symptom of how DataSourceStrategy splits and recombines the predicates, since it happens at physical planning, but I haven't dug deeper. This doesn't reproduce if I simply use case classes and sqlContext.createDataFrame.

Is there going to be a 1.6.3 release where this bug could be fixed? I'm happy to dig further and send up a PR.

Thanks!
-Pat
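Since the pastebin contents aren't reproduced in the thread, the following is only a rough, hypothetical sketch of the pattern being described (written against the newer SparkSession API rather than the 1.6 SQLContext one): a null guard and a UDF combined in a single conjunctive filter over a file-based data source, with parquet filter pushdown disabled.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder()
  .master("local[*]")
  // The original report had filter pushdown disabled.
  .config("spark.sql.parquet.filterPushdown", "false")
  .getOrCreate()
import spark.implicits._

// Hypothetical UDF that blows up on null input.
val parse = udf((s: String) => s.toInt)

// The data source path matters: per the report, the problem did not show up
// with createDataFrame, only when DataSourceStrategy handled the relation -
// so write the data out to parquet and read it back.
Seq(Some("1"), Some("2"), None).toDF("value")
  .write.mode("overwrite").parquet("/tmp/short-circuit-repro")

// Intent: isNotNull should short-circuit before the UDF ever runs.
// The reported 1.6.2 behaviour was that the conjunction got split and
// recombined during physical planning and the UDF saw the null row anyway.
spark.read.parquet("/tmp/short-circuit-repro")
  .filter($"value".isNotNull && parse($"value") > 0)
  .collect()
```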
Get Spark version before starting context
Hey all,

Is it possible to reliably get the version string of a Spark cluster prior to trying to connect via the SparkContext on the client side? Most of the errors I've seen on mismatched versions have been cryptic, so it would be helpful if I could throw an exception earlier. I know it is contained in the HTML of the master UI, but an API endpoint would also be helpful. Does this exist?

Thanks!
-Pat
Re: Get Spark version before starting context
To somewhat answer my own question - it looks like an empty request to the REST API will return an error response that includes the version in its JSON as well. Still not ideal, though. Would there be any objection to adding a simple version endpoint to the API?

On Sat, Jul 4, 2015 at 4:00 PM, Patrick Woody patrick.woo...@gmail.com wrote:

Hey all,

Is it possible to reliably get the version string of a Spark cluster prior to trying to connect via the SparkContext on the client side? Most of the errors I've seen on mismatched versions have been cryptic, so it would be helpful if I could throw an exception earlier. I know it is contained the HTML of the master, but an API point would also be helpful. Does this exist?

Thanks!
-Pat
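For the record, a rough sketch of the scraping workaround described above. It assumes a standalone master with the REST submission server enabled on its default port (6066) and leans on the serverSparkVersion field that shows up in its JSON responses - both are assumptions about the deployment rather than a documented, stable contract, which is exactly why a proper version endpoint would be nicer.

```scala
import java.net.{HttpURLConnection, URL}
import scala.io.Source

// Hypothetical helper: ask the master's REST submission server about a bogus
// submission and pull the advertised Spark version out of the JSON it sends
// back. Endpoint, port, and field name are assumptions to verify against
// your own deployment.
def clusterSparkVersion(masterHost: String, restPort: Int = 6066): Option[String] = {
  val url = new URL(s"http://$masterHost:$restPort/v1/submissions/status/does-not-exist")
  val conn = url.openConnection().asInstanceOf[HttpURLConnection]
  try {
    val code = conn.getResponseCode // fires the request
    val stream = if (code >= 400) conn.getErrorStream else conn.getInputStream
    val body = Source.fromInputStream(stream).mkString
    // e.g. ... "serverSparkVersion" : "1.4.0" ...
    "\"serverSparkVersion\"\\s*:\\s*\"([^\"]+)\"".r.findFirstMatchIn(body).map(_.group(1))
  } finally conn.disconnect()
}

// Usage: fail fast on the client before building the SparkContext.
// clusterSparkVersion("spark-master.example.com").foreach(println)
```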
Re: Dynamic allocator requests -1 executors
Hey Sandy,

I'll test it out on 1.4. Do you have a bug number or PR that I could reference as well?

Thanks!
-Pat

Sent from my iPhone

On Jun 13, 2015, at 11:38 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

Hi Patrick,

I'm noticing that you're using Spark 1.3.1. We fixed a bug in dynamic allocation in 1.4 that permitted requesting negative numbers of executors. Any chance you'd be able to try with the newer version and see if the problem persists?

-Sandy

On Fri, Jun 12, 2015 at 7:42 PM, Patrick Woody patrick.woo...@gmail.com wrote:

Hey all,

I've recently run into an issue where spark dynamicAllocation has asked for -1 executors from YARN. Unfortunately, this raises an exception that kills the executor-allocation thread and the application can't request more resources. Has anyone seen this before? It is spurious and the application usually works, but when this gets hit it becomes unusable when getting stuck at minimum YARN resources. Stacktrace below.

Thanks!
-Pat

470 ERROR [2015-06-12 16:44:39,724] org.apache.spark.util.Utils: Uncaught exception in thread spark-dynamic-executor-allocation-0
471 ! java.lang.IllegalArgumentException: Attempted to request a negative number of executor(s) -1 from the cluster manager. Please specify a positive number!
472 ! at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:338) ~[spark-core_2.10-1.3.1.jar:1.
473 ! at org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1137) ~[spark-core_2.10-1.3.1.jar:1.3.1]
474 ! at org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294) ~[spark-core_2.10-1.3.1.jar:1.3.1]
475 ! at org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263) ~[spark-core_2.10-1.3.1.jar:1.3.1]
476 ! at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230) ~[spark-core_2.10-1.3.1.j
477 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
478 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
479 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
480 ! at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) ~[spark-core_2.10-1.3.1.jar:1.3.1]
481 ! at org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189) [spark-core_2.10-1.3.1.jar:1.3.1]
482 ! at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
483 ! at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
484 ! at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
485 ! at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
486 ! at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
487 ! at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
Dynamic allocator requests -1 executors
Hey all,

I've recently run into an issue where spark dynamicAllocation has asked for -1 executors from YARN. Unfortunately, this raises an exception that kills the executor-allocation thread and the application can't request more resources. Has anyone seen this before? It is spurious and the application usually works, but when this gets hit it becomes unusable when getting stuck at minimum YARN resources. Stacktrace below.

Thanks!
-Pat

470 ERROR [2015-06-12 16:44:39,724] org.apache.spark.util.Utils: Uncaught exception in thread spark-dynamic-executor-allocation-0
471 ! java.lang.IllegalArgumentException: Attempted to request a negative number of executor(s) -1 from the cluster manager. Please specify a positive number!
472 ! at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:338) ~[spark-core_2.10-1.3.1.jar:1.
473 ! at org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1137) ~[spark-core_2.10-1.3.1.jar:1.3.1]
474 ! at org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:294) ~[spark-core_2.10-1.3.1.jar:1.3.1]
475 ! at org.apache.spark.ExecutorAllocationManager.addOrCancelExecutorRequests(ExecutorAllocationManager.scala:263) ~[spark-core_2.10-1.3.1.jar:1.3.1]
476 ! at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:230) ~[spark-core_2.10-1.3.1.j
477 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply$mcV$sp(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
478 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
479 ! at org.apache.spark.ExecutorAllocationManager$$anon$1$$anonfun$run$1.apply(ExecutorAllocationManager.scala:189) ~[spark-core_2.10-1.3.1.jar:1.3.1]
480 ! at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618) ~[spark-core_2.10-1.3.1.jar:1.3.1]
481 ! at org.apache.spark.ExecutorAllocationManager$$anon$1.run(ExecutorAllocationManager.scala:189) [spark-core_2.10-1.3.1.jar:1.3.1]
482 ! at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
483 ! at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
484 ! at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
485 ! at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
486 ! at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
487 ! at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
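For context (not a fix for the bug above), dynamic allocation on YARN in this era is driven by a handful of settings, with the external shuffle service enabled so executors can be released safely. A sketch with illustrative values - the actual configuration of the affected application isn't shown in the thread:

```scala
import org.apache.spark.SparkConf

// Illustrative dynamic-allocation settings (values are made up).
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true") // required for dynamic allocation on YARN
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "50")
```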
Re: SparkSQL UDTs with Ordering
Awesome, yep - I have seen the warnings on UDTs and am happy to keep up with the API changes :). Would this be a reasonable PR to toss up despite the API instability, or would you prefer it to wait?

Thanks
-Pat

On Tue, Mar 24, 2015 at 7:44 PM, Michael Armbrust mich...@databricks.com wrote:

I'll caution that the UDTs are not a stable public interface yet. We'd like to do this someday, but currently this feature is mostly for MLlib as we have not finalized the API. Having an ordering could be useful, but I'll add that currently UDTs actually exist in serialized form, so the ordering would have to be on the internal form, not the user-visible form.

On Tue, Mar 24, 2015 at 12:25 PM, Patrick Woody patrick.woo...@gmail.com wrote:

Hey all,

Currently looking into UDTs and I was wondering if it is reasonable to add the ability to define an Ordering (or if this is possible, then how)? Currently it will throw an error when non-Native types are used.

Thanks!
-Pat
SparkSQL UDTs with Ordering
Hey all,

I'm currently looking into UDTs and was wondering if it would be reasonable to add the ability to define an Ordering (or, if this is already possible, how)? Currently it will throw an error when non-Native types are used.

Thanks!
-Pat
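To make the question concrete, here is a rough sketch of what this could look like, modeled loosely on the era's example UDTs (the API was explicitly unstable and later changed). The `ordering` method is purely hypothetical - per Michael's note in the reply above, a real implementation would have to order the serialized internal form rather than the user-visible class:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Simple user-visible type.
@SQLUserDefinedType(udt = classOf[PointUDT])
case class Point(x: Double, y: Double)

// Sketch of a UDT for Point, loosely following the 1.x example UDTs.
class PointUDT extends UserDefinedType[Point] {
  override def sqlType: DataType =
    StructType(Seq(StructField("x", DoubleType), StructField("y", DoubleType)))

  override def serialize(obj: Any): Any = obj match {
    case Point(x, y) => Row(x, y)
  }

  override def deserialize(datum: Any): Point = datum match {
    case row: Row => Point(row.getDouble(0), row.getDouble(1))
  }

  override def userClass: Class[Point] = classOf[Point]

  // Hypothetical addition - not part of the actual UDT interface. A real
  // version would need to be defined over the serialized form instead.
  def ordering: Ordering[Point] = Ordering.by((p: Point) => (p.x, p.y))
}
```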