[jira] [Commented] (SPARK-45792) SPIP: ShuffleManager short name registration via SparkPlugin
[ https://issues.apache.org/jira/browse/SPARK-45792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782931#comment-17782931 ]

Alessandro Bellina commented on SPARK-45792:

Added a link to an issue that needs to be solved in order for this SPIP to move forward.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45792) SPIP: ShuffleManager short name registration via SparkPlugin
Alessandro Bellina created SPARK-45792:
--
Summary: SPIP: ShuffleManager short name registration via SparkPlugin
Key: SPARK-45792
URL: https://issues.apache.org/jira/browse/SPARK-45792
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 4.0.0
Reporter: Alessandro Bellina
Fix For: 4.0.0

We would like to make it possible for a SparkPlugin to expose custom ShuffleManager implementations using short names, along with default configurations for each short name, in order to improve ease of use.

Today, users leveraging a Spark plugin need to set a class name under spark.plugins, and if the plugin also provides a ShuffleManager, they need to set spark.shuffle.manager to a fully qualified class name. Additionally, users need to make sure they have set the Spark configurations that are required or recommended for that shuffle manager instance. This can be cumbersome for the user, adding barriers to the pluggable interface that Spark provides.

Spark provides a short name for SortShuffleManager ("sort") today. The idea in this SPIP is to make this set of short names extensible by a SparkPlugin.

SPIP: https://docs.google.com/document/d/1flijDjMMAAGh2C2k-vg1u651RItaRquLGB_sVudxf6I/edit?usp=sharing
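The registration flow the SPIP proposes can be sketched roughly as follows. This is a minimal, self-contained model only: `ShuffleManagerRegistry`, `ShuffleManagerSpec`, and `register` are hypothetical names, and the actual interface is defined in the linked SPIP document.

```scala
// Minimal sketch of short-name resolution for spark.shuffle.manager.
// All names here are illustrative, not Spark's or the SPIP's real API.
case class ShuffleManagerSpec(
    fqcn: String,                       // fully qualified class name
    defaultConfs: Map[String, String])  // confs applied unless the user overrides them

object ShuffleManagerRegistry {
  // "sort" is the short name Spark ships today.
  private var registry: Map[String, ShuffleManagerSpec] = Map(
    "sort" -> ShuffleManagerSpec(
      "org.apache.spark.shuffle.sort.SortShuffleManager", Map.empty))

  // A SparkPlugin could call this at load time to expose its manager.
  def register(shortName: String, spec: ShuffleManagerSpec): Unit =
    registry += (shortName -> spec)

  // Resolve spark.shuffle.manager: a registered short name maps to its
  // spec; anything else is treated as a fully qualified class name,
  // which is the current behavior.
  def resolve(value: String): ShuffleManagerSpec =
    registry.getOrElse(value, ShuffleManagerSpec(value, Map.empty))
}
```

With something like this, a user could set spark.shuffle.manager to a plugin-provided short name and pick up the plugin's recommended configurations without spelling out the fully qualified class name by hand.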
[jira] [Created] (SPARK-45762) Shuffle managers defined in user jars are not available for some launch modes
Alessandro Bellina created SPARK-45762:
--
Summary: Shuffle managers defined in user jars are not available for some launch modes
Key: SPARK-45762
URL: https://issues.apache.org/jira/browse/SPARK-45762
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.5.0
Reporter: Alessandro Bellina
Fix For: 4.0.0

Starting a Spark job in standalone mode with a custom `ShuffleManager` provided in a jar via `--jars` does not work. This can also be experienced in local-cluster mode. The approach that works consistently is to copy the jar containing the custom `ShuffleManager` to a specific location on each node and then add it to `spark.driver.extraClassPath` and `spark.executor.extraClassPath`, but we would like to move away from setting extra configurations unnecessarily.

Example:
{code:java}
$SPARK_HOME/bin/spark-shell \
  --master spark://127.0.0.1:7077 \
  --conf spark.shuffle.manager=org.apache.spark.examples.TestShuffleManager \
  --jars user-code.jar
{code}
This yields a `java.lang.ClassNotFoundException` in the executors.
{code:java}
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1915)
	at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:436)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:425)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.examples.TestShuffleManager
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:467)
	at org.apache.spark.util.SparkClassUtils.classForName(SparkClassUtils.scala:41)
	at org.apache.spark.util.SparkClassUtils.classForName$(SparkClassUtils.scala:36)
	at org.apache.spark.util.Utils$.classForName(Utils.scala:95)
	at org.apache.spark.util.Utils$.instantiateSerializerOrShuffleManager(Utils.scala:2574)
	at org.apache.spark.SparkEnv$.create(SparkEnv.scala:366)
	at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:255)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$7(CoarseGrainedExecutorBackend.scala:487)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:62)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
	at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
	at java.base/javax.security.auth.Subject.doAs(Subject.java:439)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
	... 4 more
{code}
We can change our command to use `extraClassPath`:
{code:java}
$SPARK_HOME/bin/spark-shell \
  --master spark://127.0.0.1:7077 \
  --conf spark.shuffle.manager=org.apache.spark.examples.TestShuffleManager \
  --conf spark.driver.extraClassPath=user-code.jar \
  --conf spark.executor.extraClassPath=user-code.jar
{code}
Success after adding the jar to `extraClassPath`:
{code:java}
23/10/26 12:58:26 INFO TransportClientFactory: Successfully created connection to localhost/127.0.0.1:33053 after 7 ms (0 ms spent in bootstraps)
23/10/26 12:58:26 WARN TestShuffleManager: Instantiated TestShuffleManager!!
23/10/26 12:58:26 INFO DiskBlockManager: Created local directory at /tmp/spark-cb101b05-c4b7-4ba9-8b3d-5b23baa7cb46/executor-5d5335dd-c116-4211-9691-87d8566017fd/blockmgr-2fcb1ab2-d886--8c7f-9dca2c880c2c
{code}
We would like to change the startup order such that the original command succeeds, without specifying `extraClassPath`:
{code:java}
$SPARK_HOME/bin/spark-shell \
  --master spark://127.0.0.1:7077 \
  --conf spark.shuffle.manager=org.apache.spark.examples.TestShuffleManager \
  --jars user-code.jar
{code}
Proposed changes: Refactor the code so we initialize the `ShuffleManager` later, after jars have been localized. This is especially necessary in the executor, where we would need to delay this initialization until after the `replClassLoader` is updated with the jars passed in `--jars`. Today, the `ShuffleManager` is instantiated at `SparkEnv` creation. Having to instantiate the `ShuffleManager` this early doesn't work, because user jars have not been localized in all scenarios, and we will fail to load the `ShuffleManager`. We propose moving the `ShuffleManager` instantiation to `SparkContext` on the driver, and to `Executor` on the executors.
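The proposed ordering can be sketched as a deferred-initialization holder. This is a sketch only, assuming nothing beyond "instantiate against a classloader that already includes the `--jars` entries"; `ShuffleManagerHolder` is a hypothetical name, not a Spark class.

```scala
// Sketch only: defer ShuffleManager construction until a classloader
// that includes user jars is available. ShuffleManagerHolder is a
// hypothetical name, not part of Spark.
class ShuffleManagerHolder(fqcn: String) {
  @volatile private var manager: AnyRef = null

  // Called from SparkContext (driver) or Executor (executor side) once
  // the --jars entries have been added to `loader`.
  def initialize(loader: ClassLoader): AnyRef = {
    val cls = Class.forName(fqcn, true, loader) // resolves against user jars
    manager = cls.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
    manager
  }

  // Accessors fail fast if something asks for the manager too early.
  def get: AnyRef = {
    require(manager != null, s"ShuffleManager $fqcn not initialized yet")
    manager
  }
}
```

In this model, the current behavior corresponds to calling `initialize` at `SparkEnv` creation with a loader that does not yet know about `user-code.jar`, which produces exactly the `ClassNotFoundException` shown above.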
[jira] [Commented] (SPARK-39131) "Exists" is optimized too late (to LeftSemi) preventing filters to be inferred
[ https://issues.apache.org/jira/browse/SPARK-39131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534362#comment-17534362 ]

Alessandro Bellina commented on SPARK-39131:

I'll take a crack at a patch today that can be pull requested for further discussion.
[jira] [Updated] (SPARK-39131) "Exists" is optimized too late (to LeftSemi) preventing filters to be inferred
[ https://issues.apache.org/jira/browse/SPARK-39131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alessandro Bellina updated SPARK-39131:
---
Description:

We would like to propose a slight change in the order of execution of logical plan optimizer rules, given a performance issue we have seen with {{LeftSemi}} being materialized too late in the logical plan optimizer and therefore not benefiting from the null filtering that {{InferFiltersFromConstraints}} can insert.

I have "something that works" locally (see the rest of the description for info and a diff), but given that this is the optimizer it is not clear what else I could be breaking, so I'd like to hear from the experts on whether this is the right change.

The query in question is based on TPCDS query16, which originally has an {{exists}} filter:
{code:sql}
…
and exists (select *
            from catalog_sales cs2
            where cs1.cs_order_number = cs2.cs_order_number
              and cs1.cs_warehouse_sk <> cs2.cs_warehouse_sk)
…
{code}
The rule {{RewritePredicateSubquery}} will turn this into a {{LeftSemi}} join like so:
{code:sql}
+- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#872L], LeftSemi, NOT (cs_warehouse_sk#14 = cs_warehouse_sk#869)
   :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(cs_order_number#17L, 200), ENSURE_REQUIREMENTS, [id=#364]
   :     +- *(1) Filter ((isnotnull(cs_ship_date_sk#2) AND isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11))
   :        +- *(1) ColumnarToRow
   :           +- FileScan parquet [...] Batched: true, DataFilters: [isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), isnotnull(cs_call_center_sk#11)],..., PushedFilters: [IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), IsNotNull(cs_call_center_sk)], ReadSchema: ...
   +- *(4) Sort [cs_order_number#872L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(cs_order_number#872L, 200), ENSURE_REQUIREMENTS, [id=#372]
         +- *(3) ColumnarToRow
            +- FileScan parquet [...] Batched: true, DataFilters: [], ..., PushedFilters: [], ReadSchema: ...
{code}
Note that the {{LeftSemi}} key and condition are not being null filtered on the stream side, and the build side has no filter at all. We have found that as the dataset size increases this can become an issue; in our case there were many nulls that will never match. We would like to remove the unnecessary rows early, at the scan and filter phases.

The change we made allows the join key and the condition to be added to the stream side filter, and for the build side filter to be added:
{code:sql}
+- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#943L], LeftSemi, NOT (cs_warehouse_sk#14 = cs_warehouse_sk#940)
   :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(cs_order_number#17L, 200), ENSURE_REQUIREMENTS, [id=#759]
   :     +- *(1) Filter ((((isnotnull(cs_ship_date_sk#2) AND isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11)) AND isnotnull(cs_order_number#17L)) AND isnotnull(cs_warehouse_sk#14))
   :        +- *(1) ColumnarToRow
   :           +- FileScan parquet ..., DataFilters: [isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), isnotnull(cs_call_center_sk#11), is..., ..., PushedFilters: [IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), IsNotNull(cs_call_center_sk), IsNotNull(..., ReadSchema: ...
   +- *(4) Sort [cs_order_number#943L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(cs_order_number#943L, 200), ENSURE_REQUIREMENTS, [id=#768]
         +- *(3) Filter (isnotnull(cs_order_number#943L) AND isnotnull(cs_warehouse_sk#940))
            +- *(3) ColumnarToRow
               +- FileScan parquet ..., DataFilters: [isnotnull(cs_order_number#943L), isnotnull(cs_warehouse_sk#940)], ..., PartitionFilters: [], PushedFilters: [IsNotNull(cs_order_number), IsNotNull(cs_warehouse_sk)], ReadSchema: ...
{code}
This issue can be boiled down to this simple repro:
{code:java}
sc.parallelize((0 until 10).map(i => if (i % 2 == 0) { null } else { Int.box(i) }))
  .toDF.write.parquet("file:///tmp/my_test_table")
spark.read.parquet("file:///tmp/my_test_table").createOrReplaceTempView("my_table")
spark.sql("select * from my_table t1 where exists(select * from my_table t2 where t2.value = t1.value)").explain(true)
{code}
Which produces a similar plan, with a {{LeftSemi}} and no filters:
{code:sql}
== Physical Plan ==
*(2) BroadcastHashJoin [value#19], [value#22], LeftSemi, BuildRight, false
:- *(2) ColumnarToRow
:  +- FileScan parquet [value#19] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/my_test_table], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int,
{code}
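Why inferring {{IsNotNull}} is safe here: a null join key can never satisfy an equi-join condition in SQL, so rows with null keys can be dropped before the {{LeftSemi}} join without changing its result. A tiny collections-based model of the repro (not Spark code, just an illustration of the semantics) makes this concrete:

```scala
// Model of the repro's data: odd values 1..9 plus nulls for even slots,
// matching `if (i % 2 == 0) null else Int.box(i)` in the description.
val stream = Seq[Integer](null, 1, null, 3, null, 5, null, 7, null, 9)
val build  = stream

// Left semi join on key equality: keep each stream row that has at
// least one matching build-side key. In SQL, null = anything is not
// true, so null keys on either side never produce a match.
def leftSemi(l: Seq[Integer], r: Seq[Integer]): Seq[Integer] = {
  val keys = r.filter(_ != null).toSet
  l.filter(v => v != null && keys.contains(v))
}

// Dropping null-keyed rows up front (what InferFiltersFromConstraints
// would enable via IsNotNull) yields the same answer as joining the
// raw inputs -- the nulls were dead weight carried through the shuffle.
val full     = leftSemi(stream, build)
val filtered = leftSemi(stream.filter(_ != null), build.filter(_ != null))
assert(full == filtered)
```

This is exactly the row reduction the proposed rule-ordering change is after: the nulls are eliminated at the scan/filter phase instead of being shuffled and probed at the join.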
[jira] [Updated] (SPARK-39131) "Exists" is optimized too late (to LeftSemi) preventing filters to be inferred
[ https://issues.apache.org/jira/browse/SPARK-39131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alessandro Bellina updated SPARK-39131:
---
Component/s: SQL
[jira] [Created] (SPARK-39131) "Exists" is optimized too late (to LeftSemi) preventing filters to be inferred
Alessandro Bellina created SPARK-39131: -- Summary: "Exists" is optimized too late (to LeftSemi), preventing filters from being inferred Key: SPARK-39131 URL: https://issues.apache.org/jira/browse/SPARK-39131 Project: Spark Issue Type: Bug Components: Optimizer Affects Versions: 3.2.1 Reporter: Alessandro Bellina

We would like to propose a slight change in the order of execution of the logical plan optimizer rules, given a performance issue we have seen: {{LeftSemi}} is materialized too late in the logical plan optimizer and does not benefit from the null filtering that {{InferFiltersFromConstraints}} can insert. I have "something that works" locally (see the rest of the description for details and a diff), but given that this is the optimizer, it is not clear what else I could be breaking, so I'd like to hear from the experts on whether this is the right change.

The query in question is based on TPCDS query16, which originally has an {{exists}} filter:

{code:sql}
… and exists (select *
              from catalog_sales cs2
              where cs1.cs_order_number = cs2.cs_order_number
                and cs1.cs_warehouse_sk <> cs2.cs_warehouse_sk) …
{code}

The rule {{RewritePredicateSubquery}} will turn this into a {{LeftSemi}} join like so:

{code:java}
+- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#872L], LeftSemi, NOT (cs_warehouse_sk#14 = cs_warehouse_sk#869)
   :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(cs_order_number#17L, 200), ENSURE_REQUIREMENTS, [id=#364]
   :     +- *(1) Filter ((isnotnull(cs_ship_date_sk#2) AND isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11))
   :        +- *(1) ColumnarToRow
   :           +- FileScan parquet [...] Batched: true, DataFilters: [isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), isnotnull(cs_call_center_sk#11)], ..., PushedFilters: [IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), IsNotNull(cs_call_center_sk)], ReadSchema: ...
   +- *(4) Sort [cs_order_number#872L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(cs_order_number#872L, 200), ENSURE_REQUIREMENTS, [id=#372]
         +- *(3) ColumnarToRow
            +- FileScan parquet [...] Batched: true, DataFilters: [], ..., PushedFilters: [], ReadSchema: ...
{code}

Note that the {{LeftSemi}} key and condition are not being filtered on the stream side, and the build side has no filter at all. We have found that as the dataset size increases, this can become an issue; in our case, there were many null rows that would never match. We would like to remove the unnecessary rows early, at the scan and filter phases. The change we made allows the join key and the condition to be added to the stream-side filter, and adds a filter to the build side:

{code:java}
+- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#943L], LeftSemi, NOT (cs_warehouse_sk#14 = cs_warehouse_sk#940)
   :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(cs_order_number#17L, 200), ENSURE_REQUIREMENTS, [id=#759]
   :     +- *(1) Filter ((((isnotnull(cs_ship_date_sk#2) AND isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11)) AND isnotnull(cs_order_number#17L)) AND isnotnull(cs_warehouse_sk#14))
   :        +- *(1) ColumnarToRow
   :           +- FileScan parquet ..., DataFilters: [isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), isnotnull(cs_call_center_sk#11), is..., ..., PushedFilters: [IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), IsNotNull(cs_call_center_sk), IsNotNull(..., ReadSchema: ...
   +- *(4) Sort [cs_order_number#943L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(cs_order_number#943L, 200), ENSURE_REQUIREMENTS, [id=#768]
         +- *(3) Filter (isnotnull(cs_order_number#943L) AND isnotnull(cs_warehouse_sk#940))
            +- *(3) ColumnarToRow
               +- FileScan parquet ..., DataFilters: [isnotnull(cs_order_number#943L), isnotnull(cs_warehouse_sk#940)], ..., PartitionFilters: [], PushedFilters: [IsNotNull(cs_order_number), IsNotNull(cs_warehouse_sk)], ReadSchema: ...
{code}

This issue can be boiled down to this simple repro:

{code:java}
sc.parallelize((0 until 10).map(i => if (i % 2 == 0) { null } else { Int.box(i) })).toDF.write.parquet("file:///tmp/my_test_table")
spark.read.parquet("file:///tmp/my_test_table").createOrReplaceTempView("my_table")
spark.sql("select * from my_table t1 where exists(select * from my_table t2 where t2.value = t1.value)").explain(true)
{code}

This produces a similar plan, with a {{LeftSemi}} and no filters:

{code:java}
== Physical Plan ==
*(2) BroadcastHashJoin [value#19], [value#22], LeftSemi, BuildRight, false
:- *(2) ColumnarToRow
:  +- FileScan parquet [value#19] Batched: true,
{code}
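As background for why the inferred {{IsNotNull}} filters are safe here, a Spark-free illustration in plain Python (illustrative only, not the Catalyst rule itself): with an equi-join condition, a left-semi join can never match a row whose join key is NULL, because `NULL = x` is never true in SQL, so null-keyed rows may be dropped on both sides before the join or shuffle.

```python
# Plain-Python sketch (NOT Spark code) of left-semi join semantics with an
# equi-join key: null keys never satisfy t2.value = t1.value, so filtering
# them out early cannot change the result.

def left_semi(stream_keys, build_keys):
    """Keep stream-side keys that match at least one build-side key."""
    build = {k for k in build_keys if k is not None}  # nulls never match
    return [k for k in stream_keys if k is not None and k in build]

# Mirrors the repro data above: every even position is null.
rows = [None if i % 2 == 0 else i for i in range(10)]
result = left_semi(rows, rows)
print(result)  # only the odd values survive; all nulls were safely dropped
```

Pushing the equivalent `IsNotNull` predicates down to the scans is exactly what removes the non-matching rows before the expensive exchange.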
[jira] [Created] (SPARK-29780) The UI can access the ResourceAllocator, whose data structures are being updated from scheduler threads
Alessandro Bellina created SPARK-29780: -- Summary: The UI can access the ResourceAllocator, whose data structures are being updated from scheduler threads Key: SPARK-29780 URL: https://issues.apache.org/jira/browse/SPARK-29780 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.0.0 Reporter: Alessandro Bellina

A class extending ResourceAllocator (WorkerResourceInfo) has some potential issues (raised here: https://github.com/apache/spark/pull/26078#discussion_r340342820). The WorkerInfo class calls into availableAddrs and assignedAddrs, but those calls appear to be coming from the UI (looking at the resourcesInfo* functions); e.g. JsonProtocol and MasterPage call this. Since the data structures in ResourceAllocator are not thread-safe, we could end up with bad data, or potentially crash, depending on when the calls are made. Note that there are other calls to the resourceInfo* functions, but those come from the event loop.

-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
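A minimal, self-contained sketch in plain Python (not the Spark classes; all names are hypothetical stand-ins) of the synchronization this report implies: reads serving the UI must take the same lock as scheduler-thread updates, since the underlying collections are not thread-safe.

```python
# Illustrative stand-in (NOT Spark's ResourceAllocator) showing the fix
# pattern: UI-thread reads of availableAddrs/assignedAddrs snapshot the
# state under the same lock that scheduler-thread updates hold, so a
# concurrent acquire() can never be observed half-applied.
import threading

class ResourceAllocatorSketch:
    def __init__(self, addresses):
        self._lock = threading.Lock()
        self._available = list(addresses)
        self._assigned = []

    def acquire(self, n):
        # Called from scheduler threads: mutates both collections together.
        with self._lock:
            taken = self._available[:n]
            self._available = self._available[n:]
            self._assigned.extend(taken)
            return taken

    def available_addrs(self):
        # Called from UI threads: return a copy taken under the lock.
        with self._lock:
            return list(self._available)

    def assigned_addrs(self):
        with self._lock:
            return list(self._assigned)

alloc = ResourceAllocatorSketch(["gpu0", "gpu1", "gpu2"])
alloc.acquire(2)
print(alloc.available_addrs(), alloc.assigned_addrs())
```

Returning copies (rather than the live lists) also matters: handing the live collection to a JSON serializer would reintroduce the race after the lock is released.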
[jira] [Commented] (SPARK-28679) Spark Yarn ResourceRequestHelper shouldn't look up setResourceInformation if no resources specified
[ https://issues.apache.org/jira/browse/SPARK-28679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16904004#comment-16904004 ] Alessandro Bellina commented on SPARK-28679: Working on it.

> Spark Yarn ResourceRequestHelper shouldn't look up setResourceInformation if
> no resources specified
> --
>
> Key: SPARK-28679
> URL: https://issues.apache.org/jira/browse/SPARK-28679
> Project: Spark
> Issue Type: Bug
> Components: YARN
> Affects Versions: 3.0.0
> Reporter: Thomas Graves
> Priority: Minor
>
> In the Spark Yarn ResourceRequestHelper, it uses reflection to look up
> setResourceInformation. We should skip that lookup if the resource Map is
> empty.
> [https://github.com/apache/spark/blob/master/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala#L154]
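A hedged sketch of the suggested guard, in plain Python rather than the actual Scala ResourceRequestHelper (the function and handle class below are illustrative stand-ins): pay for the reflective lookup of setResourceInformation only when there are custom resources to set.

```python
# Stand-in for the guard: skip the (expensive) reflective lookup entirely
# when the resource map is empty. getattr() plays the role of reflection
# here; FakeHandle is a hypothetical stand-in for YARN's Resource object.

def set_resource_requests(resource_handle, resources):
    if not resources:
        return False  # nothing requested: never touch reflection
    method = getattr(resource_handle, "setResourceInformation")  # "reflection"
    for name, amount in resources.items():
        method(name, amount)
    return True

class FakeHandle:
    def __init__(self):
        self.set = {}
    def setResourceInformation(self, name, amount):
        self.set[name] = amount

handle = FakeHandle()
set_resource_requests(handle, {})          # empty map: lookup skipped
set_resource_requests(handle, {"gpu": 2})  # lookup happens only here
print(handle.set)  # {'gpu': 2}
```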
[jira] [Updated] (SPARK-27558) NPE in TaskCompletionListener due to Spark OOM in UnsafeExternalSorter causing tasks to hang
[ https://issues.apache.org/jira/browse/SPARK-27558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alessandro Bellina updated SPARK-27558: --- Description: We see an NPE in the UnsafeInMemorySorter.getMemoryUsage function, due to the array we are accessing there being null. This looks to be caused by a Spark OOM while UnsafeInMemorySorter is trying to spill, and is likely a symptom of https://issues.apache.org/jira/browse/SPARK-21492. The real question for this ticket is: could we handle things more gracefully, rather than throwing an NPE? For example: remove the null assignment here: https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java#L182 and store the new array into a temporary, so that when the allocation here: https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java#L186 fails, we don't end up with a null "array". This state is causing one of our jobs to hang indefinitely (we think) due to the original allocation error.
Stack trace for reference:

{noformat}
2019-04-23 08:57:14,989 [Executor task launch worker for task 46729] ERROR org.apache.spark.TaskContextImpl - Error in TaskCompletionListener
java.lang.NullPointerException
	at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getMemoryUsage(UnsafeInMemorySorter.java:208)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getMemoryUsage(UnsafeExternalSorter.java:249)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.updatePeakMemoryUsed(UnsafeExternalSorter.java:253)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.freeMemory(UnsafeExternalSorter.java:296)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.cleanupResources(UnsafeExternalSorter.java:328)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.lambda$new$0(UnsafeExternalSorter.java:178)
	at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:118)
	at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:118)
	at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:131)
	at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:129)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:129)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:117)
	at org.apache.spark.scheduler.Task.run(Task.scala:119)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-04-23 08:57:15,069 [Executor task launch worker for task 46729] ERROR org.apache.spark.executor.Executor - Exception in task 102.0 in stage 28.0 (TID 46729)
org.apache.spark.util.TaskCompletionListenerException: null

Previous exception in task: Unable to acquire 65536 bytes of memory, got 0
	org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
	org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
	org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset(UnsafeInMemorySorter.java:186)
	org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:229)
	org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:204)
	org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:283)
	org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:96)
	org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:348)
	org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:403)
	org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135)
	org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage23.sort_addToSorter_0$(Unknown Source)
	org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage23.processNext(Unknown Source)
	org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
{noformat}
[jira] [Created] (SPARK-27558) NPE in TaskCompletionListener due to Spark OOM in UnsafeExternalSorter causing tasks to hang
Alessandro Bellina created SPARK-27558: -- Summary: NPE in TaskCompletionListener due to Spark OOM in UnsafeExternalSorter causing tasks to hang Key: SPARK-27558 URL: https://issues.apache.org/jira/browse/SPARK-27558 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.4.2, 2.3.3 Reporter: Alessandro Bellina

We see an NPE in the UnsafeInMemorySorter.getMemoryUsage function, due to the array we are accessing there being null. This looks to be caused by a Spark OOM while UnsafeInMemorySorter is trying to spill, and is likely a symptom of https://issues.apache.org/jira/browse/SPARK-21492. The real question for this ticket is: could we handle things more gracefully, rather than throwing an NPE? For example: allocate the new array into a temporary here: https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java#L182 so that when this fails: https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java#L186 we don't end up with a null "array". This state is causing one of our jobs to hang indefinitely (we think) due to the original allocation error.
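The suggested fix pattern, sketched in plain Python (a hypothetical stand-in, not UnsafeInMemorySorter itself): allocate the replacement buffer into a temporary first, and only overwrite the field once allocation succeeds, so a failed allocation cannot leave the field null.

```python
# Plain-Python stand-in (names are illustrative) for the proposed change:
# the reported bug pattern nulls the field BEFORE allocating the new buffer,
# so an allocation failure (the Spark OOM here) strands the object with a
# null field that later makes getMemoryUsage() blow up.

class SorterSketch:
    def __init__(self, capacity):
        self.array = [0] * capacity

    def reset(self, new_capacity, allocate):
        new_array = allocate(new_capacity)  # may raise (the OOM analogue)
        self.array = new_array              # replace only after success

    def memory_usage(self):
        # Would raise (the NPE analogue) if self.array were ever left None.
        return len(self.array)

def failing_alloc(n):
    raise MemoryError("Unable to acquire %d bytes of memory, got 0" % n)

sorter = SorterSketch(8)
try:
    sorter.reset(65536, failing_alloc)
except MemoryError:
    pass  # the original error propagates, but the object stays consistent
print(sorter.memory_usage())  # still 8: old buffer survived the failed reset
```

With this ordering, the cleanup path in the task-completion listener can still read a consistent state and report the original OOM instead of masking it with an NPE.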
Stack trace for reference:

{noformat}
2019-04-23 08:57:14,989 [Executor task launch worker for task 46729] ERROR org.apache.spark.TaskContextImpl - Error in TaskCompletionListener
java.lang.NullPointerException
	at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getMemoryUsage(UnsafeInMemorySorter.java:208)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getMemoryUsage(UnsafeExternalSorter.java:249)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.updatePeakMemoryUsed(UnsafeExternalSorter.java:253)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.freeMemory(UnsafeExternalSorter.java:296)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.cleanupResources(UnsafeExternalSorter.java:328)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.lambda$new$0(UnsafeExternalSorter.java:178)
	at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:118)
	at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:118)
	at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:131)
	at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:129)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:129)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:117)
	at org.apache.spark.scheduler.Task.run(Task.scala:119)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-04-23 08:57:15,069 [Executor task launch worker for task 46729] ERROR org.apache.spark.executor.Executor - Exception in task 102.0 in stage 28.0 (TID 46729)
org.apache.spark.util.TaskCompletionListenerException: null

Previous exception in task: Unable to acquire 65536 bytes of memory, got 0
	org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
	org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
	org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset(UnsafeInMemorySorter.java:186)
	org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:229)
	org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:204)
	org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:283)
	org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:96)
	org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:348)
	org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:403)
	org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135)
	org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage23.sort_addToSorter_0$(Unknown Source)
{noformat}
[jira] [Commented] (SPARK-26944) Python unit-tests.log not available in artifacts for a build in Jenkins
[ https://issues.apache.org/jira/browse/SPARK-26944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16784853#comment-16784853 ] Alessandro Bellina commented on SPARK-26944: [~shaneknapp] nice!! thank you

> Python unit-tests.log not available in artifacts for a build in Jenkins
> ---
>
> Key: SPARK-26944
> URL: https://issues.apache.org/jira/browse/SPARK-26944
> Project: Spark
> Issue Type: Improvement
> Components: Build
> Affects Versions: 2.4.0
> Reporter: Alessandro Bellina
> Assignee: shane knapp
> Priority: Minor
> Attachments: Screen Shot 2019-03-05 at 12.08.43 PM.png
>
> I had a PR where the Python unit tests failed. The tests point at the
> `/home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log` file,
> but I can't get to that from the Jenkins UI, it seems (are all PRs writing to
> the same file?).
> {code:java}
> 
> Running PySpark tests
> 
> Running PySpark tests. Output is in
> /home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log{code}
> For reference, please see this build:
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102518/console
> This Jira is to make it available under the artifacts for each build.
[jira] [Commented] (SPARK-26809) insert overwrite directory + concat function => error
[ https://issues.apache.org/jira/browse/SPARK-26809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775908#comment-16775908 ] Alessandro Bellina commented on SPARK-26809: If you provide an alias for the concatenated columns, it works: {noformat} insert overwrite directory '/tmp/SPARK-26809' select concat("foo", "bar") as foo_bar {noformat} > insert overwrite directory + concat function => error > - > > Key: SPARK-26809 > URL: https://issues.apache.org/jira/browse/SPARK-26809 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: ant_nebula >Priority: Critical > > insert overwrite directory '/tmp/xx' > select concat(col1, col2) > from tableXX > limit 3 > > Caused by: org.apache.hadoop.hive.serde2.SerDeException: > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 3 elements > while columns.types has 2 elements! > at > org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145) > at > org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.(LazySerDeParameters.java:85) > at > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125) > at > org.apache.spark.sql.hive.execution.HiveOutputWriter.(HiveFileFormat.scala:119) > at > org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:103) > at > org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120) > at > org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.(FileFormatDataWriter.scala:108) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:233) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169) > at > 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) > at org.apache.spark.scheduler.Task.run(Task.scala:121) > at > org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748)
[jira] [Comment Edited] (SPARK-26809) insert overwrite directory + concat function => error
[ https://issues.apache.org/jira/browse/SPARK-26809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775772#comment-16775772 ] Alessandro Bellina edited comment on SPARK-26809 at 2/23/19 3:25 AM: - This does it. Didn't need the limit to reproduce: {noformat} insert overwrite directory '/tmp/SPARK-26809' select concat(col1, col2) from ((select "foo" as col1, "bar" as col2)); {noformat} This also triggers it: {noformat} insert overwrite directory '/tmp/SPARK-26809' select concat("foo", "bar") {noformat} {noformat} Caused by: org.apache.hadoop.hive.serde2.SerDeException: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements while columns.types has 1 elements! at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145) at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.(LazySerDeParameters.java:85) at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125) at org.apache.spark.sql.hive.execution.HiveOutputWriter.(HiveFileFormat.scala:121) at org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:104) at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:124) at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.(FileFormatDataWriter.scala:109) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:236) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$14(FileFormatWriter.scala:177) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {noformat} was (Author: abellina): This does it. Didn't need the limit to reproduce: {noformat} insert overwrite directory '/tmp/SPARK-26809' select concat(col1, col2) {noformat} This also triggers it: {noformat} insert overwrite directory '/tmp/SPARK-26809' select concat(col1, col2) from ((select "foo" as col1, "bar" as col2)); {noformat} {noformat} Caused by: org.apache.hadoop.hive.serde2.SerDeException: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements while columns.types has 1 elements! at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145) at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.(LazySerDeParameters.java:85) at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125) at org.apache.spark.sql.hive.execution.HiveOutputWriter.(HiveFileFormat.scala:121) at org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:104) at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:124) at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.(FileFormatDataWriter.scala:109) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:236) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$14(FileFormatWriter.scala:177) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {noformat} > insert overwrite directory + concat function => error > - > > Key: SPARK-26809 > URL: https://issues.apache.org/jira/browse/SPARK-26809 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: ant_nebula >Priority: Critical > > insert overwrite
[jira] [Comment Edited] (SPARK-26809) insert overwrite directory + concat function => error
[ https://issues.apache.org/jira/browse/SPARK-26809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775772#comment-16775772 ] Alessandro Bellina edited comment on SPARK-26809 at 2/23/19 3:25 AM: - This does it. Didn't need the limit to reproduce: {noformat} insert overwrite directory '/tmp/SPARK-26809' select concat(col1, col2) {noformat} This also triggers it: {noformat} insert overwrite directory '/tmp/SPARK-26809' select concat(col1, col2) from ((select "foo" as col1, "bar" as col2)); {noformat} {noformat} Caused by: org.apache.hadoop.hive.serde2.SerDeException: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements while columns.types has 1 elements! at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145) at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.(LazySerDeParameters.java:85) at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125) at org.apache.spark.sql.hive.execution.HiveOutputWriter.(HiveFileFormat.scala:121) at org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:104) at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:124) at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.(FileFormatDataWriter.scala:109) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:236) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$14(FileFormatWriter.scala:177) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {noformat} was (Author: abellina): This does it. Didn't need the limit to reproduce: {noformat} insert overwrite directory '/tmp/SPARK-26809' select concat(col1, col2) from ((select "foo" as col1, "bar" as col2)); {noformat} This also triggers it: {noformat} insert overwrite directory '/tmp/SPARK-26809' select concat(col1, col2) from ((select "foo" as col1, "bar" as col2)); {noformat} {noformat} Caused by: org.apache.hadoop.hive.serde2.SerDeException: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements while columns.types has 1 elements! at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145) at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.(LazySerDeParameters.java:85) at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125) at org.apache.spark.sql.hive.execution.HiveOutputWriter.(HiveFileFormat.scala:121) at org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:104) at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:124) at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.(FileFormatDataWriter.scala:109) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:236) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$14(FileFormatWriter.scala:177) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {noformat} > insert overwrite directory + concat function => error > - > > Key: SPARK-26809 > URL: https://issues.apache.org/jira/browse/SPARK-26809 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: ant_nebula >
[jira] [Comment Edited] (SPARK-26809) insert overwrite directory + concat function => error
[ https://issues.apache.org/jira/browse/SPARK-26809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775772#comment-16775772 ] Alessandro Bellina edited comment on SPARK-26809 at 2/23/19 3:25 AM: - This does it. Didn't need the limit to reproduce: {noformat} insert overwrite directory '/tmp/SPARK-26809' select concat(col1, col2) from ((select "foo" as col1, "bar" as col2)); {noformat} This also triggers it: {noformat} insert overwrite directory '/tmp/SPARK-26809' select concat(col1, col2) from ((select "foo" as col1, "bar" as col2)); {noformat} {noformat} Caused by: org.apache.hadoop.hive.serde2.SerDeException: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements while columns.types has 1 elements! at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145) at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.(LazySerDeParameters.java:85) at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125) at org.apache.spark.sql.hive.execution.HiveOutputWriter.(HiveFileFormat.scala:121) at org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:104) at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:124) at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.(FileFormatDataWriter.scala:109) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:236) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$14(FileFormatWriter.scala:177) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {noformat} was (Author: abellina): This does it. Didn't need the limit to reproduce: {noformat} insert overwrite directory '/tmp/SPARK-26809' select concat(col1, col2) from ((select "foo" as col1, "bar" as col2)); {noformat} {noformat} Caused by: org.apache.hadoop.hive.serde2.SerDeException: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements while columns.types has 1 elements! at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145) at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.(LazySerDeParameters.java:85) at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125) at org.apache.spark.sql.hive.execution.HiveOutputWriter.(HiveFileFormat.scala:121) at org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:104) at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:124) at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.(FileFormatDataWriter.scala:109) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:236) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$14(FileFormatWriter.scala:177) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {noformat} > insert overwrite directory + concat function => error > - > > Key: SPARK-26809 > URL: https://issues.apache.org/jira/browse/SPARK-26809 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: ant_nebula >Priority: Critical > > insert overwrite directory '/tmp/xx' > select concat(col1, col2) > from tableXX > limit 3 > >
[jira] [Commented] (SPARK-26809) insert overwrite directory + concat function => error
[ https://issues.apache.org/jira/browse/SPARK-26809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775772#comment-16775772 ] Alessandro Bellina commented on SPARK-26809: This does it. Didn't need the limit to reproduce:
{noformat}
insert overwrite directory '/tmp/SPARK-26809' select concat(col1, col2) from ((select "foo" as col1, "bar" as col2));
{noformat}
{noformat}
Caused by: org.apache.hadoop.hive.serde2.SerDeException: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 2 elements while columns.types has 1 elements!
 at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145)
 at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.<init>(LazySerDeParameters.java:85)
 at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125)
 at org.apache.spark.sql.hive.execution.HiveOutputWriter.<init>(HiveFileFormat.scala:121)
 at org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:104)
 at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:124)
 at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:109)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:236)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$14(FileFormatWriter.scala:177)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:121)
 at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1319)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
{noformat}

> insert overwrite directory + concat function => error
> -
>
> Key: SPARK-26809
> URL: https://issues.apache.org/jira/browse/SPARK-26809
> Project: Spark
> Issue Type: Bug
> Components: SQL
>Affects Versions: 2.4.0
>Reporter: ant_nebula
>Priority: Critical
>
> insert overwrite directory '/tmp/xx'
> select concat(col1, col2)
> from tableXX
> limit 3
>
> Caused by: org.apache.hadoop.hive.serde2.SerDeException: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 3 elements while columns.types has 2 elements!
> at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145)
> at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.<init>(LazySerDeParameters.java:85)
> at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125)
> at org.apache.spark.sql.hive.execution.HiveOutputWriter.<init>(HiveFileFormat.scala:119)
> at org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:103)
> at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
> at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:233)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)

-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26945) Python streaming tests flaky while cleaning temp directories after StreamingQuery.stop
[ https://issues.apache.org/jira/browse/SPARK-26945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775125#comment-16775125 ] Alessandro Bellina commented on SPARK-26945: [~hyukjin.kwon] thanks for taking a look. Seems like q.processAllAvailable is designed for this use case.

> Python streaming tests flaky while cleaning temp directories after StreamingQuery.stop
> --
>
> Key: SPARK-26945
> URL: https://issues.apache.org/jira/browse/SPARK-26945
> Project: Spark
> Issue Type: Bug
> Components: PySpark
>Affects Versions: 2.4.0
>Reporter: Alessandro Bellina
>Priority: Minor
>
> From the test code, it seems like the `shutil.rmtree` function is trying to delete a directory, but there's likely another thread adding entries to the directory, so when it gets to `os.rmdir(path)` it blows up. I think the test (and other streaming tests) should call `q.awaitTermination` after `q.stop`, before going on. I'll file a separate jira.
> {noformat}
> ERROR: test_query_manager_await_termination (pyspark.sql.tests.test_streaming.StreamingTests)
> --
> Traceback (most recent call last):
>   File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/sql/tests/test_streaming.py", line 259, in test_query_manager_await_termination
>     shutil.rmtree(tmpPath)
>   File "/home/anaconda/lib/python2.7/shutil.py", line 256, in rmtree
>     onerror(os.rmdir, path, sys.exc_info())
>   File "/home/anaconda/lib/python2.7/shutil.py", line 254, in rmtree
>     os.rmdir(path)
> OSError: [Errno 39] Directory not empty: '/home/jenkins/workspace/SparkPullRequestBuilder/python/target/072153bd-f981-47be-bda2-e2b657a16f65/tmp4WGp7n'{noformat}
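The fix discussed in this thread (wait for the query's background work to finish before deleting its temp directory) can be illustrated with the standard library alone. This is a hedged sketch with no Spark involved: a plain writer thread stands in for the StreamingQuery, and `join()` plays the role of `q.processAllAvailable()` / `q.awaitTermination()`.

```python
import os
import shutil
import tempfile
import threading
import time

def run_query(out_dir, n=20):
    """Stand-in for a StreamingQuery's background work: keeps creating files."""
    for i in range(n):
        with open(os.path.join(out_dir, f"part-{i}"), "w") as f:
            f.write("x")
        time.sleep(0.001)

tmp_path = tempfile.mkdtemp()
writer = threading.Thread(target=run_query, args=(tmp_path,))
writer.start()

# Calling shutil.rmtree(tmp_path) right here could race with the writer and
# fail with "OSError: [Errno 39] Directory not empty", just like the flaky
# test. The fix is to wait for the background work to finish first (join()
# here; processAllAvailable()/awaitTermination() in the PySpark test).
writer.join()
shutil.rmtree(tmp_path)
```

After the join, no thread can repopulate the directory, so the cleanup is deterministic.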
[jira] [Comment Edited] (SPARK-26944) Python unit-tests.log not available in artifacts for a build in Jenkins
[ https://issues.apache.org/jira/browse/SPARK-26944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775117#comment-16775117 ] Alessandro Bellina edited comment on SPARK-26944 at 2/22/19 1:07 PM: - Hmm, I have a subsequent build from the same PR, and I don't see a link to the python tests either. Maybe I am looking in the wrong place? [https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102590/artifact/] was (Author: abellina): Hmm, I have a subsequent build from the same PR, and I don't see a link to the python tests either. Maybe I am looking in the wrong place? https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102590/artifact/ > Python unit-tests.log not available in artifacts for a build in Jenkins > --- > > Key: SPARK-26944 > URL: https://issues.apache.org/jira/browse/SPARK-26944 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 2.4.0 >Reporter: Alessandro Bellina >Priority: Minor > > I had a pr where the python unit tests failed. The tests point at the > `/home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log` file, > but I can't get to that from jenkins UI it seems (are all prs writing to the > same file?). > {code:java} > > Running PySpark tests > > Running PySpark tests. Output is in > /home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log{code} > For reference, please see this build: > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102518/console > This Jira is to make it available under the artifacts for each build. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26944) Python unit-tests.log not available in artifacts for a build in Jenkins
[ https://issues.apache.org/jira/browse/SPARK-26944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16775117#comment-16775117 ] Alessandro Bellina commented on SPARK-26944: Hmm, I have a subsequent build from the same PR, and I don't see a link to the python tests either. Maybe I am looking in the wrong place? https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102590/artifact/ > Python unit-tests.log not available in artifacts for a build in Jenkins > --- > > Key: SPARK-26944 > URL: https://issues.apache.org/jira/browse/SPARK-26944 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 2.4.0 >Reporter: Alessandro Bellina >Priority: Minor > > I had a pr where the python unit tests failed. The tests point at the > `/home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log` file, > but I can't get to that from jenkins UI it seems (are all prs writing to the > same file?). > {code:java} > > Running PySpark tests > > Running PySpark tests. Output is in > /home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log{code} > For reference, please see this build: > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102518/console > This Jira is to make it available under the artifacts for each build. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26944) Python unit-tests.log not available in artifacts for a build in Jenkins
[ https://issues.apache.org/jira/browse/SPARK-26944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16774800#comment-16774800 ] Alessandro Bellina commented on SPARK-26944: [~shivuson...@gmail.com] [~hyukjin.kwon] I added a link to the description. I think we should look at the Jenkins configuration to see what is getting copied to artifacts, though I believe you need special access for that. > Python unit-tests.log not available in artifacts for a build in Jenkins > --- > > Key: SPARK-26944 > URL: https://issues.apache.org/jira/browse/SPARK-26944 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 2.4.0 >Reporter: Alessandro Bellina >Priority: Minor > > I had a pr where the python unit tests failed. The tests point at the > `/home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log` file, > but I can't get to that from jenkins UI it seems (are all prs writing to the > same file?). > {code:java} > > Running PySpark tests > > Running PySpark tests. Output is in > /home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log{code} > For reference, please see this build: > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102518/console > This Jira is to make it available under the artifacts for each build. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26944) Python unit-tests.log not available in artifacts for a build in Jenkins
[ https://issues.apache.org/jira/browse/SPARK-26944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alessandro Bellina updated SPARK-26944: --- Description: I had a pr where the python unit tests failed. The tests point at the `/home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log` file, but I can't get to that from jenkins UI it seems (are all prs writing to the same file?). {code:java} Running PySpark tests Running PySpark tests. Output is in /home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log{code} For reference, please see this build: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102518/console This Jira is to make it available under the artifacts for each build. was: I had a pr where the python unit tests failed. The tests point at the `/home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log` file, but I can't get to that from jenkins UI it seems (are all prs writing to the same file?). This Jira is to make it available under the artifacts for each build. > Python unit-tests.log not available in artifacts for a build in Jenkins > --- > > Key: SPARK-26944 > URL: https://issues.apache.org/jira/browse/SPARK-26944 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 2.4.0 >Reporter: Alessandro Bellina >Priority: Minor > > I had a pr where the python unit tests failed. The tests point at the > `/home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log` file, > but I can't get to that from jenkins UI it seems (are all prs writing to the > same file?). > {code:java} > > Running PySpark tests > > Running PySpark tests. Output is in > /home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log{code} > For reference, please see this build: > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/102518/console > This Jira is to make it available under the artifacts for each build. 
[jira] [Created] (SPARK-26945) Python streaming tests flaky while cleaning temp directories after StreamingQuery.stop
Alessandro Bellina created SPARK-26945: -- Summary: Python streaming tests flaky while cleaning temp directories after StreamingQuery.stop Key: SPARK-26945 URL: https://issues.apache.org/jira/browse/SPARK-26945 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 2.4.0 Reporter: Alessandro Bellina

From the test code, it seems like the `shutil.rmtree` function is trying to delete a directory, but there's likely another thread adding entries to the directory, so when it gets to `os.rmdir(path)` it blows up. I think the test (and other streaming tests) should call `q.awaitTermination` after `q.stop`, before going on. I'll file a separate jira.

{noformat}
ERROR: test_query_manager_await_termination (pyspark.sql.tests.test_streaming.StreamingTests)
--
Traceback (most recent call last):
  File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/sql/tests/test_streaming.py", line 259, in test_query_manager_await_termination
    shutil.rmtree(tmpPath)
  File "/home/anaconda/lib/python2.7/shutil.py", line 256, in rmtree
    onerror(os.rmdir, path, sys.exc_info())
  File "/home/anaconda/lib/python2.7/shutil.py", line 254, in rmtree
    os.rmdir(path)
OSError: [Errno 39] Directory not empty: '/home/jenkins/workspace/SparkPullRequestBuilder/python/target/072153bd-f981-47be-bda2-e2b657a16f65/tmp4WGp7n'{noformat}
[jira] [Created] (SPARK-26944) Python unit-tests.log not available in artifacts for a build in Jenkins
Alessandro Bellina created SPARK-26944: -- Summary: Python unit-tests.log not available in artifacts for a build in Jenkins Key: SPARK-26944 URL: https://issues.apache.org/jira/browse/SPARK-26944 Project: Spark Issue Type: Improvement Components: Build Affects Versions: 2.4.0 Reporter: Alessandro Bellina I had a pr where the python unit tests failed. The tests point at the `/home/jenkins/workspace/SparkPullRequestBuilder/python/unit-tests.log` file, but I can't get to that from jenkins UI it seems (are all prs writing to the same file?). This Jira is to make it available under the artifacts for each build. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26895) When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit fails to resolve globs owned by target user
[ https://issues.apache.org/jira/browse/SPARK-26895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alessandro Bellina updated SPARK-26895: --- Summary: When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit fails to resolve globs owned by target user (was: When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit fails to resolve globs owned by target uer) > When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit fails to > resolve globs owned by target user > -- > > Key: SPARK-26895 > URL: https://issues.apache.org/jira/browse/SPARK-26895 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.2, 2.4.0 >Reporter: Alessandro Bellina >Priority: Critical > > We are resolving globs in SparkSubmit here (by way of > prepareSubmitEnvironment) without first going into a doAs: > https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L143 > Without first entering a doAs, as done here: > [https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L151] > So when running spark-submit with --proxy-user, and for example --archives, > it will fail to launch unless the location of the archive is open to the user > that executed spark-submit. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26895) When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit fails to resolve globs owned by target uer
[ https://issues.apache.org/jira/browse/SPARK-26895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alessandro Bellina updated SPARK-26895: --- Summary: When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit fails to resolve globs owned by target uer (was: When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit fails to resolve globs) > When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit fails to > resolve globs owned by target uer > - > > Key: SPARK-26895 > URL: https://issues.apache.org/jira/browse/SPARK-26895 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.2, 2.4.0 >Reporter: Alessandro Bellina >Priority: Critical > > We are resolving globs in SparkSubmit here (by way of > prepareSubmitEnvironment) without first going into a doAs: > https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L143 > Without first entering a doAs, as done here: > [https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L151] > So when running spark-submit with --proxy-user, and for example --archives, > it will fail to launch unless the location of the archive is open to the user > that executed spark-submit. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26895) When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit fails to resolve globs
[ https://issues.apache.org/jira/browse/SPARK-26895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alessandro Bellina updated SPARK-26895: --- Summary: When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit fails to resolve globs (was: When running spark 2.3 as a proxy user --proxy-user, SparkSubmit fails to resolve globs) > When running spark 2.3 as a proxy user (--proxy-user), SparkSubmit fails to > resolve globs > - > > Key: SPARK-26895 > URL: https://issues.apache.org/jira/browse/SPARK-26895 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.2, 2.4.0 >Reporter: Alessandro Bellina >Priority: Critical > > We are resolving globs in SparkSubmit here (by way of > prepareSubmitEnvironment) without first going into a doAs: > https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L143 > Without first entering a doAs, as done here: > [https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L151] > So when running spark-submit with --proxy-user, and for example --archives, > it will fail to launch unless the location of the archive is open to the user > that executed spark-submit. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26895) When running spark 2.3 as a proxy user --proxy-user, SparkSubmit fails to resolve globs
[ https://issues.apache.org/jira/browse/SPARK-26895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16769666#comment-16769666 ] Alessandro Bellina commented on SPARK-26895: I am working on this. > When running spark 2.3 as a proxy user --proxy-user, SparkSubmit fails to > resolve globs > --- > > Key: SPARK-26895 > URL: https://issues.apache.org/jira/browse/SPARK-26895 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.2, 2.4.0 >Reporter: Alessandro Bellina >Priority: Critical > > We are resolving globs in SparkSubmit here (by way of > prepareSubmitEnvironment) without first going into a doAs: > https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L143 > Without first entering a doAs, as done here: > [https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L151] > So when running spark-submit with --proxy-user, and for example --archives, > it will fail to launch unless the location of the archive is open to the user > that executed spark-submit. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26895) When running spark 2.3 as a proxy user --proxy-user, SparkSubmit fails to resolve globs
Alessandro Bellina created SPARK-26895: -- Summary: When running spark 2.3 as a proxy user --proxy-user, SparkSubmit fails to resolve globs Key: SPARK-26895 URL: https://issues.apache.org/jira/browse/SPARK-26895 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.4.0, 2.3.2 Reporter: Alessandro Bellina We are resolving globs in SparkSubmit here (by way of prepareSubmitEnvironment) without first going into a doAs: https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L143 Without first entering a doAs, as done here: [https://github.com/apache/spark/blob/6c18d8d8079ac4d2d6dc7539601ab83fc5b51760/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L151] So when running spark-submit with --proxy-user, and for example --archives, it will fail to launch unless the location of the archive is open to the user that executed spark-submit. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
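The ordering bug described above can be modeled without Spark or Hadoop. In this hedged sketch, `do_as` is a toy stand-in for Hadoop's `UserGroupInformation.doAs`, and the file table, owner, and user names are invented for illustration; the only point is that glob expansion must happen inside the proxy user's context, not before it.

```python
from contextlib import contextmanager

# Toy permission model: each path is readable only by its owner.
FILES = {"/data/archives/a.zip": "alice"}
current_user = "submitter"  # the user who actually ran spark-submit

@contextmanager
def do_as(user):
    """Toy stand-in for UserGroupInformation.doAs: run a block as the proxy user."""
    global current_user
    prev, current_user = current_user, user
    try:
        yield
    finally:
        current_user = prev

def resolve_glob(pattern):
    """Expand a glob, enforcing the toy permission check on each match."""
    prefix = pattern.rstrip("*")
    matches = [p for p in FILES if p.startswith(prefix)]
    for p in matches:
        if FILES[p] != current_user:
            raise PermissionError(p)
    return matches

# Buggy order (what the report describes): globs are resolved before entering
# doAs, so the expansion runs as the submitting user and fails.
try:
    resolve_glob("/data/archives/*")
    buggy_ok = True
except PermissionError:
    buggy_ok = False

# Fixed order: resolve the globs inside the proxy user's context.
with do_as("alice"):
    fixed = resolve_glob("/data/archives/*")
```

With the buggy ordering the expansion raises, mirroring the launch failure with --archives; moving the resolution inside the impersonation context makes it succeed.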
[jira] [Commented] (SPARK-26257) SPIP: Interop Support for Spark Language Extensions
[ https://issues.apache.org/jira/browse/SPARK-26257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16742519#comment-16742519 ] Alessandro Bellina commented on SPARK-26257: [~tcondie] I think you should share the design doc here too. It's a good idea, just curious on the details of what's being proposed.

> SPIP: Interop Support for Spark Language Extensions
> ---
>
> Key: SPARK-26257
> URL: https://issues.apache.org/jira/browse/SPARK-26257
> Project: Spark
> Issue Type: Improvement
> Components: PySpark, R, Spark Core
>Affects Versions: 2.4.0
>Reporter: Tyson Condie
>Priority: Minor
>
> h2. Background and Motivation:
> There is a desire for third party language extensions for Apache Spark. Some notable examples include:
> * C#/F# from project Mobius [https://github.com/Microsoft/Mobius]
> * Haskell from project sparkle [https://github.com/tweag/sparkle]
> * Julia from project Spark.jl [https://github.com/dfdx/Spark.jl]
> Presently, Apache Spark supports Python and R via a tightly integrated interop layer. It would seem that much of that existing interop layer could be refactored into a clean surface for general (third party) language bindings, such as the above mentioned. More specifically, could we generalize the following modules:
> * Deploy runners (e.g., PythonRunner and RRunner)
> * DataFrame Executors
> * RDD operations?
> The last being questionable: integrating third party language extensions at the RDD level may be too heavy-weight and unnecessary given the preference towards the Dataframe abstraction.
> The main goals of this effort would be:
> * Provide a clean abstraction for third party language extensions, making it easier to maintain the language extension as Apache Spark evolves
> * Provide guidance to third party language authors on how a language extension should be implemented
> * Provide general reusable libraries that are not specific to any language extension
> * Open the door to developers that prefer alternative languages
> * Identify and clean up common code shared between the Python and R interops
> h2. Target Personas:
> Data Scientists, Data Engineers, Library Developers
> h2. Goals:
> Data scientists and engineers will have the opportunity to work with Spark in languages other than what’s natively supported. Library developers will be able to create language extensions for Spark in a clean way. The interop layer should also provide guidance for developing language extensions.
> h2. Non-Goals:
> The proposal does not aim to create an actual language extension. Rather, it aims to provide a stable interop layer for third party language extensions to dock.
> h2. Proposed API Changes:
> Much of the work will involve generalizing existing interop APIs for PySpark and R, specifically for the Dataframe API. For instance, it would be good to have a general deploy.Runner (similar to PythonRunner) for language extension efforts. In Spark SQL, it would be good to have a general InteropUDF and evaluator (similar to BatchEvalPythonExec).
> Low-level RDD operations should not be needed in this initial offering; depending on the success of the interop layer and with proper demand, RDD interop could be added later. However, one open question is supporting a subset of low-level functions that are core to ETL, e.g., transform.
> h2. Optional Design Sketch:
> The work would be broken down into two top-level phases:
> Phase 1: Introduce a general interop API for deploying a driver/application and running an interop UDF, along with any other low-level transformations that aid with ETL.
> Phase 2: Port the existing Python and R language extensions to the new interop layer. This port should be contained solely to the Spark core side, and all protocols specific to Python and R should not change, e.g., Python should continue to use py4j as the protocol between the Python process and core Spark. The port itself should be contained to a handful of files, e.g., some examples for Python: PythonRunner, BatchEvalPythonExec, +PythonUDFRunner+, PythonRDD (possibly), and will mostly involve refactoring common logic into abstract implementations and utilities.
> h2. Optional Rejected Designs:
> The clear alternative is the status quo: developers that want to provide a third-party language extension to Spark do so directly, often by extending existing Python classes and overriding the portions that are relevant to the new extension. Not only is this not sound code (e.g., a JuliaRDD is not a PythonRDD, which contains a lot of reusable code), but it runs the great risk of future revisions making the subclass implementation obsolete. It would be hard to imagine that any third-party
[jira] [Commented] (SPARK-26285) Add a metric source for accumulators (aka AccumulatorSource)
[ https://issues.apache.org/jira/browse/SPARK-26285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710970#comment-16710970 ] Alessandro Bellina commented on SPARK-26285: I can't assign this issue, but I am putting up a PR for it. > Add a metric source for accumulators (aka AccumulatorSource) > > > Key: SPARK-26285 > URL: https://issues.apache.org/jira/browse/SPARK-26285 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Alessandro Bellina >Priority: Minor > > We'd like a simple mechanism to register spark accumulators against the > codahale metrics registry. > This task proposes adding a LongAccumulatorSource and a > DoubleAccumulatorSource. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26285) Add a metric source for accumulators (aka AccumulatorSource)
Alessandro Bellina created SPARK-26285: -- Summary: Add a metric source for accumulators (aka AccumulatorSource) Key: SPARK-26285 URL: https://issues.apache.org/jira/browse/SPARK-26285 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.4.0 Reporter: Alessandro Bellina We'd like a simple mechanism to register spark accumulators against the codahale metrics registry. This task proposes adding a LongAccumulatorSource and a DoubleAccumulatorSource. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
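The mechanism proposed here follows the Codahale gauge pattern: the metrics registry holds a callable that reads the accumulator's live value on demand, rather than a snapshot taken at registration time. A minimal Python sketch of that pattern, with hypothetical class and metric names (Spark's actual LongAccumulatorSource/DoubleAccumulatorSource are Scala, registered against the Dropwizard/Codahale MetricRegistry):

```python
class LongAccumulator:
    """Minimal stand-in for Spark's LongAccumulator (illustrative only)."""
    def __init__(self):
        self._value = 0

    def add(self, v):
        self._value += v

    @property
    def value(self):
        return self._value

class AccumulatorSource:
    """Gauge-style metric source: each metric is a callable that reads the
    accumulator's current value when the registry polls it."""
    def __init__(self):
        self._gauges = {}

    def register(self, name, acc):
        # Store a callable, not acc.value, so readers always see the live value.
        self._gauges[name] = lambda: acc.value

    def get_value(self, name):
        return self._gauges[name]()

source = AccumulatorSource()
rows_read = LongAccumulator()
source.register("my.app.rows_read", rows_read)

rows_read.add(40)
rows_read.add(2)
```

Because the source stores a closure over the accumulator, later `add` calls are visible to the registry without re-registering, which is the property a reporting sink (e.g., a periodic metrics reporter) relies on.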
[jira] [Created] (SPARK-25043) spark-sql should print the appId and master on startup
Alessandro Bellina created SPARK-25043: -- Summary: spark-sql should print the appId and master on startup Key: SPARK-25043 URL: https://issues.apache.org/jira/browse/SPARK-25043 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.1 Reporter: Alessandro Bellina In spark-sql, if logging is turned down all the way, it's not possible to find out what appId is running at the moment. This small change adds a print to stdout containing the master type and the appId, so that information is on screen.
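The change described above amounts to emitting one line on startup. A sketch of the kind of banner proposed; the exact wording and format are hypothetical here and would be settled in the PR (the real change is in spark-sql's Scala driver, not Python):

```python
def startup_banner(master, app_id):
    # Hypothetical format: one stdout line naming the master type and appId,
    # visible even when logging is turned all the way down.
    return "Spark master: %s, Application Id: %s" % (master, app_id)

print(startup_banner("yarn", "application_1533_0001"))
```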