[jira] [Commented] (SPARK-45792) SPIP: ShuffleManager short name registration via SparkPlugin

2023-11-04 Thread Alessandro Bellina (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-45792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17782931#comment-17782931
 ] 

Alessandro Bellina commented on SPARK-45792:


Added a link to an issue that needs to be solved in order for this SPIP to move 
forward.

> SPIP: ShuffleManager short name registration via SparkPlugin
> 
>
> Key: SPARK-45792
> URL: https://issues.apache.org/jira/browse/SPARK-45792
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 4.0.0
>Reporter: Alessandro Bellina
>Priority: Minor
> Fix For: 4.0.0
>
>
> We would like to make it possible for a SparkPlugin to expose custom 
> ShuffleManager implementations via short names, along with default 
> configurations for each short name, in order to improve ease of use. 
> Today, users leveraging a Spark plugin need to set a class name under 
> spark.plugins, and if the plugin also provides a ShuffleManager, the user 
> also needs to set spark.shuffle.manager to a fully qualified class name. 
> Additionally, users need to make sure they have set the Spark configurations 
> that are required or recommended for that shuffle manager. This can be 
> cumbersome for the user, adding barriers to the pluggable interface that 
> Spark provides.
> Spark provides a short name for SortShuffleManager (“sort”) today. The idea 
> in this SPIP is to make this set of short names extensible by a SparkPlugin.
> SPIP: 
> https://docs.google.com/document/d/1flijDjMMAAGh2C2k-vg1u651RItaRquLGB_sVudxf6I/edit?usp=sharing






[jira] [Created] (SPARK-45792) SPIP: ShuffleManager short name registration via SparkPlugin

2023-11-04 Thread Alessandro Bellina (Jira)
Alessandro Bellina created SPARK-45792:
--

 Summary: SPIP: ShuffleManager short name registration via 
SparkPlugin
 Key: SPARK-45792
 URL: https://issues.apache.org/jira/browse/SPARK-45792
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 4.0.0
Reporter: Alessandro Bellina
 Fix For: 4.0.0


We would like to make it possible for a SparkPlugin to expose custom 
ShuffleManager implementations via short names, along with default 
configurations for each short name, in order to improve ease of use. 

Today, users leveraging a Spark plugin need to set a class name under 
spark.plugins, and if the plugin also provides a ShuffleManager, the user also 
needs to set spark.shuffle.manager to a fully qualified class name. 
Additionally, users need to make sure they have set the Spark configurations 
that are required or recommended for that shuffle manager. This can be 
cumbersome for the user, adding barriers to the pluggable interface that Spark 
provides.

Spark provides a short name for SortShuffleManager (“sort”) today. The idea in 
this SPIP is to make this set of short names extensible by a SparkPlugin.

SPIP: 
https://docs.google.com/document/d/1flijDjMMAAGh2C2k-vg1u651RItaRquLGB_sVudxf6I/edit?usp=sharing
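
As a rough illustration of the idea (the registration API itself is what the 
SPIP above proposes; the trait and method names below are hypothetical and not 
part of Spark today), a plugin could pair each short name with the shuffle 
manager class and the default configurations it implies:

{code:scala}
// Hypothetical sketch only: none of these names exist in Spark today; the real
// API shape is the subject of the linked SPIP document.
trait ShuffleManagerRegistration {
  def shortName: String                   // e.g. "myshuffle"
  def shuffleManagerClassName: String     // fully qualified ShuffleManager class
  def defaultConfs: Map[String, String]   // confs required/recommended for it
}

class MyShufflePlugin /* extends org.apache.spark.api.plugin.SparkPlugin */ {
  def shuffleRegistrations: Seq[ShuffleManagerRegistration] = Seq(
    new ShuffleManagerRegistration {
      val shortName = "myshuffle"
      val shuffleManagerClassName = "com.example.shuffle.MyShuffleManager"
      val defaultConfs = Map("spark.shuffle.myshuffle.someKnob" -> "true")
    }
  )
}
{code}

With something along these lines, a user would only need to set spark.plugins 
and spark.shuffle.manager=myshuffle, instead of a fully qualified class name 
plus the individual configurations.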






[jira] [Created] (SPARK-45762) Shuffle managers defined in user jars are not available for some launch modes

2023-11-01 Thread Alessandro Bellina (Jira)
Alessandro Bellina created SPARK-45762:
--

 Summary: Shuffle managers defined in user jars are not available 
for some launch modes
 Key: SPARK-45762
 URL: https://issues.apache.org/jira/browse/SPARK-45762
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.5.0
Reporter: Alessandro Bellina
 Fix For: 4.0.0


Starting a Spark job in standalone mode with a custom `ShuffleManager` provided 
in a jar via `--jars` does not work. The same issue can be reproduced in 
local-cluster mode.

The approach that works consistently is to copy the jar containing the custom 
`ShuffleManager` to a specific location on each node and then add it to 
`spark.driver.extraClassPath` and `spark.executor.extraClassPath`, but we would 
like to move away from requiring extra configuration unnecessarily.

Example:
{code:java}
$SPARK_HOME/bin/spark-shell \
  --master spark://127.0.0.1:7077 \
  --conf spark.shuffle.manager=org.apache.spark.examples.TestShuffleManager \
  --jars user-code.jar
{code}
This yields `java.lang.ClassNotFoundException` in the executors.
{code:java}
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
  at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1915)
  at 
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
  at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:436)
  at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:425)
  at 
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.lang.ClassNotFoundException: 
org.apache.spark.examples.TestShuffleManager
  at 
java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
  at 
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
  at java.base/java.lang.Class.forName0(Native Method)
  at java.base/java.lang.Class.forName(Class.java:467)
  at 
org.apache.spark.util.SparkClassUtils.classForName(SparkClassUtils.scala:41)
  at 
org.apache.spark.util.SparkClassUtils.classForName$(SparkClassUtils.scala:36)
  at org.apache.spark.util.Utils$.classForName(Utils.scala:95)
  at 
org.apache.spark.util.Utils$.instantiateSerializerOrShuffleManager(Utils.scala:2574)
  at org.apache.spark.SparkEnv$.create(SparkEnv.scala:366)
  at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:255)
  at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$7(CoarseGrainedExecutorBackend.scala:487)
  at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:62)
  at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
  at 
java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
  at java.base/javax.security.auth.Subject.doAs(Subject.java:439)
  at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
  ... 4 more
{code}
We can change our command to use `extraClassPath`:
{code:java}
$SPARK_HOME/bin/spark-shell \
  --master spark://127.0.0.1:7077 \
  --conf spark.shuffle.manager=org.apache.spark.examples.TestShuffleManager \
  --conf spark.driver.extraClassPath=user-code.jar \
  --conf spark.executor.extraClassPath=user-code.jar
{code}
Success after adding the jar to `extraClassPath`:
{code:java}
23/10/26 12:58:26 INFO TransportClientFactory: Successfully created connection 
to localhost/127.0.0.1:33053 after 7 ms (0 ms spent in bootstraps)
23/10/26 12:58:26 WARN TestShuffleManager: Instantiated TestShuffleManager!!
23/10/26 12:58:26 INFO DiskBlockManager: Created local directory at 
/tmp/spark-cb101b05-c4b7-4ba9-8b3d-5b23baa7cb46/executor-5d5335dd-c116-4211-9691-87d8566017fd/blockmgr-2fcb1ab2-d886--8c7f-9dca2c880c2c
{code}
We would like to change startup order such that the original command succeeds, 
without specifying `extraClassPath`:
{code:java}
$SPARK_HOME/bin/spark-shell \
  --master spark://127.0.0.1:7077 \
  --conf spark.shuffle.manager=org.apache.spark.examples.TestShuffleManager \
  --jars user-code.jar
{code}
Proposed changes:

Refactor the code so that the `ShuffleManager` is initialized later, after jars 
have been localized. This is especially necessary in the executor, where we 
would need to delay this initialization until after the `replClassLoader` has 
been updated with the jars passed via `--jars`.
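
To illustrate the deferred-initialization idea, here is a minimal, hypothetical 
sketch (not Spark's actual internals; the class and method names are made up): 
the configured class is resolved with a class loader that already includes the 
`--jars` entries, rather than at `SparkEnv` creation time.

{code:scala}
// Illustrative sketch only: resolve the configured ShuffleManager class with a
// class loader that sees the localized user jars, instead of the application
// class loader available when SparkEnv is created.
class DeferredShuffleManagerHolder(shuffleManagerClassName: String) {
  @volatile private var instance: AnyRef = _

  def initializeAfterJarsLocalized(userClassLoader: ClassLoader): Unit = {
    val cls = Class.forName(shuffleManagerClassName, true, userClassLoader)
    // Real shuffle managers are constructed with a SparkConf; a no-arg
    // constructor keeps this sketch self-contained.
    instance = cls.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
  }

  def get: AnyRef = {
    require(instance != null, "ShuffleManager requested before user jars were localized")
    instance
  }
}
{code}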

Today, the `ShuffleManager` is instantiated at `SparkEnv` creation. Having to 
instantiate the `ShuffleManager` this early doesn't work, because user jars 
have not been localized in all scenarios, and we will fail to load the 
`ShuffleManager`. We propose moving the `ShuffleManager` instantiation to 
`SparkContext` on the driver, and Executor, 

[jira] [Commented] (SPARK-39131) "Exists" is optimized too late (to LeftSemi) preventing filters to be inferred

2022-05-10 Thread Alessandro Bellina (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-39131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17534362#comment-17534362
 ] 

Alessandro Bellina commented on SPARK-39131:


I'll take a crack at a patch today that can be submitted as a pull request for 
further discussion.

> "Exists" is optimized too late (to LeftSemi) preventing filters to be inferred
> --
>
> Key: SPARK-39131
> URL: https://issues.apache.org/jira/browse/SPARK-39131
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 3.2.1
>Reporter: Alessandro Bellina
>Priority: Major
>
> We would like to propose a slight change in the order of execution of logical 
> plan optimizer rules given a performance issue we have seen with {{LeftSemi}} 
> being materialized too late in the logical plan optimizer, and not benefiting 
> from the null filtering that {{InferFiltersFromConstraints}} can insert.
> I have "something that works" locally (see rest of the description for info 
> and a diff), but given that this is the optimizer it is not clear what else I 
> could be breaking, so I'd like to hear from the experts on whether this is 
> the right change.
> The query in question is based on TPCDS query16 which originally has an 
> {{exists}} filter: 
> {code:sql}
> …
> and exists (select *
> from catalog_sales cs2
> where cs1.cs_order_number = cs2.cs_order_number 
>   and cs1.cs_warehouse_sk <> cs2.cs_warehouse_sk)
> …
> {code}
> The rule {{RewritePredicateSubquery}} will turn this into a {{LeftSemi}} join 
> like so:
> {code:sql}
> +- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#872L], 
> LeftSemi, NOT (cs_warehouse_sk#14 = cs_warehouse_sk#869)
>  :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(cs_order_number#17L, 200), 
> ENSURE_REQUIREMENTS, [id=#364]
>  : +- *(1) Filter ((isnotnull(cs_ship_date_sk#2) AND 
> isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11))
>  :+- *(1) ColumnarToRow
>  :   +- FileScan parquet [...] Batched: true, DataFilters: 
> [isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), 
> isnotnull(cs_call_center_sk#11)],..., PushedFilters: 
> [IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), 
> IsNotNull(cs_call_center_sk)], ReadSchema: ...
>  +- *(4) Sort [cs_order_number#872L ASC NULLS FIRST], false, 0
> +- Exchange hashpartitioning(cs_order_number#872L, 200), 
> ENSURE_REQUIREMENTS, [id=#372]
>+- *(3) ColumnarToRow
>   +- FileScan parquet [...] Batched: true, DataFilters: [], ..., 
> PushedFilters: [], ReadSchema: ...
> {code}
> Note that the {{LeftSemi}} key and condition are not being null filtered on 
> the stream side, and the build side has no filter at all. We have found that 
> as the dataset size increases, this can become an issue; in our case, there 
> were many nulls that will never match. We would like to remove the unnecessary 
> rows early, at the scan and filter phases.
> The change we made allows the join key and the condition to be added to the 
> stream side filter, and for the build side filter to get added:
> {code:sql}
> +- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#943L], 
> LeftSemi, NOT (cs_warehouse_sk#14 = cs_warehouse_sk#940)
>:- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(cs_order_number#17L, 200), 
> ENSURE_REQUIREMENTS, [id=#759]
>: +-*(1) Filter isnotnull(cs_ship_date_sk#2) AND 
> isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11)) AND 
> isnotnull(cs_order_number#17L)) AND isnotnull(cs_warehouse_sk#14))
>:+- *(1) ColumnarToRow
>:   +- FileScan parquet ..., DataFilters: 
> [isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), 
> isnotnull(cs_call_center_sk#11), is..., ..., PushedFilters: 
> [IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), 
> IsNotNull(cs_call_center_sk), IsNotNull(..., ReadSchema: ...
>+- *(4) Sort [cs_order_number#943L ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(cs_order_number#943L, 200), 
> ENSURE_REQUIREMENTS, [id=#768]
>  +- *(3) Filter (isnotnull(cs_order_number#943L) AND 
> isnotnull(cs_warehouse_sk#940))
> +- *(3) ColumnarToRow
>+- FileScan parquet ..., DataFilters: 
> [isnotnull(cs_order_number#943L), isnotnull(cs_warehouse_sk#940)], ..., 
> PartitionFilters: [], PushedFilters: [IsNotNull(cs_order_number), 
> IsNotNull(cs_warehouse_sk)], ReadSchema: ... 
> {code}
> This issue can be boiled down to this simple repro:
> {code:java}
> sc.parallelize((0 until 10).map(i => if (i%2 == 0) {null} else 
> 

[jira] [Updated] (SPARK-39131) "Exists" is optimized too late (to LeftSemi) preventing filters to be inferred

2022-05-09 Thread Alessandro Bellina (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alessandro Bellina updated SPARK-39131:
---
Description: 
We would like to propose a slight change in the order of execution of logical 
plan optimizer rules given a performance issue we have seen with {{LeftSemi}} 
being materialized too late in the logical plan optimizer, and not benefiting 
from the null filtering that {{InferFiltersFromConstraints}} can insert.

I have "something that works" locally (see rest of the description for info and 
a diff), but given that this is the optimizer it is not clear what else I could 
be breaking, so I'd like to hear from the experts on whether this is the right 
change.

The query in question is based on TPCDS query16 which originally has an 
{{exists}} filter: 

{code:sql}
…
and exists (select *
from catalog_sales cs2
where cs1.cs_order_number = cs2.cs_order_number 
  and cs1.cs_warehouse_sk <> cs2.cs_warehouse_sk)
…
{code}

The rule {{RewritePredicateSubquery}} will turn this into a {{LeftSemi}} join 
like so:

{code:sql}
+- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#872L], LeftSemi, 
NOT (cs_warehouse_sk#14 = cs_warehouse_sk#869)
 :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
 :  +- Exchange hashpartitioning(cs_order_number#17L, 200), 
ENSURE_REQUIREMENTS, [id=#364]
 : +- *(1) Filter ((isnotnull(cs_ship_date_sk#2) AND 
isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11))
 :+- *(1) ColumnarToRow
 :   +- FileScan parquet [...] Batched: true, DataFilters: 
[isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), 
isnotnull(cs_call_center_sk#11)],..., PushedFilters: 
[IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), 
IsNotNull(cs_call_center_sk)], ReadSchema: ...
 +- *(4) Sort [cs_order_number#872L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(cs_order_number#872L, 200), 
ENSURE_REQUIREMENTS, [id=#372]
   +- *(3) ColumnarToRow
  +- FileScan parquet [...] Batched: true, DataFilters: [], ..., 
PushedFilters: [], ReadSchema: ...
{code}

Note that the {{LeftSemi}} key and condition are not being null filtered on the 
stream side, and the build side has no filter at all. We have found that as the 
dataset size increases, this can become an issue; in our case, there were many 
nulls that will never match. We would like to remove the unnecessary rows early, 
at the scan and filter phases.

The change we made allows the join key and the condition to be added to the 
stream side filter, and for the build side filter to get added:

{code:sql}
+- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#943L], LeftSemi, 
NOT (cs_warehouse_sk#14 = cs_warehouse_sk#940)
   :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(cs_order_number#17L, 200), 
ENSURE_REQUIREMENTS, [id=#759]
   : +-*(1) Filter isnotnull(cs_ship_date_sk#2) AND 
isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11)) AND 
isnotnull(cs_order_number#17L)) AND isnotnull(cs_warehouse_sk#14))
   :+- *(1) ColumnarToRow
   :   +- FileScan parquet ..., DataFilters: 
[isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), 
isnotnull(cs_call_center_sk#11), is..., ..., PushedFilters: 
[IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), 
IsNotNull(cs_call_center_sk), IsNotNull(..., ReadSchema: ...
   +- *(4) Sort [cs_order_number#943L ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(cs_order_number#943L, 200), 
ENSURE_REQUIREMENTS, [id=#768]
 +- *(3) Filter (isnotnull(cs_order_number#943L) AND 
isnotnull(cs_warehouse_sk#940))
+- *(3) ColumnarToRow
   +- FileScan parquet ..., DataFilters: 
[isnotnull(cs_order_number#943L), isnotnull(cs_warehouse_sk#940)], ..., 
PartitionFilters: [], PushedFilters: [IsNotNull(cs_order_number), 
IsNotNull(cs_warehouse_sk)], ReadSchema: ... 
{code}

This issue can be boiled down to this simple repro:

{code:java}
sc.parallelize((0 until 10).map(i => if (i%2 == 0) {null} else 
{Int.box(i)})).toDF.write.parquet("file:///tmp/my_test_table")
spark.read.parquet("file:///tmp/my_test_table").createOrReplaceTempView("my_table")
spark.sql("select * from my_table t1 where exists(select * from my_table  t2 
where t2.value = t1.value)").explain(true)
{code}

Which produces a similar plan, with a {{LeftSemi}} and no filters:

{code:sql}
== Physical Plan ==
*(2) BroadcastHashJoin [value#19], [value#22], LeftSemi, BuildRight, false
:- *(2) ColumnarToRow
:  +- FileScan parquet [value#19] Batched: true, DataFilters: [], Format: 
Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/my_test_table], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, 

[jira] [Updated] (SPARK-39131) "Exists" is optimized too late (to LeftSemi) preventing filters to be inferred

2022-05-09 Thread Alessandro Bellina (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alessandro Bellina updated SPARK-39131:
---
Description: 
We would like to propose a slight change in the order of execution of logical 
plan optimizer rules given a performance issue we have seen with {{LeftSemi}} 
being materialized too late in the logical plan optimizer, and not benefiting 
from the null filtering that {{InferFiltersFromConstraints}} can insert.

I have "something that works" locally (see rest of the description for info and 
a diff), but given that this is the optimizer it is not clear what else I could 
be breaking, so I'd like to hear from the experts on whether this is the right 
change.

The query in question is based on TPCDS query16 which originally has an 
{{exists}} filter: 

{code:sql}
…
and exists (select *
from catalog_sales cs2
where cs1.cs_order_number = cs2.cs_order_number 
  and cs1.cs_warehouse_sk <> cs2.cs_warehouse_sk)
…
{code}

The rule {{RewritePredicateSubquery}} will turn this into a {{LeftSemi}} join 
like so:

{code:sql}
+- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#872L], LeftSemi, 
NOT (cs_warehouse_sk#14 = cs_warehouse_sk#869)
 :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
 :  +- Exchange hashpartitioning(cs_order_number#17L, 200), 
ENSURE_REQUIREMENTS, [id=#364]
 : +- *(1) Filter ((isnotnull(cs_ship_date_sk#2) AND 
isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11))
 :+- *(1) ColumnarToRow
 :   +- FileScan parquet [...] Batched: true, DataFilters: 
[isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), 
isnotnull(cs_call_center_sk#11)],..., PushedFilters: 
[IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), 
IsNotNull(cs_call_center_sk)], ReadSchema: ...
 +- *(4) Sort [cs_order_number#872L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(cs_order_number#872L, 200), 
ENSURE_REQUIREMENTS, [id=#372]
   +- *(3) ColumnarToRow
  +- FileScan parquet [...] Batched: true, DataFilters: [], ..., 
PushedFilters: [], ReadSchema: ...
{code}

Note that the {{LeftSemi}} key and condition are not being null filtered on the 
stream side, and the build side has no filter at all. We have found that as the 
dataset size increases, this can become an issue; in our case, there were many 
nulls that will never match. We would like to remove the unnecessary rows early, 
at the scan and filter phases.

The change we made allows the join key and the condition to be added to the 
stream side filter, and for the build side filter to get added:

{code:sql}
+- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#943L], LeftSemi, 
NOT (cs_warehouse_sk#14 = cs_warehouse_sk#940)
   :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(cs_order_number#17L, 200), 
ENSURE_REQUIREMENTS, [id=#759]
   : +-*(1) Filter isnotnull(cs_ship_date_sk#2) AND 
isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11)) AND 
isnotnull(cs_order_number#17L)) AND isnotnull(cs_warehouse_sk#14))
   :+- *(1) ColumnarToRow
   :   +- FileScan parquet ..., DataFilters: 
[isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), 
isnotnull(cs_call_center_sk#11), is..., ..., PushedFilters: 
[IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), 
IsNotNull(cs_call_center_sk), IsNotNull(..., ReadSchema: ...
   +- *(4) Sort [cs_order_number#943L ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(cs_order_number#943L, 200), 
ENSURE_REQUIREMENTS, [id=#768]
 +- *(3) Filter (isnotnull(cs_order_number#943L) AND 
isnotnull(cs_warehouse_sk#940))
+- *(3) ColumnarToRow
   +- FileScan parquet ..., DataFilters: 
[isnotnull(cs_order_number#943L), isnotnull(cs_warehouse_sk#940)], ..., 
PartitionFilters: [], PushedFilters: [IsNotNull(cs_order_number), 
IsNotNull(cs_warehouse_sk)], ReadSchema: ... 
{code}

This issue can be boiled down to this simple repro:

{code:java}
sc.parallelize((0 until 10).map(i => if (i%2 == 0) {null} else 
{Int.box(i)})).toDF.write.parquet("file:///tmp/my_test_table")
spark.read.parquet("file:///tmp/my_test_table").createOrReplaceTempView("my_table")
spark.sql("select * from my_table t1 where exists(select * from my_table  t2 
where t2.value = t1.value)").explain(true)
{code}

Which produces a similar plan, with a {{LeftSemi}} and no filters:

{code:sql}
== Physical Plan ==
*(2) BroadcastHashJoin [value#19], [value#22], LeftSemi, BuildRight, false
:- *(2) ColumnarToRow
:  +- FileScan parquet [value#19] Batched: true, DataFilters: [], Format: 
Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/my_test_table], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, 

[jira] [Updated] (SPARK-39131) "Exists" is optimized too late (to LeftSemi) preventing filters to be inferred

2022-05-09 Thread Alessandro Bellina (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alessandro Bellina updated SPARK-39131:
---
Component/s: SQL

> "Exists" is optimized too late (to LeftSemi) preventing filters to be inferred
> --
>
> Key: SPARK-39131
> URL: https://issues.apache.org/jira/browse/SPARK-39131
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 3.2.1
>Reporter: Alessandro Bellina
>Priority: Major
>
> We would like to propose a slight change in the order of execution of logical 
> plan optimizer rules given a performance issue we have seen with {{LeftSemi}} 
> being materialized too late in the logical plan optimizer, and not benefiting 
> from the null filtering that {{InferFiltersFromConstraints}} can insert.
> I have "something that works" locally (see rest of the description for info 
> and a diff), but given that this is the optimizer it is not clear what else I 
> could be breaking, so I'd like to hear from the experts on whether this is 
> the right change.
> The query in question is based on TPCDS query16 which originally has an 
> {{exists}} filter: 
> {code:sql}
> …
> and exists (select *
> from catalog_sales cs2
> where cs1.cs_order_number = cs2.cs_order_number 
>   and cs1.cs_warehouse_sk <> cs2.cs_warehouse_sk)
> …
> {code}
> The rule {{RewritePredicateSubquery}} will turn this into a {{LeftSemi}} join 
> like so:
> {code:sql}
> +- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#872L], 
> LeftSemi, NOT (cs_warehouse_sk#14 = cs_warehouse_sk#869)
>  :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
>  :  +- Exchange hashpartitioning(cs_order_number#17L, 200), 
> ENSURE_REQUIREMENTS, [id=#364]
>  : +- *(1) Filter ((isnotnull(cs_ship_date_sk#2) AND 
> isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11))
>  :+- *(1) ColumnarToRow
>  :   +- FileScan parquet [...] Batched: true, DataFilters: 
> [isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), 
> isnotnull(cs_call_center_sk#11)],..., PushedFilters: 
> [IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), 
> IsNotNull(cs_call_center_sk)], ReadSchema: ...
>  +- *(4) Sort [cs_order_number#872L ASC NULLS FIRST], false, 0
> +- Exchange hashpartitioning(cs_order_number#872L, 200), 
> ENSURE_REQUIREMENTS, [id=#372]
>+- *(3) ColumnarToRow
>   +- FileScan parquet [...] Batched: true, DataFilters: [], ..., 
> PushedFilters: [], ReadSchema: ...
> {code}
> Note that the {{LeftSemi}} key and condition are not being null filtered on 
> the stream side, and the build side has no filter at all. We have found that 
> as the dataset size increases, this can become an issue; in our case, there 
> were many nulls that will never match. We would like to remove the unnecessary 
> rows early, at the scan and filter phases.
> The change we made allows the join key and the condition to be added to the 
> stream side filter, and for the build side filter to get added:
> {code:sql}
> +- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#943L], 
> LeftSemi, NOT (cs_warehouse_sk#14 = cs_warehouse_sk#940)
>:- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(cs_order_number#17L, 200), 
> ENSURE_REQUIREMENTS, [id=#759]
>: +-*(1) Filter isnotnull(cs_ship_date_sk#2) AND 
> isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11)) AND 
> isnotnull(cs_order_number#17L)) AND isnotnull(cs_warehouse_sk#14))
>:+- *(1) ColumnarToRow
>:   +- FileScan parquet ..., DataFilters: 
> [isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), 
> isnotnull(cs_call_center_sk#11), is..., ..., PushedFilters: 
> [IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), 
> IsNotNull(cs_call_center_sk), IsNotNull(..., ReadSchema: ...
>+- *(4) Sort [cs_order_number#943L ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(cs_order_number#943L, 200), 
> ENSURE_REQUIREMENTS, [id=#768]
>  +- *(3) Filter (isnotnull(cs_order_number#943L) AND 
> isnotnull(cs_warehouse_sk#940))
> +- *(3) ColumnarToRow
>+- FileScan parquet ..., DataFilters: 
> [isnotnull(cs_order_number#943L), isnotnull(cs_warehouse_sk#940)], ..., 
> PartitionFilters: [], PushedFilters: [IsNotNull(cs_order_number), 
> IsNotNull(cs_warehouse_sk)], ReadSchema: ... 
> {code}
> This issue can be boiled down to this simple repro:
> {code:java}
> sc.parallelize((0 until 10).map(i => if (i%2 == 0) {null} else 
> {Int.box(i)})).toDF.write.parquet("file:///tmp/my_test_table")
> 

[jira] [Updated] (SPARK-39131) "Exists" is optimized too late (to LeftSemi) preventing filters to be inferred

2022-05-09 Thread Alessandro Bellina (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alessandro Bellina updated SPARK-39131:
---
Description: 
We would like to propose a slight change in the order of execution of logical 
plan optimizer rules given a performance issue we have seen with {{LeftSemi}} 
being materialized too late in the logical plan optimizer, and not benefiting 
from the null filtering that {{InferFiltersFromConstraints}} can insert.

I have "something that works" locally (see rest of the description for info and 
a diff), but given that this is the optimizer it is not clear what else I could 
be breaking, so I'd like to hear from the experts on whether this is the right 
change.

The query in question is based on TPCDS query16 which originally has an 
{{exists}} filter: 

{code:sql}
…
and exists (select *
from catalog_sales cs2
where cs1.cs_order_number = cs2.cs_order_number 
  and cs1.cs_warehouse_sk <> cs2.cs_warehouse_sk)
…
{code}

The rule {{RewritePredicateSubquery}} will turn this into a {{LeftSemi}} join 
like so:

{code:sql}
+- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#872L], LeftSemi, 
NOT (cs_warehouse_sk#14 = cs_warehouse_sk#869)
 :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
 :  +- Exchange hashpartitioning(cs_order_number#17L, 200), 
ENSURE_REQUIREMENTS, [id=#364]
 : +- *(1) Filter ((isnotnull(cs_ship_date_sk#2) AND 
isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11))
 :+- *(1) ColumnarToRow
 :   +- FileScan parquet [...] Batched: true, DataFilters: 
[isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), 
isnotnull(cs_call_center_sk#11)],..., PushedFilters: 
[IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), 
IsNotNull(cs_call_center_sk)], ReadSchema: ...
 +- *(4) Sort [cs_order_number#872L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(cs_order_number#872L, 200), 
ENSURE_REQUIREMENTS, [id=#372]
   +- *(3) ColumnarToRow
  +- FileScan parquet [...] Batched: true, DataFilters: [], ..., 
PushedFilters: [], ReadSchema: ...
{code}

Note that the {{LeftSemi}} key and condition are not being null filtered on the 
stream side, and the build side has no filter at all. We have found that as the 
dataset size increases, this can become an issue; in our case, there were many 
nulls that will never match. We would like to remove the unnecessary rows early, 
at the scan and filter phases.

The change we made allows the join key and the condition to be added to the 
stream side filter, and for the build side filter to get added:

{code:sql}
+- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#943L], LeftSemi, 
NOT (cs_warehouse_sk#14 = cs_warehouse_sk#940)
   :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(cs_order_number#17L, 200), 
ENSURE_REQUIREMENTS, [id=#759]
   : +-*(1) Filter isnotnull(cs_ship_date_sk#2) AND 
isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11)) AND 
isnotnull(cs_order_number#17L)) AND isnotnull(cs_warehouse_sk#14))
   :+- *(1) ColumnarToRow
   :   +- FileScan parquet ..., DataFilters: 
[isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), 
isnotnull(cs_call_center_sk#11), is..., ..., PushedFilters: 
[IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), 
IsNotNull(cs_call_center_sk), IsNotNull(..., ReadSchema: ...
   +- *(4) Sort [cs_order_number#943L ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(cs_order_number#943L, 200), 
ENSURE_REQUIREMENTS, [id=#768]
 +- *(3) Filter (isnotnull(cs_order_number#943L) AND 
isnotnull(cs_warehouse_sk#940))
+- *(3) ColumnarToRow
   +- FileScan parquet ..., DataFilters: 
[isnotnull(cs_order_number#943L), isnotnull(cs_warehouse_sk#940)], ..., 
PartitionFilters: [], PushedFilters: [IsNotNull(cs_order_number), 
IsNotNull(cs_warehouse_sk)], ReadSchema: ... 
{code}

This issue can be boiled down to this simple repro:

{code:java}
sc.parallelize((0 until 10).map(i => if (i%2 == 0) {null} else 
{Int.box(i)})).toDF.write.parquet("file:///tmp/my_test_table")
spark.read.parquet("file:///tmp/my_test_table").createOrReplaceTempView("my_table")
spark.sql("select * from my_table t1 where exists(select * from my_table  t2 
where t2.value = t1.value)").explain(true)
{code}

Which produces a similar plan, with a {{LeftSemi}} and no filters:

{code:sql}
== Physical Plan ==
*(2) BroadcastHashJoin [value#19], [value#22], LeftSemi, BuildRight, false
:- *(2) ColumnarToRow
:  +- FileScan parquet [value#19] Batched: true, DataFilters: [], Format: 
Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/my_test_table], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, 

[jira] [Created] (SPARK-39131) "Exists" is optimized too late (to LeftSemi) preventing filters to be inferred

2022-05-09 Thread Alessandro Bellina (Jira)
Alessandro Bellina created SPARK-39131:
--

 Summary: "Exists" is optimized too late (to LeftSemi) preventing 
filters to be inferred
 Key: SPARK-39131
 URL: https://issues.apache.org/jira/browse/SPARK-39131
 Project: Spark
  Issue Type: Bug
  Components: Optimizer
Affects Versions: 3.2.1
Reporter: Alessandro Bellina


We would like to propose a slight change in the order of execution of logical 
plan optimizer rules given a performance issue we have seen with {{LeftSemi}} 
being materialized too late in the logical plan optimizer, and not benefiting 
from the null filtering that {{InferFiltersFromConstraints}} can insert.

I have "something that works" locally (see rest of the description for info and 
a diff), but given that this is the optimizer it is not clear what else I could 
be breaking, so I'd like to hear from the experts on whether this is the right 
change.

The query in question is based on TPCDS query16 which originally has an 
{{exists}} filter: 

{code:sql}
…
and exists (select *
from catalog_sales cs2
where cs1.cs_order_number = cs2.cs_order_number 
  and cs1.cs_warehouse_sk <> cs2.cs_warehouse_sk)
…
{code}

The rule {{RewritePredicateSubquery}} will turn this into a {{LeftSemi}} join 
like so:

{code:sql}
+- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#872L], LeftSemi, 
NOT (cs_warehouse_sk#14 = cs_warehouse_sk#869)
 :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
 :  +- Exchange hashpartitioning(cs_order_number#17L, 200), 
ENSURE_REQUIREMENTS, [id=#364]
 : +- *(1) Filter ((isnotnull(cs_ship_date_sk#2) AND 
isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11))
 :+- *(1) ColumnarToRow
 :   +- FileScan parquet [...] Batched: true, DataFilters: 
[isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), 
isnotnull(cs_call_center_sk#11)],..., PushedFilters: 
[IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), 
IsNotNull(cs_call_center_sk)], ReadSchema: ...
 +- *(4) Sort [cs_order_number#872L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(cs_order_number#872L, 200), 
ENSURE_REQUIREMENTS, [id=#372]
   +- *(3) ColumnarToRow
  +- FileScan parquet [...] Batched: true, DataFilters: [], ..., 
PushedFilters: [], ReadSchema: ...
{code}

Note that the {{LeftSemi}} key and condition are not being null filtered on the 
stream side, and the build side has no filter at all. We have found that as the 
dataset size increases, this can become an issue; in our case, there were many 
nulls that will never match. We would like to remove the unnecessary rows early, 
at the scan and filter phases.

The change we made allows the join key and the condition to be added to the 
stream side filter, and for the build side filter to get added:

{code:java}
+- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#943L], LeftSemi, 
NOT (cs_warehouse_sk#14 = cs_warehouse_sk#940)
   :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(cs_order_number#17L, 200), 
ENSURE_REQUIREMENTS, [id=#759]
   : +-*(1) Filter isnotnull(cs_ship_date_sk#2) AND 
isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11)) AND 
isnotnull(cs_order_number#17L)) AND isnotnull(cs_warehouse_sk#14))
   :+- *(1) ColumnarToRow
   :   +- FileScan parquet ..., DataFilters: 
[isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), 
isnotnull(cs_call_center_sk#11), is..., ..., PushedFilters: 
[IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), 
IsNotNull(cs_call_center_sk), IsNotNull(..., ReadSchema: ...
   +- *(4) Sort [cs_order_number#943L ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(cs_order_number#943L, 200), 
ENSURE_REQUIREMENTS, [id=#768]
 +- *(3) Filter (isnotnull(cs_order_number#943L) AND 
isnotnull(cs_warehouse_sk#940))
+- *(3) ColumnarToRow
   +- FileScan parquet ..., DataFilters: 
[isnotnull(cs_order_number#943L), isnotnull(cs_warehouse_sk#940)], ..., 
PartitionFilters: [], PushedFilters: [IsNotNull(cs_order_number), 
IsNotNull(cs_warehouse_sk)], ReadSchema: ... 
{code}

This issue can be boiled down to this simple repro:

{code:java}
sc.parallelize((0 until 10).map(i => if (i%2 == 0) {null} else 
{Int.box(i)})).toDF.write.parquet("file:///tmp/my_test_table")
spark.read.parquet("file:///tmp/my_test_table").createOrReplaceTempView("my_table")
spark.sql("select * from my_table t1 where exists(select * from my_table  t2 
where t2.value = t1.value)").explain(true)
{code}

Which produces a similar plan, with a {{LeftSemi}} and no filters:

{code:java}
== Physical Plan ==
*(2) BroadcastHashJoin [value#19], [value#22], LeftSemi, BuildRight, false
:- *(2) ColumnarToRow
:  +- FileScan parquet [value#19] Batched: true, 

[jira] [Created] (SPARK-29780) The UI can access into the ResourceAllocator, whose data structures are being updated from scheduler threads

2019-11-06 Thread Alessandro Bellina (Jira)
Alessandro Bellina created SPARK-29780:
--

 Summary: The UI can access into the ResourceAllocator, whose data 
structures are being updated from scheduler threads
 Key: SPARK-29780
 URL: https://issues.apache.org/jira/browse/SPARK-29780
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: Alessandro Bellina


A class extending ResourceAllocator (WorkerResourceInfo) has some potential 
issues (raised here: 
https://github.com/apache/spark/pull/26078#discussion_r340342820).

The WorkerInfo class calls into availableAddrs and assignedAddrs, and those 
calls appear to come from the UI (via the resourcesInfo* functions); e.g., 
JsonProtocol and MasterPage call these. Since the data structures in 
ResourceAllocator are not concurrent, we could end up with bad data or 
potentially crashes, depending on when the calls are made.

Note that there are other calls to the resourceInfo* functions, but those are 
made from the event loop.
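
For illustration only (this is a made-up stand-in, not the Spark classes in 
question), the kind of guard that avoids the race is to serialize the event-loop 
writes and the UI reads on a single lock, so readers never observe the mutable 
state mid-update:

{code:scala}
// Illustrative only: WorkerResourceInfoLike is a hypothetical stand-in.
class WorkerResourceInfoLike(allAddresses: Seq[String]) {
  private val assigned = scala.collection.mutable.Set.empty[String]

  // Called from the scheduler/event loop when addresses are handed out.
  def acquire(n: Int): Seq[String] = synchronized {
    val picked = allAddresses.filterNot(a => assigned.contains(a)).take(n)
    assigned ++= picked
    picked
  }

  // Called from UI/JSON rendering threads; returns a consistent snapshot.
  def availableAddrs: Seq[String] = synchronized {
    allAddresses.filterNot(a => assigned.contains(a))
  }

  def assignedAddrs: Seq[String] = synchronized { assigned.toSeq.sorted }
}
{code}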







[jira] [Commented] (SPARK-28679) Spark Yarn ResourceRequestHelper shouldn't lookup setResourceInformation if no resources specified

2019-08-09 Thread Alessandro Bellina (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16904004#comment-16904004
 ] 

Alessandro Bellina commented on SPARK-28679:


Working on it.

> Spark Yarn ResourceRequestHelper shouldn't lookup setResourceInformation if 
> no resources specified
> --
>
> Key: SPARK-28679
> URL: https://issues.apache.org/jira/browse/SPARK-28679
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 3.0.0
>Reporter: Thomas Graves
>Priority: Minor
>
> In the Spark Yarn ResourceRequestHelper, reflection is used to look up 
> setResourceInformation. We should skip that lookup if the resource Map is 
> empty.
> [https://github.com/apache/spark/blob/master/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala#L154]
>  






[jira] [Updated] (SPARK-27558) NPE in TaskCompletionListener due to Spark OOM in UnsafeExternalSorter causing tasks to hang

2019-04-24 Thread Alessandro Bellina (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alessandro Bellina updated SPARK-27558:
---
Description: 
We see an NPE in the UnsafeInMemorySorter.getMemoryUsage function (due to the 
array we are accessing there being null). This looks to be caused by a Spark 
OOM when UnsafeInMemorySorter is trying to spill.

This is likely a symptom of https://issues.apache.org/jira/browse/SPARK-21492. 
The real question for this ticket is whether we could handle this more 
gracefully rather than throwing an NPE. For example:

Remove this:

https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java#L182

and store the new array into a temporary, so that when this allocation fails:

https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java#L186

we don't end up with a null "array". This state is causing one of our jobs to 
hang indefinitely (we think) due to the original allocation error.
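
To make the suggestion concrete, here is a minimal sketch of the pattern 
(illustrative only; the real code is the Java in UnsafeInMemorySorter): the 
replacement buffer is allocated into a local first and only assigned to the 
field once allocation succeeds, so a failed spill cannot leave the field null.

{code:scala}
// Illustrative sketch only, not the actual UnsafeInMemorySorter: publish the new
// buffer only after allocation succeeds, so getMemoryUsage never sees a null array.
class InMemorySorterLike(allocate: Int => Array[Long], initialCapacity: Int) {
  private var array: Array[Long] = allocate(initialCapacity)

  def reset(newCapacity: Int): Unit = {
    val fresh = allocate(newCapacity) // may throw an OOM-style exception
    array = fresh                     // reached only on success; old array stays valid
  }

  def getMemoryUsage: Long = array.length.toLong * 8L // safe: array is never null here
}
{code}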

Stack trace for reference

{noformat}
2019-04-23 08:57:14,989 [Executor task launch worker for task 46729] ERROR 
org.apache.spark.TaskContextImpl  - Error in TaskCompletionListener
java.lang.NullPointerException
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getMemoryUsage(UnsafeInMemorySorter.java:208)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getMemoryUsage(UnsafeExternalSorter.java:249)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.updatePeakMemoryUsed(UnsafeExternalSorter.java:253)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.freeMemory(UnsafeExternalSorter.java:296)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.cleanupResources(UnsafeExternalSorter.java:328)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.lambda$new$0(UnsafeExternalSorter.java:178)
at 
org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:118)
at 
org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:118)
at 
org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:131)
at 
org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:129)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:129)
at 
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:117)
at org.apache.spark.scheduler.Task.run(Task.scala:119)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


2019-04-23 08:57:15,069 [Executor task launch worker for task 46729] ERROR 
org.apache.spark.executor.Executor  - Exception in task 102.0 in stage 28.0 
(TID 46729)
org.apache.spark.util.TaskCompletionListenerException: null

Previous exception in task: Unable to acquire 65536 bytes of memory, got 0
org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)

org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)

org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset(UnsafeInMemorySorter.java:186)

org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:229)

org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:204)

org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:283)

org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:96)

org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:348)

org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:403)

org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135)

org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage23.sort_addToSorter_0$(Unknown
 Source)

org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage23.processNext(Unknown
 Source)

org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)


[jira] [Created] (SPARK-27558) NPE in TaskCompletionListener due to Spark OOM in UnsafeExternalSorter causing tasks to hang

2019-04-24 Thread Alessandro Bellina (JIRA)
Alessandro Bellina created SPARK-27558:
--

 Summary: NPE in TaskCompletionListener due to Spark OOM in 
UnsafeExternalSorter causing tasks to hang
 Key: SPARK-27558
 URL: https://issues.apache.org/jira/browse/SPARK-27558
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.2, 2.3.3
Reporter: Alessandro Bellina


We see an NPE in the UnsafeInMemorySorter.getMemoryUsage function (due to the 
array we are accessing there being null). This looks to be caused by a Spark 
OOM when UnsafeInMemorySorter is trying to spill.

This is likely a symptom of https://issues.apache.org/jira/browse/SPARK-21492. 
The real question for this ticket is whether we could handle this more 
gracefully rather than throwing an NPE. For example:

allocate the new array into a temporary here:

https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java#L182

so when this fails:

https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java#L186

we don't end up with a null "array". This state is causing one of our jobs to 
hang indefinitely (we think) due to the original allocation error.

Stack trace for reference

{noformat}
2019-04-23 08:57:14,989 [Executor task launch worker for task 46729] ERROR 
org.apache.spark.TaskContextImpl  - Error in TaskCompletionListener
java.lang.NullPointerException
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getMemoryUsage(UnsafeInMemorySorter.java:208)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getMemoryUsage(UnsafeExternalSorter.java:249)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.updatePeakMemoryUsed(UnsafeExternalSorter.java:253)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.freeMemory(UnsafeExternalSorter.java:296)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.cleanupResources(UnsafeExternalSorter.java:328)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.lambda$new$0(UnsafeExternalSorter.java:178)
at 
org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:118)
at 
org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:118)
at 
org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:131)
at 
org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:129)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:129)
at 
org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:117)
at org.apache.spark.scheduler.Task.run(Task.scala:119)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


2019-04-23 08:57:15,069 [Executor task launch worker for task 46729] ERROR 
org.apache.spark.executor.Executor  - Exception in task 102.0 in stage 28.0 
(TID 46729)
org.apache.spark.util.TaskCompletionListenerException: null

Previous exception in task: Unable to acquire 65536 bytes of memory, got 0
org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)

org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)

org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset(UnsafeInMemorySorter.java:186)

org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:229)

org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:204)

org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:283)

org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:96)

org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:348)

org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:403)

org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135)

org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage23.sort_addToSorter_0$(Unknown
 Source)


[jira] [Commented] (SPARK-26944) Python unit-tests.log not available in artifacts for a build in Jenkins

2019-03-05 Thread Alessandro Bellina (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16784853#comment-16784853
 ] 

Alessandro Bellina commented on SPARK-26944:


[~shaneknapp] nice!! thank you

> Python unit-tests.log not available in artifacts for a build in Jenkins
> ---
>
> Key: SPARK-26944
> URL: https://issues.apache.org/jira/browse/SPARK-26944
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 2.4.0
>Reporter: Alessandro Bellina
>Assignee: shane knapp
>Priority: Minor
> Attachments: Screen Shot 2019-03-05 at 12.08.43 PM.png
>
>
> I had a PR where the Python unit tests failed. The tests point at the 
> `/home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log` file, 
> but I can't get to that from the Jenkins UI, it seems (are all PRs writing to 
> the same file?).
> {code:java}
> 
> Running PySpark tests
> 
> Running PySpark tests. Output is in 
> /home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log{code}
> For reference, please see this build: 
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102518/console
> This Jira is to make it available under the artifacts for each build.






[jira] [Commented] (SPARK-26809) insert overwrite directory + concat function => error

2019-02-23 Thread Alessandro Bellina (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775908#comment-16775908
 ] 

Alessandro Bellina commented on SPARK-26809:


If you provide an alias for the concatenated columns, it works:

{noformat}
insert overwrite directory '/tmp/SPARK-26809' 
select concat("foo", "bar") as foo_bar
{noformat}


> insert overwrite directory + concat function => error
> -
>
> Key: SPARK-26809
> URL: https://issues.apache.org/jira/browse/SPARK-26809
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: ant_nebula
>Priority: Critical
>
> insert overwrite directory '/tmp/xx'
> select concat(col1, col2)
> from tableXX
> limit 3
>  
> Caused by: org.apache.hadoop.hive.serde2.SerDeException: 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 3 elements 
> while columns.types has 2 elements!
>  at 
> org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145)
>  at 
> org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.<init>(LazySerDeParameters.java:85)
>  at 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125)
>  at 
> org.apache.spark.sql.hive.execution.HiveOutputWriter.<init>(HiveFileFormat.scala:119)
>  at 
> org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:103)
>  at 
> org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
>  at 
> org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:233)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>  at org.apache.spark.scheduler.Task.run(Task.scala:121)
>  at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)






[jira] [Comment Edited] (SPARK-26809) insert overwrite directory + concat function => error

2019-02-22 Thread Alessandro Bellina (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775772#comment-16775772
 ] 

Alessandro Bellina edited comment on SPARK-26809 at 2/23/19 3:25 AM:
-

This does it. Didn't need the limit to reproduce:

{noformat}
insert overwrite directory '/tmp/SPARK-26809' 
select concat(col1, col2) 
from ((select "foo" as col1, "bar" as col2));
{noformat}

This also triggers it:

{noformat}
insert overwrite directory '/tmp/SPARK-26809' 
select concat("foo", "bar") 
{noformat}

{noformat}
Caused by: org.apache.hadoop.hive.serde2.SerDeException: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements 
while columns.types has 1 elements!
at 
org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145)
at 
org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.<init>(LazySerDeParameters.java:85)
at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125)
at 
org.apache.spark.sql.hive.execution.HiveOutputWriter.<init>(HiveFileFormat.scala:121)
at 
org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:104)
at 
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:124)
at 
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:109)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:236)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$14(FileFormatWriter.scala:177)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{noformat}



was (Author: abellina):
This does it. Didn't need the limit to reproduce:

{noformat}
insert overwrite directory '/tmp/SPARK-26809' 
select concat(col1, col2) 
{noformat}

This also triggers it:

{noformat}
insert overwrite directory '/tmp/SPARK-26809' 
select concat(col1, col2) 
from ((select "foo" as col1, "bar" as col2));
{noformat}

{noformat}
Caused by: org.apache.hadoop.hive.serde2.SerDeException: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements 
while columns.types has 1 elements!
at 
org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145)
at 
org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.<init>(LazySerDeParameters.java:85)
at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125)
at 
org.apache.spark.sql.hive.execution.HiveOutputWriter.<init>(HiveFileFormat.scala:121)
at 
org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:104)
at 
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:124)
at 
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:109)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:236)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$14(FileFormatWriter.scala:177)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{noformat}


> insert overwrite directory + concat function => error
> -
>
> Key: SPARK-26809
> URL: https://issues.apache.org/jira/browse/SPARK-26809
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: ant_nebula
>Priority: Critical
>
> insert overwrite 

[jira] [Comment Edited] (SPARK-26809) insert overwrite directory + concat function => error

2019-02-22 Thread Alessandro Bellina (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775772#comment-16775772
 ] 

Alessandro Bellina edited comment on SPARK-26809 at 2/23/19 3:25 AM:
-

This does it. Didn't need the limit to reproduce:

{noformat}
insert overwrite directory '/tmp/SPARK-26809' 
select concat(col1, col2) 
{noformat}

This also triggers it:

{noformat}
insert overwrite directory '/tmp/SPARK-26809' 
select concat(col1, col2) 
from ((select "foo" as col1, "bar" as col2));
{noformat}

{noformat}
Caused by: org.apache.hadoop.hive.serde2.SerDeException: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements 
while columns.types has 1 elements!
at 
org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145)
at 
org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.<init>(LazySerDeParameters.java:85)
at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125)
at 
org.apache.spark.sql.hive.execution.HiveOutputWriter.<init>(HiveFileFormat.scala:121)
at 
org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:104)
at 
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:124)
at 
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:109)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:236)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$14(FileFormatWriter.scala:177)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{noformat}



was (Author: abellina):
This does it. Didn't need the limit to reproduce:

{noformat}
insert overwrite directory '/tmp/SPARK-26809' 
select concat(col1, col2) 
from ((select "foo" as col1, "bar" as col2));
{noformat}

This also triggers it:

{noformat}
insert overwrite directory '/tmp/SPARK-26809' 
select concat(col1, col2) 
from ((select "foo" as col1, "bar" as col2));
{noformat}

{noformat}
Caused by: org.apache.hadoop.hive.serde2.SerDeException: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements 
while columns.types has 1 elements!
at 
org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145)
at 
org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.<init>(LazySerDeParameters.java:85)
at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125)
at 
org.apache.spark.sql.hive.execution.HiveOutputWriter.<init>(HiveFileFormat.scala:121)
at 
org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:104)
at 
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:124)
at 
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:109)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:236)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$14(FileFormatWriter.scala:177)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{noformat}


> insert overwrite directory + concat function => error
> -
>
> Key: SPARK-26809
> URL: https://issues.apache.org/jira/browse/SPARK-26809
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: ant_nebula
>

[jira] [Comment Edited] (SPARK-26809) insert overwrite directory + concat function => error

2019-02-22 Thread Alessandro Bellina (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775772#comment-16775772
 ] 

Alessandro Bellina edited comment on SPARK-26809 at 2/23/19 3:25 AM:
-

This does it. Didn't need the limit to reproduce:

{noformat}
insert overwrite directory '/tmp/SPARK-26809' 
select concat(col1, col2) 
from ((select "foo" as col1, "bar" as col2));
{noformat}

This also triggers it:

{noformat}
insert overwrite directory '/tmp/SPARK-26809' 
select concat(col1, col2) 
from ((select "foo" as col1, "bar" as col2));
{noformat}

{noformat}
Caused by: org.apache.hadoop.hive.serde2.SerDeException: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements 
while columns.types has 1 elements!
at 
org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145)
at 
org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.<init>(LazySerDeParameters.java:85)
at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125)
at 
org.apache.spark.sql.hive.execution.HiveOutputWriter.<init>(HiveFileFormat.scala:121)
at 
org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:104)
at 
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:124)
at 
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:109)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:236)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$14(FileFormatWriter.scala:177)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{noformat}



was (Author: abellina):
This does it. Didn't need the limit to reproduce:

{noformat}
insert overwrite directory '/tmp/SPARK-26809' 
select concat(col1, col2) 
from ((select "foo" as col1, "bar" as col2));
{noformat}

{noformat}
Caused by: org.apache.hadoop.hive.serde2.SerDeException: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements 
while columns.types has 1 elements!
at 
org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145)
at 
org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.<init>(LazySerDeParameters.java:85)
at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125)
at 
org.apache.spark.sql.hive.execution.HiveOutputWriter.<init>(HiveFileFormat.scala:121)
at 
org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:104)
at 
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:124)
at 
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:109)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:236)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$14(FileFormatWriter.scala:177)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{noformat}


> insert overwrite directory + concat function => error
> -
>
> Key: SPARK-26809
> URL: https://issues.apache.org/jira/browse/SPARK-26809
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: ant_nebula
>Priority: Critical
>
> insert overwrite directory '/tmp/xx'
> select concat(col1, col2)
> from tableXX
> limit 3
>  
> 

[jira] [Commented] (SPARK-26809) insert overwrite directory + concat function => error

2019-02-22 Thread Alessandro Bellina (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775772#comment-16775772
 ] 

Alessandro Bellina commented on SPARK-26809:


This does it. Didn't need the limit to reproduce:

{noformat}
insert overwrite directory '/tmp/SPARK-26809' 
select concat(col1, col2) 
from ((select "foo" as col1, "bar" as col2));
{noformat}

{noformat}
Caused by: org.apache.hadoop.hive.serde2.SerDeException: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements 
while columns.types has 1 elements!
at 
org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145)
at 
org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.(LazySerDeParameters.java:85)
at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125)
at 
org.apache.spark.sql.hive.execution.HiveOutputWriter.(HiveFileFormat.scala:121)
at 
org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:104)
at 
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:124)
at 
org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.(FileFormatDataWriter.scala:109)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:236)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$14(FileFormatWriter.scala:177)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{noformat}
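
For anyone reproducing this programmatically rather than from the SQL CLI, here is a minimal Scala sketch of the same statement (assuming a Hive-enabled SparkSession on an affected 2.4.x build; the object name and local master are illustrative only). It should fail with the same LazySimpleSerDe "columns has 2 elements while columns.types has 1 elements" error shown above.

{code:scala}
// Sketch of the reproduction above; assumes spark-hive is on the classpath.
import org.apache.spark.sql.SparkSession

object Spark26809Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-26809-repro")
      .enableHiveSupport()
      .getOrCreate()

    // Same statement as the {noformat} block above; on affected versions this
    // throws SerDeException: columns has 2 elements while columns.types has 1.
    spark.sql(
      """insert overwrite directory '/tmp/SPARK-26809'
        |select concat(col1, col2)
        |from (select "foo" as col1, "bar" as col2)""".stripMargin)

    spark.stop()
  }
}
{code}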


> insert overwrite directory + concat function => error
> -
>
> Key: SPARK-26809
> URL: https://issues.apache.org/jira/browse/SPARK-26809
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: ant_nebula
>Priority: Critical
>
> insert overwrite directory '/tmp/xx'
> select concat(col1, col2)
> from tableXX
> limit 3
>  
> Caused by: org.apache.hadoop.hive.serde2.SerDeException: 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 3 elements 
> while columns.types has 2 elements!
>  at 
> org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145)
>  at 
> org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.<init>(LazySerDeParameters.java:145)
>  at 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125)
>  at 
> org.apache.spark.sql.hive.execution.HiveOutputWriter.<init>(HiveFileFormat.scala:119)
>  at 
> org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:103)
>  at 
> org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
>  at 
> org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:233)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>  at org.apache.spark.scheduler.Task.run(Task.scala:121)
>  at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26945) Python streaming tests flaky while cleaning temp directories after StreamingQuery.stop

2019-02-22 Thread Alessandro Bellina (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775125#comment-16775125
 ] 

Alessandro Bellina commented on SPARK-26945:


[~hyukjin.kwon] thanks for taking a look. Seems like q.processAllAvailable is 
designed for this use case.
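
For reference, a minimal Scala sketch of that ordering (drain, stop, then delete), assuming Spark 2.4 structured streaming, the built-in rate source, and commons-io on the classpath for the recursive delete; the paths and names are illustrative only:

{code:scala}
// Sketch: drain and stop a streaming query before deleting its temp directory,
// so the delete cannot race with files the query is still writing.
import java.io.File
import java.nio.file.Files

import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession

object DrainBeforeCleanup {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-26945-sketch")
      .getOrCreate()

    val tmpDir = Files.createTempDirectory("SPARK-26945").toFile

    val query = spark.readStream
      .format("rate")   // built-in test source, one row per second
      .load()
      .writeStream
      .format("parquet")
      .option("path", new File(tmpDir, "out").getAbsolutePath)
      .option("checkpointLocation", new File(tmpDir, "chk").getAbsolutePath)
      .start()

    // Block until everything the source has produced so far is committed to the
    // sink, then stop. Only after stop() returns is the directory quiet.
    query.processAllAvailable()
    query.stop()

    FileUtils.deleteDirectory(tmpDir) // the equivalent of shutil.rmtree in the test
    spark.stop()
  }
}
{code}

The Python tests would do the equivalent through the PySpark StreamingQuery API before calling shutil.rmtree.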

> Python streaming tests flaky while cleaning temp directories after 
> StreamingQuery.stop
> --
>
> Key: SPARK-26945
> URL: https://issues.apache.org/jira/browse/SPARK-26945
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.0
>Reporter: Alessandro Bellina
>Priority: Minor
>
> From the test code, it seems like the `shutil.rmtree` function is trying to 
> delete a directory, but there's likely another thread adding entries to the 
> directory, so when it gets to `os.rmdir(path)` it blows up. I think the test 
> (and other streaming tests) should call `q.awaitTermination` after `q.stop`, 
> before going on. I'll file a separate jira.
> {noformat}
> ERROR: test_query_manager_await_termination 
> (pyspark.sql.tests.test_streaming.StreamingTests)
> --
> Traceback (most recent call last):
>  File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/sql/tests/test_streaming.py",
>  line 259, in test_query_manager_await_termination
>  shutil.rmtree(tmpPath)
>  File "/home/anaconda/lib/python2.7/shutil.py", line 256, in rmtree
>  onerror(os.rmdir, path, sys.exc_info())
>  File "/home/anaconda/lib/python2.7/shutil.py", line 254, in rmtree
>  os.rmdir(path)
> OSError: [Errno 39] Directory not empty: 
> '/home/jenkins/workspace/SparkPullRequestBuilder/python/target/072153bd-f981-47be-bda2-e2b657a16f65/tmp4WGp7n'{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-26944) Python unit-tests.log not available in artifacts for a build in Jenkins

2019-02-22 Thread Alessandro Bellina (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775117#comment-16775117
 ] 

Alessandro Bellina edited comment on SPARK-26944 at 2/22/19 1:07 PM:
-

Hmm, I have a subsequent build from the same PR, and I don't see a link to the 
python tests either. Maybe I am looking in the wrong place? 

[https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102590/artifact/]


was (Author: abellina):
Hmm, I have a subsequent build from the same PR, and I don't see a link to the 
python tests either. Maybe I am looking in the wrong place?

 

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102590/artifact/

> Python unit-tests.log not available in artifacts for a build in Jenkins
> ---
>
> Key: SPARK-26944
> URL: https://issues.apache.org/jira/browse/SPARK-26944
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 2.4.0
>Reporter: Alessandro Bellina
>Priority: Minor
>
> I had a pr where the python unit tests failed.  The tests point at the 
> `/home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log` file, 
> but I can't get to that from the Jenkins UI, it seems (are all PRs writing to the 
> same file?).
> {code:java}
> 
> Running PySpark tests
> 
> Running PySpark tests. Output is in 
> /home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log{code}
> For reference, please see this build: 
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102518/console
> This Jira is to make it available under the artifacts for each build.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26944) Python unit-tests.log not available in artifacts for a build in Jenkins

2019-02-22 Thread Alessandro Bellina (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775117#comment-16775117
 ] 

Alessandro Bellina commented on SPARK-26944:


Hmm, I have a subsequent build from the same PR, and I don't see a link to the 
python tests either. Maybe I am looking in the wrong place?

 

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102590/artifact/

> Python unit-tests.log not available in artifacts for a build in Jenkins
> ---
>
> Key: SPARK-26944
> URL: https://issues.apache.org/jira/browse/SPARK-26944
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 2.4.0
>Reporter: Alessandro Bellina
>Priority: Minor
>
> I had a pr where the python unit tests failed.  The tests point at the 
> `/home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log` file, 
> but I can't get to that from the Jenkins UI, it seems (are all PRs writing to the 
> same file?).
> {code:java}
> 
> Running PySpark tests
> 
> Running PySpark tests. Output is in 
> /home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log{code}
> For reference, please see this build: 
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102518/console
> This Jira is to make it available under the artifacts for each build.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26944) Python unit-tests.log not available in artifacts for a build in Jenkins

2019-02-21 Thread Alessandro Bellina (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16774800#comment-16774800
 ] 

Alessandro Bellina commented on SPARK-26944:


[~shivuson...@gmail.com] [~hyukjin.kwon] I added a link to the description. I 
think we should look at the Jenkins configuration to see what is getting copied 
to artifacts, though I believe you need special access for that.

> Python unit-tests.log not available in artifacts for a build in Jenkins
> ---
>
> Key: SPARK-26944
> URL: https://issues.apache.org/jira/browse/SPARK-26944
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 2.4.0
>Reporter: Alessandro Bellina
>Priority: Minor
>
> I had a pr where the python unit tests failed.  The tests point at the 
> `/home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log` file, 
> but I can't get to that from the Jenkins UI, it seems (are all PRs writing to the 
> same file?).
> {code:java}
> 
> Running PySpark tests
> 
> Running PySpark tests. Output is in 
> /home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log{code}
> For reference, please see this build: 
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102518/console
> This Jira is to make it available under the artifacts for each build.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26944) Python unit-tests.log not available in artifacts for a build in Jenkins

2019-02-21 Thread Alessandro Bellina (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alessandro Bellina updated SPARK-26944:
---
Description: 
I had a pr where the python unit tests failed.  The tests point at the 
`/home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log` file, 
but I can't get to that from the Jenkins UI, it seems (are all PRs writing to the 
same file?).
{code:java}

Running PySpark tests

Running PySpark tests. Output is in 
/home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log{code}
For reference, please see this build: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102518/console

This Jira is to make it available under the artifacts for each build.

  was:
I had a pr where the python unit tests failed.  The tests point at the 
`/home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log` file, 
but I can't get to that from the Jenkins UI, it seems (are all PRs writing to the 
same file?).

This Jira is to make it available under the artifacts for each build.


> Python unit-tests.log not available in artifacts for a build in Jenkins
> ---
>
> Key: SPARK-26944
> URL: https://issues.apache.org/jira/browse/SPARK-26944
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 2.4.0
>Reporter: Alessandro Bellina
>Priority: Minor
>
> I had a pr where the python unit tests failed.  The tests point at the 
> `/home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log` file, 
> but I can't get to that from the Jenkins UI, it seems (are all PRs writing to the 
> same file?).
> {code:java}
> 
> Running PySpark tests
> 
> Running PySpark tests. Output is in 
> /home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log{code}
> For reference, please see this build: 
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102518/console
> This Jira is to make it available under the artifacts for each build.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26945) Python streaming tests flaky while cleaning temp directories after StreamingQuery.stop

2019-02-20 Thread Alessandro Bellina (JIRA)
Alessandro Bellina created SPARK-26945:
--

 Summary: Python streaming tests flaky while cleaning temp 
directories after StreamingQuery.stop
 Key: SPARK-26945
 URL: https://issues.apache.org/jira/browse/SPARK-26945
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.4.0
Reporter: Alessandro Bellina


From the test code, it seems like the `shutil.rmtree` function is trying to 
delete a directory, but there's likely another thread adding entries to the 
directory, so when it gets to `os.rmdir(path)` it blows up. I think the test 
(and other streaming tests) should call `q.awaitTermination` after `q.stop`, 
before going on. I'll file a separate jira.
{noformat}
ERROR: test_query_manager_await_termination 
(pyspark.sql.tests.test_streaming.StreamingTests)
--
Traceback (most recent call last):
 File 
"/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/sql/tests/test_streaming.py",
 line 259, in test_query_manager_await_termination
 shutil.rmtree(tmpPath)
 File "/home/anaconda/lib/python2.7/shutil.py", line 256, in rmtree
 onerror(os.rmdir, path, sys.exc_info())
 File "/home/anaconda/lib/python2.7/shutil.py", line 254, in rmtree
 os.rmdir(path)
OSError: [Errno 39] Directory not empty: 
'/home/jenkins/workspace/SparkPullRequestBuilder/python/target/072153bd-f981-47be-bda2-e2b657a16f65/tmp4WGp7n'{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26944) Python unit-tests.log not available in artifacts for a build in Jenkins

2019-02-20 Thread Alessandro Bellina (JIRA)
Alessandro Bellina created SPARK-26944:
--

 Summary: Python unit-tests.log not available in artifacts for a 
build in Jenkins
 Key: SPARK-26944
 URL: https://issues.apache.org/jira/browse/SPARK-26944
 Project: Spark
  Issue Type: Improvement
  Components: Build
Affects Versions: 2.4.0
Reporter: Alessandro Bellina


I had a pr where the python unit tests failed.  The tests point at the 
`/home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log` file, 
but I can't get to that from the Jenkins UI, it seems (are all PRs writing to the 
same file?).

This Jira is to make it available under the artifacts for each build.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26895) When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit fails to resolve globs owned by target user

2019-02-15 Thread Alessandro Bellina (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alessandro Bellina updated SPARK-26895:
---
Summary: When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit 
fails to resolve globs owned by target user  (was: When running spark 2.3 as a 
proxy user (--proxy-user), SparkSubmit fails to resolve globs owned by target 
uer)

> When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit fails to 
> resolve globs owned by target user
> --
>
> Key: SPARK-26895
> URL: https://issues.apache.org/jira/browse/SPARK-26895
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Alessandro Bellina
>Priority: Critical
>
> We are resolving globs in SparkSubmit here (by way of 
> prepareSubmitEnvironment) without first going into a doAs:
> https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L143
> That is, it does not first enter a doAs, as is done here:
> [https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L151]
> So when running spark-submit with --proxy-user, and for example --archives, 
> it will fail to launch unless the location of the archive is open to the user 
> that executed spark-submit.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26895) When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit fails to resolve globs owned by target uer

2019-02-15 Thread Alessandro Bellina (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alessandro Bellina updated SPARK-26895:
---
Summary: When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit 
fails to resolve globs owned by target uer  (was: When running spark 2.3 as a 
proxy user (--proxy-user), SparkSubmit fails to resolve globs)

> When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit fails to 
> resolve globs owned by target uer
> -
>
> Key: SPARK-26895
> URL: https://issues.apache.org/jira/browse/SPARK-26895
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Alessandro Bellina
>Priority: Critical
>
> We are resolving globs in SparkSubmit here (by way of 
> prepareSubmitEnvironment) without first going into a doAs:
> https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L143
> That is, it does not first enter a doAs, as is done here:
> [https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L151]
> So when running spark-submit with --proxy-user, and for example --archives, 
> it will fail to launch unless the location of the archive is open to the user 
> that executed spark-submit.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26895) When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit fails to resolve globs

2019-02-15 Thread Alessandro Bellina (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alessandro Bellina updated SPARK-26895:
---
Summary: When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit 
fails to resolve globs  (was: When running spark 2.3 as a proxy user 
--proxy-user, SparkSubmit fails to resolve globs)

> When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit fails to 
> resolve globs
> -
>
> Key: SPARK-26895
> URL: https://issues.apache.org/jira/browse/SPARK-26895
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Alessandro Bellina
>Priority: Critical
>
> We are resolving globs in SparkSubmit here (by way of 
> prepareSubmitEnvironment) without first going into a doAs:
> https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L143
> That is, it does not first enter a doAs, as is done here:
> [https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L151]
> So when running spark-submit with --proxy-user, and for example --archives, 
> it will fail to launch unless the location of the archive is open to the user 
> that executed spark-submit.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26895) When running spark 2.3 as a proxy user --proxy-user, SparkSubmit fails to resolve globs

2019-02-15 Thread Alessandro Bellina (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16769666#comment-16769666
 ] 

Alessandro Bellina commented on SPARK-26895:


I am working on this.

> When running spark 2.3 as a proxy user --proxy-user, SparkSubmit fails to 
> resolve globs
> ---
>
> Key: SPARK-26895
> URL: https://issues.apache.org/jira/browse/SPARK-26895
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.2, 2.4.0
>Reporter: Alessandro Bellina
>Priority: Critical
>
> We are resolving globs in SparkSubmit here (by way of 
> prepareSubmitEnvironment) without first going into a doAs:
> https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L143
> That is, it does not first enter a doAs, as is done here:
> [https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L151]
> So when running spark-submit with --proxy-user, and for example --archives, 
> it will fail to launch unless the location of the archive is open to the user 
> that executed spark-submit.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26895) When running spark 2.3 as a proxy user --proxy-user, SparkSubmit fails to resolve globs

2019-02-15 Thread Alessandro Bellina (JIRA)
Alessandro Bellina created SPARK-26895:
--

 Summary: When running spark 2.3 as a proxy user --proxy-user, 
SparkSubmit fails to resolve globs
 Key: SPARK-26895
 URL: https://issues.apache.org/jira/browse/SPARK-26895
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.0, 2.3.2
Reporter: Alessandro Bellina


We are resolving globs in SparkSubmit here (by way of prepareSubmitEnvironment) 
without first going into a doAs:

https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L143

That is, it does not first enter a doAs, as is done here:

[https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L151]

So when running spark-submit with --proxy-user, and for example --archives, it 
will fail to launch unless the location of the archive is open to the user that 
executed spark-submit.
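
For illustration, here is a minimal Scala sketch of the ordering being asked for: enter the proxy user's doAs first and resolve the globs inside it, so that path expansion runs with the target user's permissions. The helper name, the Hadoop FileSystem usage, and the example pattern are assumptions, not the actual SparkSubmit change.

{code:scala}
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation

object ProxyUserGlobSketch {
  // Expand a glob pattern while impersonating `proxyUser`: enter doAs first,
  // resolve globs second, which is the ordering this issue asks for.
  def resolveGlobsAs(proxyUser: String, pattern: String): Seq[Path] = {
    val ugi = UserGroupInformation.createProxyUser(
      proxyUser, UserGroupInformation.getCurrentUser)
    ugi.doAs(new PrivilegedExceptionAction[Seq[Path]] {
      override def run(): Seq[Path] = {
        val fs = FileSystem.get(new Configuration())
        // globStatus now runs with the proxy user's credentials, so patterns
        // under the target user's home directory resolve even if the submitting
        // user cannot read them.
        Option(fs.globStatus(new Path(pattern)))
          .map(_.toSeq.map(_.getPath))
          .getOrElse(Seq.empty)
      }
    })
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical user and path, for illustration only.
    resolveGlobsAs("targetUser", "/user/targetUser/archives/*.zip").foreach(println)
  }
}
{code}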



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26257) SPIP: Interop Support for Spark Language Extensions

2019-01-14 Thread Alessandro Bellina (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16742519#comment-16742519
 ] 

Alessandro Bellina commented on SPARK-26257:


[~tcondie] I think you should share the design doc here too. It's a good idea, 
just curious on the details of what's being proposed.

> SPIP: Interop Support for Spark Language Extensions
> ---
>
> Key: SPARK-26257
> URL: https://issues.apache.org/jira/browse/SPARK-26257
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, R, Spark Core
>Affects Versions: 2.4.0
>Reporter: Tyson Condie
>Priority: Minor
>
> h2. Background and Motivation:
> There is a desire for third party language extensions for Apache Spark. Some 
> notable examples include:
>  * C#/F# from project Mobius [https://github.com/Microsoft/Mobius]
>  * Haskell from project sparkle [https://github.com/tweag/sparkle]
>  * Julia from project Spark.jl [https://github.com/dfdx/Spark.jl]
> Presently, Apache Spark supports Python and R via a tightly integrated 
> interop layer. It would seem that much of that existing interop layer could 
> be refactored into a clean surface for general (third party) language 
> bindings, such as the above mentioned. More specifically, could we generalize 
> the following modules:
>  * Deploy runners (e.g., PythonRunner and RRunner)
>  * DataFrame Executors
>  * RDD operations?
> The last being questionable: integrating third party language extensions at 
> the RDD level may be too heavy-weight and unnecessary given the preference 
> towards the Dataframe abstraction.
> The main goals of this effort would be:
>  * Provide a clean abstraction for third party language extensions making it 
> easier to maintain (the language extension) with the evolution of Apache Spark
>  * Provide guidance to third party language authors on how a language 
> extension should be implemented
>  * Provide general reusable libraries that are not specific to any language 
> extension
>  * Open the door to developers that prefer alternative languages
>  * Identify and clean up common code shared between Python and R interops
> h2. Target Personas:
> Data Scientists, Data Engineers, Library Developers
> h2. Goals:
> Data scientists and engineers will have the opportunity to work with Spark in 
> languages other than what’s natively supported. Library developers will be 
> able to create language extensions for Spark in a clean way. The interop 
> layer should also provide guidance for developing language extensions.
> h2. Non-Goals:
> The proposal does not aim to create an actual language extension. Rather, it 
> aims to provide a stable interop layer for third party language extensions to 
> dock.
> h2. Proposed API Changes:
> Much of the work will involve generalizing existing interop APIs for PySpark 
> and R, specifically for the Dataframe API. For instance, it would be good to 
> have a general deploy.Runner (similar to PythonRunner) for language extension 
> efforts. In Spark SQL, it would be good to have a general InteropUDF and 
> evaluator (similar to BatchEvalPythonExec).
> Low-level RDD operations should not be needed in this initial offering; 
> depending on the success of the interop layer and with proper demand, RDD 
> interop could be added later. However, one open question is supporting a 
> subset of low-level functions that are core to ETL e.g., transform.
> h2. Optional Design Sketch:
> The work would be broken down into two top-level phases:
>  Phase 1: Introduce general interop API for deploying a driver/application, 
> running an interop UDF along with any other low-level transformations that 
> aid with ETL.
> Phase 2: Port existing Python and R language extensions to the new interop 
> layer. This port should be contained solely to the Spark core side, and all 
> protocols specific to Python and R should not change e.g., Python should 
> continue to use py4j is the protocol between the Python process and core 
> Spark. The port itself should be contained to a handful of files e.g., some 
> examples for Python: PythonRunner, BatchEvalPythonExec, +PythonUDFRunner+, 
> PythonRDD (possibly), and will mostly involve refactoring common logic 
> abstract implementations and utilities.
> h2. Optional Rejected Designs:
> The clear alternative is the status quo; developers that want to provide a 
> third-party language extension to Spark do so directly; often by extending 
> existing Python classes and overriding the portions that are relevant to the 
> new extension. Not only is this not sound code (e.g., a JuliaRDD is not a 
> PythonRDD, which contains a lot of reusable code), but it runs the great risk 
> of future revisions making the subclass implementation obsolete. It would be 
> hard to imagine that any third-party 

[jira] [Commented] (SPARK-26285) Add a metric source for accumulators (aka AccumulatorSource)

2018-12-05 Thread Alessandro Bellina (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710970#comment-16710970
 ] 

Alessandro Bellina commented on SPARK-26285:


I can't assign this issue, but I am putting up a PR for it.

> Add a metric source for accumulators (aka AccumulatorSource)
> 
>
> Key: SPARK-26285
> URL: https://issues.apache.org/jira/browse/SPARK-26285
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Alessandro Bellina
>Priority: Minor
>
> We'd like a simple mechanism to register spark accumulators against the 
> codahale metrics registry. 
> This task proposes adding a LongAccumulatorSource and a 
> DoubleAccumulatorSource.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26285) Add a metric source for accumulators (aka AccumulatorSource)

2018-12-05 Thread Alessandro Bellina (JIRA)
Alessandro Bellina created SPARK-26285:
--

 Summary: Add a metric source for accumulators (aka 
AccumulatorSource)
 Key: SPARK-26285
 URL: https://issues.apache.org/jira/browse/SPARK-26285
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Alessandro Bellina


We'd like a simple mechanism to register spark accumulators against the 
codahale metrics registry. 

This task proposes adding a LongAccumulatorSource and a DoubleAccumulatorSource.
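
As a rough illustration of the idea (outside of Spark's internal metrics Source plumbing), an accumulator can already be bridged into a Codahale registry by registering a Gauge that reads its driver-side value. The names below (AccumulatorGaugeSketch, register) are assumptions for illustration, not the classes this ticket would add.

{code:scala}
import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.LongAccumulator

object AccumulatorGaugeSketch {
  // Expose a LongAccumulator's driver-side value as a Codahale Gauge.
  def register(registry: MetricRegistry, name: String, acc: LongAccumulator): Unit = {
    registry.register(name, new Gauge[java.lang.Long] {
      override def getValue: java.lang.Long = acc.value
    })
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-26285-sketch")
      .getOrCreate()

    val registry = new MetricRegistry()
    val rowsSeen = spark.sparkContext.longAccumulator("rows.seen")
    register(registry, "rows.seen", rowsSeen)

    spark.sparkContext.parallelize(1 to 1000).foreach(_ => rowsSeen.add(1))

    // The gauge reads the merged value on the driver: prints 1000 here.
    println(registry.getGauges.get("rows.seen").getValue)
    spark.stop()
  }
}
{code}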



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-25043) spark-sql should print the appId and master on startup

2018-08-07 Thread Alessandro Bellina (JIRA)
Alessandro Bellina created SPARK-25043:
--

 Summary: spark-sql should print the appId and master on startup
 Key: SPARK-25043
 URL: https://issues.apache.org/jira/browse/SPARK-25043
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.1
Reporter: Alessandro Bellina


In spark-sql, if logging is turned down all the way, it's not possible to find 
out what appId is running at the moment. This small change adds a print to stdout 
containing the master type and the appId, so that information is visible on screen.
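
As a sketch of the information in question (the object name, local master, and message wording are illustrative assumptions, not the actual spark-sql CLI change):

{code:scala}
// Print the master and application id via println so they show up even when
// log levels are turned all the way down.
import org.apache.spark.sql.SparkSession

object StartupBannerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-25043-sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    println(s"Spark master: ${sc.master}, Application Id: ${sc.applicationId}")

    spark.stop()
  }
}
{code}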



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org