[jira] [Updated] (SPARK-26192) MesosClusterScheduler reads options from dispatcher conf instead of submission conf
[ https://issues.apache.org/jira/browse/SPARK-26192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Loncaric updated SPARK-26192:
    Description:

There are at least two options accessed in MesosClusterScheduler that should come from the submission's configuration instead of the dispatcher's:

spark.app.name
spark.mesos.fetchCache.enable

This means that all Mesos tasks for Spark drivers have uninformative names of the form "Driver for (MainClass)" rather than the configured application name, and Spark drivers never cache files unless the caching setting is specified on the dispatcher as well. Coincidentally, the spark.mesos.fetchCache.enable option was misnamed, as referenced in the linked JIRA.

    was:

There are at least two options accessed in MesosClusterScheduler that should come from the submission's configuration instead of the dispatcher's:

spark.app.name
spark.mesos.fetchCache.enable

This means that all Mesos tasks for Spark drivers have uninformative names of the form "Driver for (MainClass)" rather than the configured application name, and Spark drivers never cache files. Coincidentally, the spark.mesos.fetchCache.enable option is misnamed, as referenced in the linked JIRA.

> MesosClusterScheduler reads options from dispatcher conf instead of submission conf
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-26192
>                 URL: https://issues.apache.org/jira/browse/SPARK-26192
>             Project: Spark
>          Issue Type: Bug
>          Components: Mesos
>    Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> There are at least two options accessed in MesosClusterScheduler that should come from the submission's configuration instead of the dispatcher's:
> spark.app.name
> spark.mesos.fetchCache.enable
> This means that all Mesos tasks for Spark drivers have uninformative names of the form "Driver for (MainClass)" rather than the configured application name, and Spark drivers never cache files unless the caching setting is specified on the dispatcher as well. Coincidentally, the spark.mesos.fetchCache.enable option was misnamed, as referenced in the linked JIRA.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26192) MesosClusterScheduler reads options from dispatcher conf instead of submission conf
[ https://issues.apache.org/jira/browse/SPARK-26192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Loncaric updated SPARK-26192:
    Description:

There are at least two options accessed in MesosClusterScheduler that should come from the submission's configuration instead of the dispatcher's:

spark.app.name
spark.mesos.fetcherCache.enable

This means that all Mesos tasks for Spark drivers have uninformative names of the form "Driver for (MainClass)" rather than the configured application name, and Spark drivers never cache files unless the caching setting is specified on the dispatcher as well. Coincidentally, the spark.mesos.fetchCache.enable option was misnamed, as referenced in the linked JIRA.

    was:

There are at least two options accessed in MesosClusterScheduler that should come from the submission's configuration instead of the dispatcher's:

spark.app.name
spark.mesos.fetchCache.enable

This means that all Mesos tasks for Spark drivers have uninformative names of the form "Driver for (MainClass)" rather than the configured application name, and Spark drivers never cache files unless the caching setting is specified on the dispatcher as well. Coincidentally, the spark.mesos.fetchCache.enable option was misnamed, as referenced in the linked JIRA.

> MesosClusterScheduler reads options from dispatcher conf instead of submission conf
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-26192
>                 URL: https://issues.apache.org/jira/browse/SPARK-26192
>             Project: Spark
>          Issue Type: Bug
>          Components: Mesos
>    Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> There are at least two options accessed in MesosClusterScheduler that should come from the submission's configuration instead of the dispatcher's:
> spark.app.name
> spark.mesos.fetcherCache.enable
> This means that all Mesos tasks for Spark drivers have uninformative names of the form "Driver for (MainClass)" rather than the configured application name, and Spark drivers never cache files unless the caching setting is specified on the dispatcher as well. Coincidentally, the spark.mesos.fetchCache.enable option was misnamed, as referenced in the linked JIRA.
[jira] [Updated] (SPARK-26192) MesosClusterScheduler reads options from dispatcher conf instead of submission conf
[ https://issues.apache.org/jira/browse/SPARK-26192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Loncaric updated SPARK-26192:
    Description:

There is at least one option accessed in MesosClusterScheduler that should come from the submission's configuration instead of the dispatcher's:

spark.mesos.fetcherCache.enable

Coincidentally, the spark.mesos.fetcherCache.enable option was previously misnamed, as referenced in the linked JIRA.

    was:

There are at least two options accessed in MesosClusterScheduler that should come from the submission's configuration instead of the dispatcher's:

spark.app.name
spark.mesos.fetcherCache.enable

This means that all Mesos tasks for Spark drivers have uninformative names of the form "Driver for (MainClass)" rather than the configured application name, and Spark drivers never cache files unless the caching setting is specified on the dispatcher as well. Coincidentally, the spark.mesos.fetchCache.enable option was misnamed, as referenced in the linked JIRA.

> MesosClusterScheduler reads options from dispatcher conf instead of submission conf
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-26192
>                 URL: https://issues.apache.org/jira/browse/SPARK-26192
>             Project: Spark
>          Issue Type: Bug
>          Components: Mesos
>    Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> There is at least one option accessed in MesosClusterScheduler that should come from the submission's configuration instead of the dispatcher's:
> spark.mesos.fetcherCache.enable
> Coincidentally, the spark.mesos.fetcherCache.enable option was previously misnamed, as referenced in the linked JIRA.
[jira] [Created] (SPARK-27014) Support removal of jars and Spark binaries from Mesos driver and executor sandboxes
Martin Loncaric created SPARK-27014:
---------------------------------------

             Summary: Support removal of jars and Spark binaries from Mesos driver and executor sandboxes
                 Key: SPARK-27014
                 URL: https://issues.apache.org/jira/browse/SPARK-27014
             Project: Spark
          Issue Type: New Feature
          Components: Mesos
    Affects Versions: 2.4.0
            Reporter: Martin Loncaric

Currently, each Spark application run on Mesos leaves behind at least 500MB of data in sandbox directories, coming from Spark binaries and copied URIs. These can build up as a disk leak, causing major issues on Mesos clusters unless their grace period for sandbox directories is very short.

Spark should have a feature to delete these (from both driver and executor sandboxes) on teardown.
[jira] [Updated] (SPARK-27014) Support removal of jars and Spark binaries from Mesos driver and executor sandboxes
[ https://issues.apache.org/jira/browse/SPARK-27014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Loncaric updated SPARK-27014:
    Affects Version/s: 2.5.0
                       (was: 2.4.0)

> Support removal of jars and Spark binaries from Mesos driver and executor sandboxes
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-27014
>                 URL: https://issues.apache.org/jira/browse/SPARK-27014
>             Project: Spark
>          Issue Type: New Feature
>          Components: Mesos
>    Affects Versions: 2.5.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> Currently, each Spark application run on Mesos leaves behind at least 500MB of data in sandbox directories, coming from Spark binaries and copied URIs. These can build up as a disk leak, causing major issues on Mesos clusters unless their grace period for sandbox directories is very short.
> Spark should have a feature to delete these (from both driver and executor sandboxes) on teardown.
[jira] [Updated] (SPARK-26192) MesosClusterScheduler reads options from dispatcher conf instead of submission conf
[ https://issues.apache.org/jira/browse/SPARK-26192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Loncaric updated SPARK-26192:
    Fix Version/s: 3.0.0
                   2.4.1
                   2.3.4

> MesosClusterScheduler reads options from dispatcher conf instead of submission conf
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-26192
>                 URL: https://issues.apache.org/jira/browse/SPARK-26192
>             Project: Spark
>          Issue Type: Bug
>          Components: Mesos
>    Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0
>            Reporter: Martin Loncaric
>            Priority: Major
>             Fix For: 2.3.4, 2.4.1, 3.0.0
>
> There is at least one option accessed in MesosClusterScheduler that should come from the submission's configuration instead of the dispatcher's:
> spark.mesos.fetcherCache.enable
> Coincidentally, the spark.mesos.fetcherCache.enable option was previously misnamed, as referenced in the linked JIRA.
[jira] [Created] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher
Martin Loncaric created SPARK-27015:
---------------------------------------

             Summary: spark-submit does not properly escape arguments sent to Mesos dispatcher
                 Key: SPARK-27015
                 URL: https://issues.apache.org/jira/browse/SPARK-27015
             Project: Spark
          Issue Type: New Feature
          Components: Mesos
    Affects Versions: 2.4.0, 2.3.3
            Reporter: Martin Loncaric

Arguments sent to the dispatcher must be escaped; for instance,
```
spark-submit --master mesos://url:port my.jar --arg1 "a b c"
```
fails, and instead must be submitted as
```
spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"
```
[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher
[ https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Loncaric updated SPARK-27015:
    Description:

Arguments sent to the dispatcher must be escaped; for instance,
{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"}}

    was:

Arguments sent to the dispatcher must be escaped; for instance,
`spark-submit --master mesos://url:port my.jar --arg1 "a b c"`
fails, and instead must be submitted as
`spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"`

> spark-submit does not properly escape arguments sent to Mesos dispatcher
> ------------------------------------------------------------------------
>
>                 Key: SPARK-27015
>                 URL: https://issues.apache.org/jira/browse/SPARK-27015
>             Project: Spark
>          Issue Type: New Feature
>          Components: Mesos
>    Affects Versions: 2.3.3, 2.4.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
> fails, and instead must be submitted as
> {{spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"}}
[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher
[ https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Loncaric updated SPARK-27015:
    Description:

Arguments sent to the dispatcher must be escaped; for instance,
`spark-submit --master mesos://url:port my.jar --arg1 "a b c"`
fails, and instead must be submitted as
`spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"`

    was:

Arguments sent to the dispatcher must be escaped; for instance,
```
spark-submit --master mesos://url:port my.jar --arg1 "a b c"
```
fails, and instead must be submitted as
```
spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"
```

> spark-submit does not properly escape arguments sent to Mesos dispatcher
> ------------------------------------------------------------------------
>
>                 Key: SPARK-27015
>                 URL: https://issues.apache.org/jira/browse/SPARK-27015
>             Project: Spark
>          Issue Type: New Feature
>          Components: Mesos
>    Affects Versions: 2.3.3, 2.4.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> `spark-submit --master mesos://url:port my.jar --arg1 "a b c"`
> fails, and instead must be submitted as
> `spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"`
[jira] [Updated] (SPARK-27014) Support removal of jars and Spark binaries from Mesos driver and executor sandboxes
[ https://issues.apache.org/jira/browse/SPARK-27014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Loncaric updated SPARK-27014:
    Fix Version/s: 3.0.0
                   2.5.0

> Support removal of jars and Spark binaries from Mesos driver and executor sandboxes
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-27014
>                 URL: https://issues.apache.org/jira/browse/SPARK-27014
>             Project: Spark
>          Issue Type: New Feature
>          Components: Mesos
>    Affects Versions: 2.5.0
>            Reporter: Martin Loncaric
>            Priority: Major
>             Fix For: 2.5.0, 3.0.0
>
> Currently, each Spark application run on Mesos leaves behind at least 500MB of data in sandbox directories, coming from Spark binaries and copied URIs. These can build up as a disk leak, causing major issues on Mesos clusters unless their grace period for sandbox directories is very short.
> Spark should have a feature to delete these (from both driver and executor sandboxes) on teardown.
[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher
[ https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Loncaric updated SPARK-27015:
    Description:

Arguments sent to the dispatcher must be escaped; for instance,
{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"}}

    was:

Arguments sent to the dispatcher must be escaped; for instance,
{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a\\\ b\\ c"}}

> spark-submit does not properly escape arguments sent to Mesos dispatcher
> ------------------------------------------------------------------------
>
>                 Key: SPARK-27015
>                 URL: https://issues.apache.org/jira/browse/SPARK-27015
>             Project: Spark
>          Issue Type: New Feature
>          Components: Mesos
>    Affects Versions: 2.3.3, 2.4.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
> fails, and instead must be submitted as
> {{spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"}}
[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher
[ https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Loncaric updated SPARK-27015:
    Description:

Arguments sent to the dispatcher must be escaped; for instance,
{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
fails, and instead must be submitted as
spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"

    was:

Arguments sent to the dispatcher must be escaped; for instance,
{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"}}

> spark-submit does not properly escape arguments sent to Mesos dispatcher
> ------------------------------------------------------------------------
>
>                 Key: SPARK-27015
>                 URL: https://issues.apache.org/jira/browse/SPARK-27015
>             Project: Spark
>          Issue Type: New Feature
>          Components: Mesos
>    Affects Versions: 2.3.3, 2.4.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
> fails, and instead must be submitted as
> spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"
[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher
[ https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Loncaric updated SPARK-27015:
    Description:

Arguments sent to the dispatcher must be escaped; for instance,
{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a{\\} b\\ c"}}

    was:

Arguments sent to the dispatcher must be escaped; for instance,
{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a{nolink:} b\\ c"}}

> spark-submit does not properly escape arguments sent to Mesos dispatcher
> ------------------------------------------------------------------------
>
>                 Key: SPARK-27015
>                 URL: https://issues.apache.org/jira/browse/SPARK-27015
>             Project: Spark
>          Issue Type: New Feature
>          Components: Mesos
>    Affects Versions: 2.3.3, 2.4.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
> fails, and instead must be submitted as
> {{spark-submit --master mesos://url:port my.jar --arg1 "a{\\} b\\ c"}}
[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher
[ https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Loncaric updated SPARK-27015:
    Description:

Arguments sent to the dispatcher must be escaped; for instance,
{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a\ b\\ c"}}

    was:

Arguments sent to the dispatcher must be escaped; for instance,
{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a{\\} b\\ c"}}

> spark-submit does not properly escape arguments sent to Mesos dispatcher
> ------------------------------------------------------------------------
>
>                 Key: SPARK-27015
>                 URL: https://issues.apache.org/jira/browse/SPARK-27015
>             Project: Spark
>          Issue Type: New Feature
>          Components: Mesos
>    Affects Versions: 2.3.3, 2.4.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
> fails, and instead must be submitted as
> {{spark-submit --master mesos://url:port my.jar --arg1 "a\ b\\ c"}}
[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher
[ https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Loncaric updated SPARK-27015:
    Description:

Arguments sent to the dispatcher must be escaped; for instance,
{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a{noformat}\\{noformat} b\\ c"}}

    was:

Arguments sent to the dispatcher must be escaped; for instance,
{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a\ b\\ c"}}

> spark-submit does not properly escape arguments sent to Mesos dispatcher
> ------------------------------------------------------------------------
>
>                 Key: SPARK-27015
>                 URL: https://issues.apache.org/jira/browse/SPARK-27015
>             Project: Spark
>          Issue Type: New Feature
>          Components: Mesos
>    Affects Versions: 2.3.3, 2.4.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
> fails, and instead must be submitted as
> {{spark-submit --master mesos://url:port my.jar --arg1 "a{noformat}\\{noformat} b\\ c"}}
[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher
[ https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Loncaric updated SPARK-27015:
    Description:

Arguments sent to the dispatcher must be escaped; for instance,
{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
fails, and instead must be submitted as
{noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"{noformat}

    was:

Arguments sent to the dispatcher must be escaped; for instance,
{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a{noformat}\\{noformat} b\\ c"}}

> spark-submit does not properly escape arguments sent to Mesos dispatcher
> ------------------------------------------------------------------------
>
>                 Key: SPARK-27015
>                 URL: https://issues.apache.org/jira/browse/SPARK-27015
>             Project: Spark
>          Issue Type: New Feature
>          Components: Mesos
>    Affects Versions: 2.3.3, 2.4.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
> fails, and instead must be submitted as
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"{noformat}
[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher
[ https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Loncaric updated SPARK-27015:
    Description:

Arguments sent to the dispatcher must be escaped; for instance,
{noformat}spark-submit --master mesos://url:port my.jar --arg1 "a b$c"{noformat}
fails, and instead must be submitted as
{noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\$c"{noformat}

    was:

Arguments sent to the dispatcher must be escaped; for instance,
{noformat}spark-submit --master mesos://url:port my.jar --arg1 "a b c"{noformat}
fails, and instead must be submitted as
{noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"{noformat}

> spark-submit does not properly escape arguments sent to Mesos dispatcher
> ------------------------------------------------------------------------
>
>                 Key: SPARK-27015
>                 URL: https://issues.apache.org/jira/browse/SPARK-27015
>             Project: Spark
>          Issue Type: New Feature
>          Components: Mesos
>    Affects Versions: 2.3.3, 2.4.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a b$c"{noformat}
> fails, and instead must be submitted as
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\$c"{noformat}
[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher
[ https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Loncaric updated SPARK-27015:
    Issue Type: Bug (was: New Feature)

> spark-submit does not properly escape arguments sent to Mesos dispatcher
> ------------------------------------------------------------------------
>
>                 Key: SPARK-27015
>                 URL: https://issues.apache.org/jira/browse/SPARK-27015
>             Project: Spark
>          Issue Type: Bug
>          Components: Mesos
>    Affects Versions: 2.3.3, 2.4.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a b$c"{noformat}
> fails, and instead must be submitted as
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\$c"{noformat}
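The manual double-escaping described in SPARK-27015 can be scripted on the client side. The helper below is a hypothetical sketch (the function name and the exact character set to escape are assumptions, not part of Spark): it backslash-escapes spaces, dollar signs, and backslashes so an argument survives the dispatcher's second round of shell-style parsing.

```shell
# Hypothetical client-side helper (not part of Spark): pre-escape an argument
# before handing it to spark-submit in Mesos cluster mode, mirroring the
# manual workaround in the report above. Escapes backslash, space, and $.
escape_for_dispatcher() {
  printf '%s' "$1" | sed -e 's/[\\ $]/\\&/g'
}

escape_for_dispatcher 'a b$c'   # prints: a\ b\$c
```

A submission would then look like `spark-submit --master mesos://url:port my.jar --arg1 "$(escape_for_dispatcher 'a b$c')"`, under the assumption that only one extra round of parsing happens on the dispatcher side.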
[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher
[ https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Loncaric updated SPARK-27015:
    Description:

Arguments sent to the dispatcher must be escaped; for instance,
{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a{nolink:} b\\ c"}}

    was:

Arguments sent to the dispatcher must be escaped; for instance,
{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
fails, and instead must be submitted as
spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"

> spark-submit does not properly escape arguments sent to Mesos dispatcher
> ------------------------------------------------------------------------
>
>                 Key: SPARK-27015
>                 URL: https://issues.apache.org/jira/browse/SPARK-27015
>             Project: Spark
>          Issue Type: New Feature
>          Components: Mesos
>    Affects Versions: 2.3.3, 2.4.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
> fails, and instead must be submitted as
> {{spark-submit --master mesos://url:port my.jar --arg1 "a{nolink:} b\\ c"}}
[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher
[ https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Loncaric updated SPARK-27015:
    Description:

Arguments sent to the dispatcher must be escaped; for instance,
{noformat}spark-submit --master mesos://url:port my.jar --arg1 "a b c"{noformat}
fails, and instead must be submitted as
{noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"{noformat}

    was:

Arguments sent to the dispatcher must be escaped; for instance,
{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
fails, and instead must be submitted as
{noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"{noformat}

> spark-submit does not properly escape arguments sent to Mesos dispatcher
> ------------------------------------------------------------------------
>
>                 Key: SPARK-27015
>                 URL: https://issues.apache.org/jira/browse/SPARK-27015
>             Project: Spark
>          Issue Type: New Feature
>          Components: Mesos
>    Affects Versions: 2.3.3, 2.4.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a b c"{noformat}
> fails, and instead must be submitted as
> {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"{noformat}
[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher
[ https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Loncaric updated SPARK-27015:
    Description:

Arguments sent to the dispatcher must be escaped; for instance,
{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a\\\ b\\ c"}}

    was:

Arguments sent to the dispatcher must be escaped; for instance,
{{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
fails, and instead must be submitted as
{{spark-submit --master mesos://url:port my.jar --arg1 "a\\ b\\ c"}}

> spark-submit does not properly escape arguments sent to Mesos dispatcher
> ------------------------------------------------------------------------
>
>                 Key: SPARK-27015
>                 URL: https://issues.apache.org/jira/browse/SPARK-27015
>             Project: Spark
>          Issue Type: New Feature
>          Components: Mesos
>    Affects Versions: 2.3.3, 2.4.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> Arguments sent to the dispatcher must be escaped; for instance,
> {{spark-submit --master mesos://url:port my.jar --arg1 "a b c"}}
> fails, and instead must be submitted as
> {{spark-submit --master mesos://url:port my.jar --arg1 "a\\\ b\\ c"}}
[jira] [Commented] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors
[ https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782185#comment-16782185 ]

Martin Loncaric commented on SPARK-26555:
-----------------------------------------

Will try it out and report back

> Thread safety issue causes createDataset to fail with misleading errors
> -----------------------------------------------------------------------
>
>                 Key: SPARK-26555
>                 URL: https://issues.apache.org/jira/browse/SPARK-26555
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> This can be replicated (~2% of the time) with
> {code:scala}
> import java.sql.Timestamp
> import java.util.concurrent.{Executors, Future}
>
> import org.apache.spark.sql.SparkSession
>
> import scala.collection.mutable.ListBuffer
> import scala.concurrent.ExecutionContext
> import scala.util.Random
>
> object Main {
>   def main(args: Array[String]): Unit = {
>     val sparkSession = SparkSession.builder
>       .getOrCreate()
>     import sparkSession.implicits._
>
>     val executor = Executors.newFixedThreadPool(1)
>     try {
>       implicit val xc: ExecutionContext = ExecutionContext.fromExecutorService(executor)
>       val futures = new ListBuffer[Future[_]]()
>       for (i <- 1 to 3) {
>         futures += executor.submit(new Runnable {
>           override def run(): Unit = {
>             val d = if (Random.nextInt(2) == 0) Some("d value") else None
>             val e = if (Random.nextInt(2) == 0) Some(5.0) else None
>             val f = if (Random.nextInt(2) == 0) Some(6.0) else None
>             println("DEBUG", d, e, f)
>             sparkSession.createDataset(Seq(
>               MyClass(new Timestamp(1L), "b", "c", d, e, f)
>             ))
>           }
>         })
>       }
>       futures.foreach(_.get())
>     } finally {
>       println("SHUTDOWN")
>       executor.shutdown()
>       sparkSession.stop()
>     }
>   }
>
>   case class MyClass(
>     a: Timestamp,
>     b: String,
>     c: String,
>     d: Option[String],
>     e: Option[Double],
>     f: Option[Double]
>   )
> }
> {code}
> So it will usually come up during
> {code:bash}
> for i in $(seq 1 200); do
>   echo $i
>   spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
> done
> {code}
> causing a variety of possible errors, such as
> {code}
> Exception in thread "main" java.util.concurrent.ExecutionException: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef)
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef)
>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210)
> {code}
> or
> {code}
> Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Double] is not supported
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> Caused by: java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Double] is not supported
>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789)
> {code}
[jira] [Commented] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors
[ https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782239#comment-16782239 ] Martin Loncaric commented on SPARK-26555: - I was able to replicate with both all rows as `Some()` and all rows as `None`. > Thread safety issue causes createDataset to fail with misleading errors > --- > > Key: SPARK-26555 > URL: https://issues.apache.org/jira/browse/SPARK-26555 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Martin Loncaric >Priority: Major > > This can be replicated (~2% of the time) with > {code:scala} > import java.sql.Timestamp > import java.util.concurrent.{Executors, Future} > import org.apache.spark.sql.SparkSession > import scala.collection.mutable.ListBuffer > import scala.concurrent.ExecutionContext > import scala.util.Random > object Main { > def main(args: Array[String]): Unit = { > val sparkSession = SparkSession.builder > .getOrCreate() > import sparkSession.implicits._ > val executor = Executors.newFixedThreadPool(1) > try { > implicit val xc: ExecutionContext = > ExecutionContext.fromExecutorService(executor) > val futures = new ListBuffer[Future[_]]() > for (i <- 1 to 3) { > futures += executor.submit(new Runnable { > override def run(): Unit = { > val d = if (Random.nextInt(2) == 0) Some("d value") else None > val e = if (Random.nextInt(2) == 0) Some(5.0) else None > val f = if (Random.nextInt(2) == 0) Some(6.0) else None > println("DEBUG", d, e, f) > sparkSession.createDataset(Seq( > MyClass(new Timestamp(1L), "b", "c", d, e, f) > )) > } > }) > } > futures.foreach(_.get()) > } finally { > println("SHUTDOWN") > executor.shutdown() > sparkSession.stop() > } > } > case class MyClass( > a: Timestamp, > b: String, > c: String, > d: Option[String], > e: Option[Double], > f: Option[Double] > ) > } > {code} > So it will usually come up during > {code:bash} > for i in $(seq 1 200); do > echo $i > spark-submit --master local[4] 
target/scala-2.11/spark-test_2.11-0.1.jar > done > {code} > causing a variety of possible errors, such as > {code}Exception in thread "main" java.util.concurrent.ExecutionException: > scala.MatchError: scala.Option[String] (of class > scala.reflect.internal.Types$ClassArgsTypeRef) > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > Caused by: scala.MatchError: scala.Option[String] (of class > scala.reflect.internal.Types$ClassArgsTypeRef) > at > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code} > or > {code}Exception in thread "main" java.util.concurrent.ExecutionException: > java.lang.UnsupportedOperationException: Schema for type > scala.Option[scala.Double] is not supported > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > Caused by: java.lang.UnsupportedOperationException: Schema for type > scala.Option[scala.Double] is not supported > at > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors
[ https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782239#comment-16782239 ] Martin Loncaric edited comment on SPARK-26555 at 3/2/19 1:25 AM: - I was able to replicate with both all rows in all optional columns as `Some()` and all rows in all optional columns as `None`. was (Author: mwlon): I was able to replicate with both all rows as `Some()` and all rows as `None`. > Thread safety issue causes createDataset to fail with misleading errors > --- > > Key: SPARK-26555 > URL: https://issues.apache.org/jira/browse/SPARK-26555 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Martin Loncaric >Priority: Major > > This can be replicated (~2% of the time) with > {code:scala} > import java.sql.Timestamp > import java.util.concurrent.{Executors, Future} > import org.apache.spark.sql.SparkSession > import scala.collection.mutable.ListBuffer > import scala.concurrent.ExecutionContext > import scala.util.Random > object Main { > def main(args: Array[String]): Unit = { > val sparkSession = SparkSession.builder > .getOrCreate() > import sparkSession.implicits._ > val executor = Executors.newFixedThreadPool(1) > try { > implicit val xc: ExecutionContext = > ExecutionContext.fromExecutorService(executor) > val futures = new ListBuffer[Future[_]]() > for (i <- 1 to 3) { > futures += executor.submit(new Runnable { > override def run(): Unit = { > val d = if (Random.nextInt(2) == 0) Some("d value") else None > val e = if (Random.nextInt(2) == 0) Some(5.0) else None > val f = if (Random.nextInt(2) == 0) Some(6.0) else None > println("DEBUG", d, e, f) > sparkSession.createDataset(Seq( > MyClass(new Timestamp(1L), "b", "c", d, e, f) > )) > } > }) > } > futures.foreach(_.get()) > } finally { > println("SHUTDOWN") > executor.shutdown() > sparkSession.stop() > } > } > case class MyClass( > a: Timestamp, > b: String, > c: String, > d: Option[String], > e: 
Option[Double], > f: Option[Double] > ) > } > {code} > So it will usually come up during > {code:bash} > for i in $(seq 1 200); do > echo $i > spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar > done > {code} > causing a variety of possible errors, such as > {code}Exception in thread "main" java.util.concurrent.ExecutionException: > scala.MatchError: scala.Option[String] (of class > scala.reflect.internal.Types$ClassArgsTypeRef) > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > Caused by: scala.MatchError: scala.Option[String] (of class > scala.reflect.internal.Types$ClassArgsTypeRef) > at > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code} > or > {code}Exception in thread "main" java.util.concurrent.ExecutionException: > java.lang.UnsupportedOperationException: Schema for type > scala.Option[scala.Double] is not supported > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > Caused by: java.lang.UnsupportedOperationException: Schema for type > scala.Option[scala.Double] is not supported > at > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors
[ https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782258#comment-16782258 ] Martin Loncaric commented on SPARK-26555: - I can also replicate with different schemas containing Option. When I remove all Option columns from the schema, the sporadic failure goes away. This also never happens when I remove the concurrency. > Thread safety issue causes createDataset to fail with misleading errors > --- > > Key: SPARK-26555 > URL: https://issues.apache.org/jira/browse/SPARK-26555 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Martin Loncaric >Priority: Major > > This can be replicated (~2% of the time) with > {code:scala} > import java.sql.Timestamp > import java.util.concurrent.{Executors, Future} > import org.apache.spark.sql.SparkSession > import scala.collection.mutable.ListBuffer > import scala.concurrent.ExecutionContext > import scala.util.Random > object Main { > def main(args: Array[String]): Unit = { > val sparkSession = SparkSession.builder > .getOrCreate() > import sparkSession.implicits._ > val executor = Executors.newFixedThreadPool(1) > try { > implicit val xc: ExecutionContext = > ExecutionContext.fromExecutorService(executor) > val futures = new ListBuffer[Future[_]]() > for (i <- 1 to 3) { > futures += executor.submit(new Runnable { > override def run(): Unit = { > val d = if (Random.nextInt(2) == 0) Some("d value") else None > val e = if (Random.nextInt(2) == 0) Some(5.0) else None > val f = if (Random.nextInt(2) == 0) Some(6.0) else None > println("DEBUG", d, e, f) > sparkSession.createDataset(Seq( > MyClass(new Timestamp(1L), "b", "c", d, e, f) > )) > } > }) > } > futures.foreach(_.get()) > } finally { > println("SHUTDOWN") > executor.shutdown() > sparkSession.stop() > } > } > case class MyClass( > a: Timestamp, > b: String, > c: String, > d: Option[String], > e: Option[Double], > f: Option[Double] > ) > } > {code} > So 
it will usually come up during > {code:bash} > for i in $(seq 1 200); do > echo $i > spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar > done > {code} > causing a variety of possible errors, such as > {code}Exception in thread "main" java.util.concurrent.ExecutionException: > scala.MatchError: scala.Option[String] (of class > scala.reflect.internal.Types$ClassArgsTypeRef) > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > Caused by: scala.MatchError: scala.Option[String] (of class > scala.reflect.internal.Types$ClassArgsTypeRef) > at > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code} > or > {code}Exception in thread "main" java.util.concurrent.ExecutionException: > java.lang.UnsupportedOperationException: Schema for type > scala.Option[scala.Double] is not supported > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > Caused by: java.lang.UnsupportedOperationException: Schema for type > scala.Option[scala.Double] is not supported > at > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors
[ https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782821#comment-16782821 ] Martin Loncaric edited comment on SPARK-26555 at 3/3/19 6:56 PM: - Yes - when I take away any randomness and use the same dataset every time (say, with Some(something) for each optional value), I still get this issue. I've run this code in a couple different environments and obtained the same result, so you should be able to verify this as well. was (Author: mwlon): Yes - when I take away any randomness and use the same dataset every time (say, with Some(something) for each optional value), I still get this issue. > Thread safety issue causes createDataset to fail with misleading errors > --- > > Key: SPARK-26555 > URL: https://issues.apache.org/jira/browse/SPARK-26555 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Martin Loncaric >Priority: Major > > This can be replicated (~2% of the time) with > {code:scala} > import java.sql.Timestamp > import java.util.concurrent.{Executors, Future} > import org.apache.spark.sql.SparkSession > import scala.collection.mutable.ListBuffer > import scala.concurrent.ExecutionContext > import scala.util.Random > object Main { > def main(args: Array[String]): Unit = { > val sparkSession = SparkSession.builder > .getOrCreate() > import sparkSession.implicits._ > val executor = Executors.newFixedThreadPool(1) > try { > implicit val xc: ExecutionContext = > ExecutionContext.fromExecutorService(executor) > val futures = new ListBuffer[Future[_]]() > for (i <- 1 to 3) { > futures += executor.submit(new Runnable { > override def run(): Unit = { > val d = if (Random.nextInt(2) == 0) Some("d value") else None > val e = if (Random.nextInt(2) == 0) Some(5.0) else None > val f = if (Random.nextInt(2) == 0) Some(6.0) else None > println("DEBUG", d, e, f) > sparkSession.createDataset(Seq( > MyClass(new Timestamp(1L), "b", "c", d, e, f) > )) 
> } > }) > } > futures.foreach(_.get()) > } finally { > println("SHUTDOWN") > executor.shutdown() > sparkSession.stop() > } > } > case class MyClass( > a: Timestamp, > b: String, > c: String, > d: Option[String], > e: Option[Double], > f: Option[Double] > ) > } > {code} > So it will usually come up during > {code:bash} > for i in $(seq 1 200); do > echo $i > spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar > done > {code} > causing a variety of possible errors, such as > {code}Exception in thread "main" java.util.concurrent.ExecutionException: > scala.MatchError: scala.Option[String] (of class > scala.reflect.internal.Types$ClassArgsTypeRef) > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > Caused by: scala.MatchError: scala.Option[String] (of class > scala.reflect.internal.Types$ClassArgsTypeRef) > at > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code} > or > {code}Exception in thread "main" java.util.concurrent.ExecutionException: > java.lang.UnsupportedOperationException: Schema for type > scala.Option[scala.Double] is not supported > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > Caused by: java.lang.UnsupportedOperationException: Schema for type > scala.Option[scala.Double] is not supported > at > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors
[ https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782821#comment-16782821 ] Martin Loncaric commented on SPARK-26555: - Yes - when I take away any randomness and use the same dataset every time (say, with Some(something) for each optional value), I still get this issue. > Thread safety issue causes createDataset to fail with misleading errors > --- > > Key: SPARK-26555 > URL: https://issues.apache.org/jira/browse/SPARK-26555 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Martin Loncaric >Priority: Major > > This can be replicated (~2% of the time) with > {code:scala} > import java.sql.Timestamp > import java.util.concurrent.{Executors, Future} > import org.apache.spark.sql.SparkSession > import scala.collection.mutable.ListBuffer > import scala.concurrent.ExecutionContext > import scala.util.Random > object Main { > def main(args: Array[String]): Unit = { > val sparkSession = SparkSession.builder > .getOrCreate() > import sparkSession.implicits._ > val executor = Executors.newFixedThreadPool(1) > try { > implicit val xc: ExecutionContext = > ExecutionContext.fromExecutorService(executor) > val futures = new ListBuffer[Future[_]]() > for (i <- 1 to 3) { > futures += executor.submit(new Runnable { > override def run(): Unit = { > val d = if (Random.nextInt(2) == 0) Some("d value") else None > val e = if (Random.nextInt(2) == 0) Some(5.0) else None > val f = if (Random.nextInt(2) == 0) Some(6.0) else None > println("DEBUG", d, e, f) > sparkSession.createDataset(Seq( > MyClass(new Timestamp(1L), "b", "c", d, e, f) > )) > } > }) > } > futures.foreach(_.get()) > } finally { > println("SHUTDOWN") > executor.shutdown() > sparkSession.stop() > } > } > case class MyClass( > a: Timestamp, > b: String, > c: String, > d: Option[String], > e: Option[Double], > f: Option[Double] > ) > } > {code} > So it will usually come up during > {code:bash} > 
for i in $(seq 1 200); do > echo $i > spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar > done > {code} > causing a variety of possible errors, such as > {code}Exception in thread "main" java.util.concurrent.ExecutionException: > scala.MatchError: scala.Option[String] (of class > scala.reflect.internal.Types$ClassArgsTypeRef) > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > Caused by: scala.MatchError: scala.Option[String] (of class > scala.reflect.internal.Types$ClassArgsTypeRef) > at > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code} > or > {code}Exception in thread "main" java.util.concurrent.ExecutionException: > java.lang.UnsupportedOperationException: Schema for type > scala.Option[scala.Double] is not supported > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > Caused by: java.lang.UnsupportedOperationException: Schema for type > scala.Option[scala.Double] is not supported > at > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
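Since Spark 2.4's `ScalaReflection`-based encoder derivation appears to be the racy step, one workaround (a sketch suggested by the symptoms above, not an official fix from this thread) is to derive the encoder once, on a single thread, before any concurrent `createDataset` calls, and then pass it explicitly:

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// Derive the product encoder once, up front, on the main thread. The
// reflection work happens here, before any worker threads exist, so the
// concurrent createDataset calls below never race inside ScalaReflection.
// MyClass is the case class from the reproduction in this thread.
val enc: Encoder[MyClass] = Encoders.product[MyClass]

// Worker threads can then create Datasets with the pre-built encoder,
// bypassing implicit (and therefore per-call) encoder derivation:
// sparkSession.createDataset(rows)(enc)
```

This trades the convenience of `import sparkSession.implicits._` for an explicit encoder argument, which also makes the derivation cost a one-time expense.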
[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher
[ https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-27015: Fix Version/s: 3.0.0 2.5.0 > spark-submit does not properly escape arguments sent to Mesos dispatcher > > > Key: SPARK-27015 > URL: https://issues.apache.org/jira/browse/SPARK-27015 > Project: Spark > Issue Type: Bug > Components: Mesos >Affects Versions: 2.3.3, 2.4.0 >Reporter: Martin Loncaric >Priority: Major > Fix For: 2.5.0, 3.0.0 > > > Arguments sent to the dispatcher must be escaped; for instance, > {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a > b$c"{noformat} > fails, and instead must be submitted as > {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ > b\\$c"{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher
[ https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-27015: Affects Version/s: (was: 2.3.3) (was: 2.4.0) 3.0.0 2.5.0 > spark-submit does not properly escape arguments sent to Mesos dispatcher > > > Key: SPARK-27015 > URL: https://issues.apache.org/jira/browse/SPARK-27015 > Project: Spark > Issue Type: Bug > Components: Mesos >Affects Versions: 2.5.0, 3.0.0 >Reporter: Martin Loncaric >Priority: Major > > Arguments sent to the dispatcher must be escaped; for instance, > {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a > b$c"{noformat} > fails, and instead must be submitted as > {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ > b\\$c"{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher
[ https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-27015: Affects Version/s: (was: 2.5.0) (was: 3.0.0) 2.3.3 2.4.0 > spark-submit does not properly escape arguments sent to Mesos dispatcher > > > Key: SPARK-27015 > URL: https://issues.apache.org/jira/browse/SPARK-27015 > Project: Spark > Issue Type: Bug > Components: Mesos >Affects Versions: 2.3.3, 2.4.0 >Reporter: Martin Loncaric >Priority: Major > > Arguments sent to the dispatcher must be escaped; for instance, > {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a > b$c"{noformat} > fails, and instead must be submitted as > {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ > b\\$c"{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-26192) MesosClusterScheduler reads options from dispatcher conf instead of submission conf
[ https://issues.apache.org/jira/browse/SPARK-26192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16783810#comment-16783810 ] Martin Loncaric edited comment on SPARK-26192 at 3/4/19 9:49 PM: - [~dongjoon] Thanks, I will pay more attention to those fields. However, I believe this is a bug. It violates behavior specified in https://spark.apache.org/docs/latest/running-on-mesos.html#configuration. Can we merge into 2.4.1 as well? was (Author: mwlon): [~dongjoon] Thanks, I will pay more attention to those fields. However, I believe this is a bug. It violates behavior specified in the https://spark.apache.org/docs/latest/running-on-mesos.html#configuration. Can we merge into 2.4.1 as well? > MesosClusterScheduler reads options from dispatcher conf instead of > submission conf > --- > > Key: SPARK-26192 > URL: https://issues.apache.org/jira/browse/SPARK-26192 > Project: Spark > Issue Type: Improvement > Components: Mesos >Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0 >Reporter: Martin Loncaric >Assignee: Martin Loncaric >Priority: Minor > Fix For: 3.0.0 > > > There is at least one option accessed in MesosClusterScheduler that should > come from the submission's configuration instead of the dispatcher's: > spark.mesos.fetcherCache.enable > Coincidentally, the spark.mesos.fetcherCache.enable option was previously > misnamed, as referenced in the linked JIRA. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27014) Support removal of jars and Spark binaries from Mesos driver and executor sandboxes
[ https://issues.apache.org/jira/browse/SPARK-27014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16783811#comment-16783811 ] Martin Loncaric commented on SPARK-27014: - Sure, will keep that in mind. > Support removal of jars and Spark binaries from Mesos driver and executor > sandboxes > --- > > Key: SPARK-27014 > URL: https://issues.apache.org/jira/browse/SPARK-27014 > Project: Spark > Issue Type: New Feature > Components: Mesos >Affects Versions: 3.0.0 >Reporter: Martin Loncaric >Priority: Minor > > Currently, each Spark application run on Mesos leaves behind at least 500MB > of data in sandbox directories, coming from Spark binaries and copied URIs. > These can build up as a disk leak, causing major issues on Mesos clusters > unless their grace period for sandbox directories is very short. > Spark should have a feature to delete these (from both driver and executor > sandboxes) on teardown. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-26192) MesosClusterScheduler reads options from dispatcher conf instead of submission conf
[ https://issues.apache.org/jira/browse/SPARK-26192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16783810#comment-16783810 ] Martin Loncaric edited comment on SPARK-26192 at 3/4/19 9:51 PM: - [~dongjoon] Thanks, I will pay more attention to those fields. However, I believe this is a bug. It violates behavior specified in https://spark.apache.org/docs/latest/running-on-mesos.html#configuration. Can we merge into at least 2.4.1 as well? was (Author: mwlon): [~dongjoon] Thanks, I will pay more attention to those fields. However, I believe this is a bug. It violates behavior specified in https://spark.apache.org/docs/latest/running-on-mesos.html#configuration. Can we merge into 2.4.1 as well? > MesosClusterScheduler reads options from dispatcher conf instead of > submission conf > --- > > Key: SPARK-26192 > URL: https://issues.apache.org/jira/browse/SPARK-26192 > Project: Spark > Issue Type: Improvement > Components: Mesos >Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0 >Reporter: Martin Loncaric >Assignee: Martin Loncaric >Priority: Minor > Fix For: 3.0.0 > > > There is at least one option accessed in MesosClusterScheduler that should > come from the submission's configuration instead of the dispatcher's: > spark.mesos.fetcherCache.enable > Coincidentally, the spark.mesos.fetcherCache.enable option was previously > misnamed, as referenced in the linked JIRA. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26192) MesosClusterScheduler reads options from dispatcher conf instead of submission conf
[ https://issues.apache.org/jira/browse/SPARK-26192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16783810#comment-16783810 ] Martin Loncaric commented on SPARK-26192: - [~dongjoon] Thanks, I will pay more attention to those fields. However, I believe this is a bug. It violates behavior specified in the https://spark.apache.org/docs/latest/running-on-mesos.html#configuration. Can we merge into 2.4.1 as well? > MesosClusterScheduler reads options from dispatcher conf instead of > submission conf > --- > > Key: SPARK-26192 > URL: https://issues.apache.org/jira/browse/SPARK-26192 > Project: Spark > Issue Type: Improvement > Components: Mesos >Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0 >Reporter: Martin Loncaric >Assignee: Martin Loncaric >Priority: Minor > Fix For: 3.0.0 > > > There is at least one option accessed in MesosClusterScheduler that should > come from the submission's configuration instead of the dispatcher's: > spark.mesos.fetcherCache.enable > Coincidentally, the spark.mesos.fetcherCache.enable option was previously > misnamed, as referenced in the linked JIRA. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
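The shape of the fix being discussed is to consult the submission's own configuration before the dispatcher's. A simplified sketch (the type and field names here are assumptions for illustration, not the exact Spark source):

```scala
import org.apache.spark.SparkConf

// Hypothetical stand-in for Spark's MesosDriverDescription: each
// submission carries its own SparkConf alongside the dispatcher's.
case class DriverSubmission(conf: SparkConf)

// Prefer the per-submission value; fall back to the dispatcher conf
// only when the submission did not set the option at all.
def fetcherCacheEnabled(submission: DriverSubmission,
                        dispatcherConf: SparkConf): Boolean =
  submission.conf.getOption("spark.mesos.fetcherCache.enable")
    .orElse(dispatcherConf.getOption("spark.mesos.fetcherCache.enable"))
    .exists(_.toBoolean)
```

The key point matches the issue description: reading the option from the dispatcher's conf alone silently ignores whatever the submitting application configured.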
[jira] [Commented] (SPARK-27015) spark-submit does not properly escape arguments sent to Mesos dispatcher
[ https://issues.apache.org/jira/browse/SPARK-27015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16783944#comment-16783944 ] Martin Loncaric commented on SPARK-27015: - Created a PR: https://github.com/apache/spark/pull/23967 > spark-submit does not properly escape arguments sent to Mesos dispatcher > > > Key: SPARK-27015 > URL: https://issues.apache.org/jira/browse/SPARK-27015 > Project: Spark > Issue Type: Bug > Components: Mesos >Affects Versions: 2.3.3, 2.4.0 >Reporter: Martin Loncaric >Priority: Major > Fix For: 2.5.0, 3.0.0 > > > Arguments sent to the dispatcher must be escaped; for instance, > {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a > b$c"{noformat} > fails, and instead must be submitted as > {noformat}spark-submit --master mesos://url:port my.jar --arg1 "a\\ > b\\$c"{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
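The manual double-escaping shown in the issue description is what a proper fix would automate. A minimal sketch of that kind of quoting (a hypothetical helper for illustration; the actual fix lives in the linked PR and may differ):

```scala
// Quote an argument so it survives one extra round of shell
// interpretation on its way to the Mesos dispatcher. Arguments made
// only of "safe" characters pass through untouched; anything else is
// wrapped in double quotes with ", `, $, and \ backslash-escaped.
def shellEscape(arg: String): String =
  if (arg.matches("[A-Za-z0-9_./-]+")) arg
  else "\"" + arg.replaceAll("""(["`$\\])""", """\\$1""") + "\""
```

With a helper like this applied to each user argument, `--arg1 "a b$c"` could be submitted as written, instead of requiring the caller to hand-escape spaces and `$` as in the workaround above.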
[jira] [Created] (SPARK-27098) Flaky missing file parts when writing to Ceph without error
Martin Loncaric created SPARK-27098: --- Summary: Flaky missing file parts when writing to Ceph without error Key: SPARK-27098 URL: https://issues.apache.org/jira/browse/SPARK-27098 Project: Spark Issue Type: Bug Components: Input/Output Affects Versions: 2.4.0 Reporter: Martin Loncaric https://stackoverflow.com/questions/54935822/spark-s3a-write-omits-upload-part-without-failure/55031233?noredirect=1#comment96835218_55031233 Using 2.4.0 with Hadoop 2.7, hadoop-aws 2.7.5, occasionally a file part will be missing; i.e. part 3 here: ``` > aws s3 ls my-bucket/folder/ 2019-02-28 13:07:21 0 _SUCCESS 2019-02-28 13:06:58 79428651 part-0-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet 2019-02-28 13:06:59 79586172 part-1-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet 2019-02-28 13:07:00 79561910 part-2-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet 2019-02-28 13:07:01 79192617 part-4-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet 2019-02-28 13:07:07 79364413 part-5-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet 2019-02-28 13:07:08 79623254 part-6-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet 2019-02-28 13:07:10 79445030 part-7-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet 2019-02-28 13:07:10 79474923 part-8-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet 2019-02-28 13:07:11 79477310 part-9-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet 2019-02-28 13:07:12 79331453 part-00010-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet 2019-02-28 13:07:13 79567600 part-00011-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet 2019-02-28 13:07:13 79388012 part-00012-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet 2019-02-28 13:07:14 79308387 part-00013-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet 2019-02-28 13:07:15 79455483 part-00014-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet 2019-02-28 13:07:17 79512342 
part-00015-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet 2019-02-28 13:07:18 79403307 part-00016-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet 2019-02-28 13:07:18 79617769 part-00017-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet 2019-02-28 13:07:19 79333534 part-00018-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet 2019-02-28 13:07:20 79543324 part-00019-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet ``` However, the write succeeds and leaves a _SUCCESS file. This can be caught by additionally checking afterward whether the number of written file parts agrees with the number of partitions, but Spark should at least fail on its own and leave a meaningful stack trace in this case. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
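Until Spark itself fails loudly here, the check described above can be scripted after each write. A sketch of that defensive check (assuming `df`, `spark`, and `outputPath` from the surrounding job; this detects the missing part but cannot prevent it):

```scala
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Record how many partitions we are about to write, do the write, then
// count the part files that actually landed in the output directory.
val expected = df.rdd.getNumPartitions
df.write.parquet(outputPath)

val fs = FileSystem.get(new URI(outputPath),
  spark.sparkContext.hadoopConfiguration)
val written = fs.listStatus(new Path(outputPath))
  .count(_.getPath.getName.startsWith("part-"))

// Fail the job explicitly instead of trusting _SUCCESS alone.
require(written == expected,
  s"expected $expected part files but found $written in $outputPath")
```

This turns the silent data loss into an immediate, attributable job failure, which is the behavior the report argues Spark should provide on its own.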
[jira] [Commented] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors
[ https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788773#comment-16788773 ] Martin Loncaric commented on SPARK-26555: - You can literally try any dataset and replicate this issue. For example, sparkSession.createDataset(Seq( MyClass(new Timestamp(1L), "b", "c", Some("d"), Some(1.0), Some(2.0)) )) I think the code I left is pretty clear - it fails sometimes. Run it once, and it may or may not work. I don't run multiple spark-submit's in parallel. > Thread safety issue causes createDataset to fail with misleading errors > --- > > Key: SPARK-26555 > URL: https://issues.apache.org/jira/browse/SPARK-26555 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Martin Loncaric >Priority: Major > > This can be replicated (~2% of the time) with > {code:scala} > import java.sql.Timestamp > import java.util.concurrent.{Executors, Future} > import org.apache.spark.sql.SparkSession > import scala.collection.mutable.ListBuffer > import scala.concurrent.ExecutionContext > import scala.util.Random > object Main { > def main(args: Array[String]): Unit = { > val sparkSession = SparkSession.builder > .getOrCreate() > import sparkSession.implicits._ > val executor = Executors.newFixedThreadPool(1) > try { > implicit val xc: ExecutionContext = > ExecutionContext.fromExecutorService(executor) > val futures = new ListBuffer[Future[_]]() > for (i <- 1 to 3) { > futures += executor.submit(new Runnable { > override def run(): Unit = { > val d = if (Random.nextInt(2) == 0) Some("d value") else None > val e = if (Random.nextInt(2) == 0) Some(5.0) else None > val f = if (Random.nextInt(2) == 0) Some(6.0) else None > println("DEBUG", d, e, f) > sparkSession.createDataset(Seq( > MyClass(new Timestamp(1L), "b", "c", d, e, f) > )) > } > }) > } > futures.foreach(_.get()) > } finally { > println("SHUTDOWN") > executor.shutdown() > sparkSession.stop() > } > } > case class 
MyClass( > a: Timestamp, > b: String, > c: String, > d: Option[String], > e: Option[Double], > f: Option[Double] > ) > } > {code} > So it will usually come up during > {code:bash} > for i in $(seq 1 200); do > echo $i > spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar > done > {code} > causing a variety of possible errors, such as > {code}Exception in thread "main" java.util.concurrent.ExecutionException: > scala.MatchError: scala.Option[String] (of class > scala.reflect.internal.Types$ClassArgsTypeRef) > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > Caused by: scala.MatchError: scala.Option[String] (of class > scala.reflect.internal.Types$ClassArgsTypeRef) > at > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210){code} > or > {code}Exception in thread "main" java.util.concurrent.ExecutionException: > java.lang.UnsupportedOperationException: Schema for type > scala.Option[scala.Double] is not supported > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > Caused by: java.lang.UnsupportedOperationException: Schema for type > scala.Option[scala.Double] is not supported > at > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
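Until the underlying reflection race is fixed, one workaround (an editorial suggestion, not from the ticket) is to serialize the encoder-deriving calls behind a single lock, since the failures occur while encoders are derived via runtime reflection at the createDataset call site. `ReflectionLock` is an illustrative name, not a Spark API.

```scala
// Hedged workaround sketch: keep only one thread inside scala-reflect at a
// time by wrapping each createDataset call (where the implicit encoder is
// materialized) in a global lock, trading parallelism for safety.
object ReflectionLock

// In each Runnable of the reproduction above, wrap the call like so:
//   val ds = ReflectionLock.synchronized {
//     sparkSession.createDataset(Seq(MyClass(new Timestamp(1L), "b", "c", d, e, f)))
//   }
```

Note that the lock must enclose the call site itself: hiding the `synchronized` inside a helper that takes an already-resolved Encoder parameter would not help, because the reflective derivation happens where the implicit is materialized.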
[jira] [Comment Edited] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors
[ https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788773#comment-16788773 ] Martin Loncaric edited comment on SPARK-26555 at 3/9/19 7:04 PM: - You can literally try any dataset with Options in the schema and replicate this issue. For example, sparkSession.createDataset(Seq( MyClass(new Timestamp(1L), "b", "c", Some("d"), Some(1.0), Some(2.0)) )) I think the code I left is pretty clear - it fails sometimes. Run it once, and it may or may not work. I don't run multiple spark-submits in parallel. was (Author: mwlon): You can literally try any dataset and replicate this issue. For example, sparkSession.createDataset(Seq( MyClass(new Timestamp(1L), "b", "c", Some("d"), Some(1.0), Some(2.0)) )) I think the code I left is pretty clear - it fails sometimes. Run it once, and it may or may not work. I don't run multiple spark-submit's in parallel.
[jira] [Commented] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors
[ https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16792086#comment-16792086 ] Martin Loncaric commented on SPARK-26555: - Update: I have been able to replicate this without Spark at all, using snippets from org.apache.spark.sql.catalyst.ScalaReflection: https://stackoverflow.com/questions/55150590/thread-safety-in-scala-reflection-with-type-matching Investigating whether this can be fixed with different usage of the reflection library, or whether this is a scala issue.
[jira] [Comment Edited] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors
[ https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16792086#comment-16792086 ] Martin Loncaric edited comment on SPARK-26555 at 3/13/19 8:26 PM: -- Update: I have proved that the issue lies in reflection thread safety issues in org.apache.spark.sql.catalyst.ScalaReflection: https://stackoverflow.com/questions/55150590/thread-safety-in-scala-reflection-with-type-matching Investigating whether this can be fixed with different usage of the reflection library, or whether this is a scala issue. was (Author: mwlon): Update: I have been able to replicate this without Spark at all, using snippets from org.apache.spark.sql.catalyst.ScalaReflection: https://stackoverflow.com/questions/55150590/thread-safety-in-scala-reflection-with-type-matching Investigating whether this can be fixed with different usage of the reflection library, or whether this is a scala issue.
[jira] [Commented] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors
[ https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16792240#comment-16792240 ] Martin Loncaric commented on SPARK-26555: - This is an existing issue with scala: https://github.com/scala/bug/issues/10766
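A minimal, Spark-free sketch of the race described in the linked Stack Overflow post and scala/bug#10766 (editorial illustration, not the exact snippet from either source): several threads concurrently run scala-reflect type inspection on Option types, much like ScalaReflection's schemaFor/deserializerFor do, and runtime reflection's lazy type completion is not thread-safe in Scala 2.11/2.12.

```scala
import scala.reflect.runtime.universe._

// Hedged reproduction sketch: concurrent typeOf / subtype checks on Option
// types can intermittently fail (MatchError, "Schema for type ... is not
// supported"-style errors) because scala-reflect completes types lazily and
// without synchronization.
object ReflectionRace {
  def main(args: Array[String]): Unit = {
    val threads = (1 to 4).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = {
          // Type matching analogous to what ScalaReflection performs:
          val tpe = typeOf[Option[Double]]
          assert(tpe <:< typeOf[Option[_]])
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}
```

Run in a loop, this fails only occasionally, which matches the ~2% flakiness reported in the ticket.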
[jira] [Updated] (SPARK-27098) Flaky missing file parts when writing to Ceph without error
[ https://issues.apache.org/jira/browse/SPARK-27098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-27098: Attachment: sanitized_stdout_1.txt
[jira] [Commented] (SPARK-27098) Flaky missing file parts when writing to Ceph without error
[ https://issues.apache.org/jira/browse/SPARK-27098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16793080#comment-16793080 ] Martin Loncaric commented on SPARK-27098: - I've gotten the debug logs for (1.), but can't make much of them. In this case, `part-00000-` was missing:

{{Exception in thread "main" java.lang.AssertionError: assertion failed: Expected to write dataframe with 20 partitions in s3a://my-bucket/my_folder but instead found 19 written parts!
1552587026347 82681618 part-00001-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
1552587027399 82631123 part-00002-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
1552587028592 82513038 part-00003-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
1552587029544 82325322 part-00004-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
1552587030573 82497917 part-00005-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
1552587031590 82736624 part-00006-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
1552587032449 82573267 part-00007-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
1552587033351 82590538 part-00008-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
1552587034582 82617979 part-00009-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
1552587035817 82430474 part-00010-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
1552587036808 82688230 part-00011-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
1552587037744 8252 part-00012-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
1552587039017 82434976 part-00013-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
1552587039919 82535772 part-00014-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
1552587040884 82612890 part-00015-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
1552587041898 82535110 part-00016-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
1552587042829 82735449 part-00017-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
1552587043744 82460648 part-00018-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
1552587044641 82658185 part-00019-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet
at scala.Predef$.assert(Predef.scala:170)}}

Looking at stdout for the driver, I find that there is absolutely no mention of part-00000, but the other parts (e.g. part-00001) have various logs, including the "rename path" ones you mentioned, like so:

{{2019-03-14 18:10:26 DEBUG S3AFileSystem:449 - Rename path s3a://my-bucket/my/folder/_temporary/0/task_20190314180906_0016_m_000001/part-00001-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet to s3a://my-bucket/my/folder/part-00001-5e21727b-508e-4246-b47c-c68c98c04f50-c000.snappy.parquet}}

I have attached all the debugging related to part-00001 here. As mentioned, there is nothing for the missing part-00000 (in other runs, it was a different part missing, so there is nothing special about 0, just coincidence). [^sanitized_stdout_1.txt]
[jira] [Commented] (SPARK-27098) Flaky missing file parts when writing to Ceph without error
[ https://issues.apache.org/jira/browse/SPARK-27098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16793082#comment-16793082 ] Martin Loncaric commented on SPARK-27098: - [~ste...@apache.org] Does this make more sense to you? This seems to suggest a bug in either Spark or Hadoop, but do you have a better idea of where to look?
[jira] [Comment Edited] (SPARK-27098) Flaky missing file parts when writing to Ceph without error
[ https://issues.apache.org/jira/browse/SPARK-27098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16793082#comment-16793082 ] Martin Loncaric edited comment on SPARK-27098 at 3/14/19 9:28 PM: -- [~ste...@apache.org] Does this make more sense to you? This seems to suggest a bug in either Spark or Hadoop, but do you have a more specific idea of where to look? was (Author: mwlon): [~ste...@apache.org] Does this make more sense to you? This seems to suggest a bug in either Spark or Hadoop, but do you have a better idea of where to look? > Flaky missing file parts when writing to Ceph without error > --- > > Key: SPARK-27098 > URL: https://issues.apache.org/jira/browse/SPARK-27098 > Project: Spark > Issue Type: Bug > Components: Input/Output >Affects Versions: 2.4.0 >Reporter: Martin Loncaric >Priority: Major > Attachments: sanitized_stdout_1.txt > > > https://stackoverflow.com/questions/54935822/spark-s3a-write-omits-upload-part-without-failure/55031233?noredirect=1#comment96835218_55031233 > Using 2.4.0 with Hadoop 2.7, hadoop-aws 2.7.5, and the Ceph S3 endpoint. > occasionally a file part will be missing; i.e. 
part 3 here: > ``` > > aws s3 ls my-bucket/folder/ > 2019-02-28 13:07:21 0 _SUCCESS > 2019-02-28 13:06:58 79428651 > part-0-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:06:59 79586172 > part-1-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:00 79561910 > part-2-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:01 79192617 > part-4-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:07 79364413 > part-5-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:08 79623254 > part-6-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:10 79445030 > part-7-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:10 79474923 > part-8-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:11 79477310 > part-9-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:12 79331453 > part-00010-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:13 79567600 > part-00011-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:13 79388012 > part-00012-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:14 79308387 > part-00013-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:15 79455483 > part-00014-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:17 79512342 > part-00015-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:18 79403307 > part-00016-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:18 79617769 > part-00017-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:19 79333534 > part-00018-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:20 79543324 > part-00019-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > ``` > However, the write 
succeeds and leaves a _SUCCESS file. > This can be caught by additionally checking afterward whether the number of > written file parts agrees with the number of partitions, but Spark should at > least fail on its own and leave a meaningful stack trace in this case. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
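The post-write check suggested above can be sketched without any Spark dependency: parse the part index out of each key listed under the output prefix and report any gaps. The object name and the shortened key names below are illustrative assumptions following the part-XXXXX-<uuid>-c000 pattern from the listing above.

```scala
// Sketch of the suggested sanity check: given the keys listed under the output
// prefix and the expected partition count, return any missing part indices.
object PartCheck {
  private val PartIndex = """part-(\d+)-.*""".r

  def missingParts(keys: Seq[String], expectedParts: Int): Seq[Int] = {
    // Keys that do not look like part files (e.g. _SUCCESS) are ignored.
    val present = keys.collect { case PartIndex(i) => i.toInt }.toSet
    (0 until expectedParts).filterNot(present)
  }

  def main(args: Array[String]): Unit = {
    val listed = Seq(
      "_SUCCESS",
      "part-00000-5789ebf5-c000.snappy.parquet",
      "part-00001-5789ebf5-c000.snappy.parquet",
      "part-00002-5789ebf5-c000.snappy.parquet",
      "part-00004-5789ebf5-c000.snappy.parquet" // part 3 absent, as in the report
    )
    println(PartCheck.missingParts(listed, 5)) // prints Vector(3)
  }
}
```

A non-empty result after a "successful" write is exactly the silent failure described in this issue.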
[jira] [Commented] (SPARK-27098) Flaky missing file parts when writing to Ceph without error
[ https://issues.apache.org/jira/browse/SPARK-27098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16804294#comment-16804294 ] Martin Loncaric commented on SPARK-27098: - An INFO of this format also appears for each missing file: {{19/03/28 16:07:24 INFO SparkHadoopMapRedUtil: No need to commit output of task because needsTaskCommit=false: attempt_20190328160411_0004_m_000113_343}}
[jira] [Comment Edited] (SPARK-27098) Flaky missing file parts when writing to Ceph without error
[ https://issues.apache.org/jira/browse/SPARK-27098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16804294#comment-16804294 ] Martin Loncaric edited comment on SPARK-27098 at 3/28/19 8:53 PM: -- An INFO of this format also appears for each missing file: {{19/03/28 16:07:24 INFO SparkHadoopMapRedUtil: No need to commit output of task because needsTaskCommit=false: attempt_20190328160411_0004_m_000113_343}}
[jira] [Created] (SPARK-26082) Misnaming of spark.mesos.fetch(er)Cache.enable in MesosClusterScheduler
Martin Loncaric created SPARK-26082: --- Summary: Misnaming of spark.mesos.fetch(er)Cache.enable in MesosClusterScheduler Key: SPARK-26082 URL: https://issues.apache.org/jira/browse/SPARK-26082 Project: Spark Issue Type: Bug Components: Mesos Affects Versions: 2.3.2, 2.3.1, 2.3.0, 2.2.2, 2.2.1, 2.2.0, 2.1.3, 2.1.2, 2.1.1, 2.1.0, 2.0.2, 2.0.1, 2.0.0 Reporter: Martin Loncaric
Currently in [docs|https://spark.apache.org/docs/latest/running-on-mesos.html]: {quote}spark.mesos.fetcherCache.enable / false / If set to `true`, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the Mesos Fetcher Cache{quote}
Currently in {{MesosClusterScheduler.scala}} (which passes the parameter to the driver): {{private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)}}
Currently in {{MesosCoarseGrainedSchedulerBackend.scala}} (which passes the Mesos caching parameter to executors): {{private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)}}
This naming discrepancy dates back to version 2.0.0 ([jira|http://mail-archives.apache.org/mod_mbox/spark-issues/201606.mbox/%3cjira.12979909.1466099309000.9921.1466101026...@atlassian.jira%3E]). This means that when {{spark.mesos.fetcherCache.enable=true}} is specified, the Mesos cache will be used only for executors, and not for drivers.
IMPACT: Not caching these driver files (typically including at least the Spark binaries, the application jar, and additional dependencies) adds considerable network traffic when frequently running Spark applications on a Mesos cluster. Additionally, since extracted files like {{spark-x.x.x-bin-*.tgz}} are also copied and left in the sandbox when the cache is off (rather than extracted directly without an extra copy), this can considerably increase disk usage. Users CAN currently work around this by specifying the {{spark.mesos.fetchCache.enable}} option, but this should at least be specified in the documentation.
SUGGESTED FIX: Add {{spark.mesos.fetchCache.enable}} to the documentation for versions 2 - 2.4, and update to {{spark.mesos.fetcherCache.enable}} going forward.
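Until a rename lands, one pragmatic mitigation (an assumption, not something proposed upstream) is to read the option defensively under both spellings. A minimal sketch, using a plain `Map` as a stand-in for Spark's `SparkConf`:

```scala
// Honors either spelling of the cache option: the documented fetcherCache name
// takes precedence, and the fetchCache name (the one MesosClusterScheduler
// actually reads today) is the fallback. Both absent means the default, false.
object FetchCacheConf {
  def useFetchCache(conf: Map[String, String]): Boolean =
    conf.get("spark.mesos.fetcherCache.enable")
      .orElse(conf.get("spark.mesos.fetchCache.enable"))
      .exists(_.toBoolean)
}
```

Equivalently, users today can simply set both keys to `true` so that drivers and executors agree.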
[jira] [Updated] (SPARK-26082) Misnaming of spark.mesos.fetch(er)Cache.enable in MesosClusterScheduler
[ https://issues.apache.org/jira/browse/SPARK-26082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-26082: Description:
Currently in [docs|https://spark.apache.org/docs/latest/running-on-mesos.html]: {quote}spark.mesos.fetcherCache.enable / false / If set to `true`, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the Mesos Fetcher Cache{quote}
Currently in {{MesosClusterScheduler.scala}} (which passes the parameter to the driver): {{private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)}}
Currently in {{MesosCoarseGrainedSchedulerBackend.scala}} (which passes the Mesos caching parameter to executors): {{private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)}}
This naming discrepancy dates back to version 2.0.0 ([jira|http://mail-archives.apache.org/mod_mbox/spark-issues/201606.mbox/%3cjira.12979909.1466099309000.9921.1466101026...@atlassian.jira%3E]). This means that when {{spark.mesos.fetcherCache.enable=true}} is specified, the Mesos cache will be used only for executors, and not for drivers.
IMPACT: Not caching these driver files (typically including at least the Spark binaries, the application jar, and additional dependencies) adds considerable network traffic and startup-time overhead when frequently running Spark applications on a Mesos cluster. Additionally, since extracted files like {{spark-x.x.x-bin-*.tgz}} are also copied and left in the sandbox when the cache is off (rather than extracted directly without an extra copy), this can considerably increase disk usage. Users CAN currently work around this by specifying the {{spark.mesos.fetchCache.enable}} option, but this should at least be specified in the documentation.
SUGGESTED FIX: Add {{spark.mesos.fetchCache.enable}} to the documentation for versions 2 - 2.4, and update to {{spark.mesos.fetcherCache.enable}} going forward.
was: Currently in [docs|https://spark.apache.org/docs/latest/running-on-mesos.html]: {quote}spark.mesos.fetcherCache.enable / false / If set to `true`, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the Mesos Fetcher Cache {quote} Currently in {{MesosClusterScheduler.scala}} (which passes parameter to driver): {{private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)}} Currently in {{MesosCourseGrainedSchedulerBackend.scala}} (which passes mesos caching parameter to executors): {{private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)}} This naming discrepancy dates back to version 2.0.0 ([jira|http://mail-archives.apache.org/mod_mbox/spark-issues/201606.mbox/%3cjira.12979909.1466099309000.9921.1466101026...@atlassian.jira%3E]). This means that when {{spark.mesos.fetcherCache.enable=true}} is specified, the Mesos cache will be used only for executors, and not for drivers. IMPACT: Not caching these driver files (typically including at least spark binaries, custom jar, and additional dependencies) adds considerable network traffic when frequently running spark Applications on a Mesos cluster. Additionally, since extracted files like {{spark-x.x.x-bin-*.tgz}} are additionally copied and left in the sandbox with the cache off (rather than extracted directly without an extra copy), this can considerably increase disk usage. Users CAN currently workaround by specifying the {{spark.mesos.fetchCache.enable}} option, but this should at least be specified in the documentation. SUGGESTED FIX: Add {{spark.mesos.fetchCache.enable}} to the documentation for versions 2 - 2.4, and update to {{spark.mesos.fetcherCache.enable}} going forward. 
[jira] [Updated] (SPARK-26082) Misnaming of spark.mesos.fetch(er)Cache.enable in MesosClusterScheduler
[ https://issues.apache.org/jira/browse/SPARK-26082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-26082: Description:
Currently in [docs|https://spark.apache.org/docs/latest/running-on-mesos.html]: {quote}spark.mesos.fetcherCache.enable / false / If set to `true`, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the Mesos Fetcher Cache{quote}
Currently in {{MesosClusterScheduler.scala}} (which passes the parameter to the driver): {{private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)}}
Currently in {{MesosCoarseGrainedSchedulerBackend.scala}} (which passes the Mesos caching parameter to executors): {{private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)}}
This naming discrepancy dates back to version 2.0.0 ([jira|http://mail-archives.apache.org/mod_mbox/spark-issues/201606.mbox/%3cjira.12979909.1466099309000.9921.1466101026...@atlassian.jira%3E]). This means that when {{spark.mesos.fetcherCache.enable=true}} is specified, the Mesos cache will be used only for executors, and not for drivers.
IMPACT: Not caching these driver files (typically including at least the Spark binaries, the application jar, and additional dependencies) adds considerable network traffic and startup-time overhead when frequently running Spark applications on a Mesos cluster. Additionally, since extracted files like {{spark-x.x.x-bin-*.tgz}} are also copied and left in the sandbox when the cache is off (rather than extracted directly without an extra copy), this can considerably increase disk usage. Users CAN currently work around this by specifying the {{spark.mesos.fetchCache.enable}} option, but this should at least be specified in the documentation.
SUGGESTED FIX: Add {{spark.mesos.fetchCache.enable}} to the documentation for versions 2 - 2.4, and update to {{spark.mesos.fetcherCache.enable}} going forward (literally a one-line change).
was: Currently in [docs|https://spark.apache.org/docs/latest/running-on-mesos.html]: {quote}spark.mesos.fetcherCache.enable / false / If set to `true`, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the Mesos Fetcher Cache {quote} Currently in {{MesosClusterScheduler.scala}} (which passes parameter to driver): {{private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)}} Currently in {{MesosCourseGrainedSchedulerBackend.scala}} (which passes mesos caching parameter to executors): {{private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)}} This naming discrepancy dates back to version 2.0.0 ([jira|http://mail-archives.apache.org/mod_mbox/spark-issues/201606.mbox/%3cjira.12979909.1466099309000.9921.1466101026...@atlassian.jira%3E]). This means that when {{spark.mesos.fetcherCache.enable=true}} is specified, the Mesos cache will be used only for executors, and not for drivers. IMPACT: Not caching these driver files (typically including at least spark binaries, custom jar, and additional dependencies) adds considerable overhead network traffic and startup time when frequently running spark Applications on a Mesos cluster. Additionally, since extracted files like {{spark-x.x.x-bin-*.tgz}} are additionally copied and left in the sandbox with the cache off (rather than extracted directly without an extra copy), this can considerably increase disk usage. Users CAN currently workaround by specifying the {{spark.mesos.fetchCache.enable}} option, but this should at least be specified in the documentation. SUGGESTED FIX: Add {{spark.mesos.fetchCache.enable}} to the documentation for versions 2 - 2.4, and update to {{spark.mesos.fetcherCache.enable}} going forward. 
[jira] [Updated] (SPARK-26082) Misnaming of spark.mesos.fetch(er)Cache.enable in MesosClusterScheduler
[ https://issues.apache.org/jira/browse/SPARK-26082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-26082: Description:
Currently in [docs|https://spark.apache.org/docs/latest/running-on-mesos.html]: {quote}spark.mesos.fetcherCache.enable / false / If set to `true`, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the Mesos Fetcher Cache{quote}
Currently in {{MesosClusterScheduler.scala}} (which passes the parameter to the driver): {{private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)}}
Currently in {{MesosCoarseGrainedSchedulerBackend.scala}} (which passes the Mesos caching parameter to executors): {{private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)}}
This naming discrepancy dates back to version 2.0.0 ([jira|http://mail-archives.apache.org/mod_mbox/spark-issues/201606.mbox/%3cjira.12979909.1466099309000.9921.1466101026...@atlassian.jira%3E]). This means that when {{spark.mesos.fetcherCache.enable=true}} is specified, the Mesos cache will be used only for executors, and not for drivers.
IMPACT: Not caching these driver files (typically including at least the Spark binaries, the application jar, and additional dependencies) adds considerable network traffic and startup-time overhead when frequently running Spark applications on a Mesos cluster. Additionally, since extracted files like {{spark-x.x.x-bin-*.tgz}} are also copied and left in the sandbox when the cache is off (rather than extracted directly without an extra copy), this can considerably increase disk usage. Users CAN currently work around this by specifying the {{spark.mesos.fetchCache.enable}} option, but this should at least be specified in the documentation.
SUGGESTED FIX: Add {{spark.mesos.fetchCache.enable}} to the documentation for versions 2 - 2.4, and update {{MesosClusterScheduler.scala}} to use {{spark.mesos.fetcherCache.enable}} going forward (literally a one-line change). was: Currently in [docs|https://spark.apache.org/docs/latest/running-on-mesos.html]: {quote}spark.mesos.fetcherCache.enable / false / If set to `true`, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the Mesos Fetcher Cache {quote} Currently in {{MesosClusterScheduler.scala}} (which passes parameter to driver): {{private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)}} Currently in {{MesosCourseGrainedSchedulerBackend.scala}} (which passes mesos caching parameter to executors): {{private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)}} This naming discrepancy dates back to version 2.0.0 ([jira|http://mail-archives.apache.org/mod_mbox/spark-issues/201606.mbox/%3cjira.12979909.1466099309000.9921.1466101026...@atlassian.jira%3E]). This means that when {{spark.mesos.fetcherCache.enable=true}} is specified, the Mesos cache will be used only for executors, and not for drivers. IMPACT: Not caching these driver files (typically including at least spark binaries, custom jar, and additional dependencies) adds considerable overhead network traffic and startup time when frequently running spark Applications on a Mesos cluster. Additionally, since extracted files like {{spark-x.x.x-bin-*.tgz}} are additionally copied and left in the sandbox with the cache off (rather than extracted directly without an extra copy), this can considerably increase disk usage. Users CAN currently workaround by specifying the {{spark.mesos.fetchCache.enable}} option, but this should at least be specified in the documentation. 
SUGGESTED FIX: Add {{spark.mesos.fetchCache.enable}} to the documentation for versions 2 - 2.4, and update to {{spark.mesos.fetcherCache.enable}} going forward (literally a one-line change).
[jira] [Created] (SPARK-26192) MesosClusterScheduler reads options from dispatcher conf instead of submission conf
Martin Loncaric created SPARK-26192: --- Summary: MesosClusterScheduler reads options from dispatcher conf instead of submission conf Key: SPARK-26192 URL: https://issues.apache.org/jira/browse/SPARK-26192 Project: Spark Issue Type: Bug Components: Mesos Affects Versions: 2.4.0, 2.3.2, 2.3.1, 2.3.0 Reporter: Martin Loncaric There are at least two options accessed in MesosClusterScheduler that should come from the submission's configuration instead of the dispatcher's: spark.app.name spark.mesos.fetchCache.enable This means that all Mesos tasks for Spark drivers have uninformative names of the form "Driver for (MainClass)" rather than the configured application name, and Spark drivers never cache files. Coincidentally, the spark.mesos.fetchCache.enable option is misnamed, as referenced in the linked JIRA.
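The intended lookup order can be sketched abstractly. Plain `Map`s stand in for the submission's properties and the dispatcher's `SparkConf`; the object and method names are illustrative assumptions, not Spark's internals:

```scala
// Per-application options such as spark.app.name and
// spark.mesos.fetchCache.enable should resolve against the submission's
// properties first; the dispatcher's conf is only a fallback.
object SubmissionConf {
  def resolve(submission: Map[String, String],
              dispatcher: Map[String, String],
              key: String,
              default: String): String =
    submission.getOrElse(key, dispatcher.getOrElse(key, default))
}
```

With this order, a submitted application's spark.app.name would label the Mesos task instead of the generic "Driver for (MainClass)" fallback.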
[jira] [Updated] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors
[ https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-26555: Description: This can be replicated (~2% of the time) with
{code:scala}
import java.sql.Timestamp
import java.util.concurrent.{Executors, Future}

import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
import scala.util.Random

object Main {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .getOrCreate()
    import sparkSession.implicits._

    val executor = Executors.newFixedThreadPool(1)
    try {
      implicit val xc: ExecutionContext = ExecutionContext.fromExecutorService(executor)
      val futures = new ListBuffer[Future[_]]()
      for (i <- 1 to 3) {
        futures += executor.submit(new Runnable {
          override def run(): Unit = {
            val d = if (Random.nextInt(2) == 0) Some("d value") else None
            val e = if (Random.nextInt(2) == 0) Some(5.0) else None
            val f = if (Random.nextInt(2) == 0) Some(6.0) else None
            println("DEBUG", d, e, f)
            sparkSession.createDataset(Seq(
              MyClass(new Timestamp(1L), "b", "c", d, e, f)
            ))
          }
        })
      }
      futures.foreach(_.get())
    } finally {
      println("SHUTDOWN")
      executor.shutdown()
      sparkSession.stop()
    }
  }

  case class MyClass(
    a: Timestamp,
    b: String,
    c: String,
    d: Option[String],
    e: Option[Double],
    f: Option[Double]
  )
}
{code}
causing a variety of possible errors, such as
{code}
Exception in thread "main" java.util.concurrent.ExecutionException: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef)
  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210)
{code}
or
{code}
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Double] is not supported
  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Double] is not supported
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789)
{code}
[jira] [Updated] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors
[ https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-26555: Description: This can be replicated (~2% of the time) with
{code:scala}
import java.sql.Timestamp
import java.util.concurrent.{Executors, Future}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
import scala.util.Random

object Main {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .getOrCreate()
    import sparkSession.implicits._
    val executor = Executors.newFixedThreadPool(1)
    try {
      implicit val xc: ExecutionContext = ExecutionContext.fromExecutorService(executor)
      val futures = new ListBuffer[Future[_]]()
      for (i <- 1 to 3) {
        futures += executor.submit(new Runnable {
          override def run(): Unit = {
            val d = if (Random.nextInt(2) == 0) Some("d value") else None
            val e = if (Random.nextInt(2) == 0) Some(5.0) else None
            val f = if (Random.nextInt(2) == 0) Some(6.0) else None
            println("DEBUG", d, e, f)
            sparkSession.createDataset(Seq(
              MyClass(new Timestamp(1L), "b", "c", d, e, f)
            ))
          }
        })
      }
      futures.foreach(_.get())
    } finally {
      println("SHUTDOWN")
      executor.shutdown()
      sparkSession.stop()
    }
  }

  case class MyClass(
    a: Timestamp,
    b: String,
    c: String,
    d: Option[String],
    e: Option[Double],
    f: Option[Double]
  )
}
{code}
{code:bash}
for i in $(seq 1 200); do
  echo $i
  spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
done
{code}
causing a variety of possible errors, such as
{code}
Exception in thread "main" java.util.concurrent.ExecutionException: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef)
  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210)
{code}
or
{code}
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Double] is not supported
  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Double] is not supported
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789)
{code}
was: This can be replicated (~2% of the time) with
{code:scala}
import java.sql.Timestamp
import java.util.concurrent.{Executors, Future}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
import scala.util.Random

object Main {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .getOrCreate()
    import sparkSession.implicits._
    val executor = Executors.newFixedThreadPool(1)
    try {
      implicit val xc: ExecutionContext = ExecutionContext.fromExecutorService(executor)
      val futures = new ListBuffer[Future[_]]()
      for (i <- 1 to 3) {
        futures += executor.submit(new Runnable {
          override def run(): Unit = {
            val d = if (Random.nextInt(2) == 0) Some("d value") else None
            val e = if (Random.nextInt(2) == 0) Some(5.0) else None
            val f = if (Random.nextInt(2) == 0) Some(6.0) else None
            println("DEBUG", d, e, f)
            sparkSession.createDataset(Seq(
              MyClass(new Timestamp(1L), "b", "c", d, e, f)
            ))
          }
        })
      }
      futures.foreach(_.get())
    } finally {
      println("SHUTDOWN")
      executor.shutdown()
      sparkSession.stop()
    }
  }

  case class MyClass(
    a: Timestamp,
    b: String,
    c: String,
    d: Option[String],
    e: Option[Double],
    f: Option[Double]
  )
}
{code}
causing a variety of possible errors, such as
{code}
Exception in thread "main" java.util.concurrent.ExecutionException: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef)
  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210)
{code}
or {code}Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationExcepti
[jira] [Updated] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors
[ https://issues.apache.org/jira/browse/SPARK-26555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-26555: Description: This can be replicated (~2% of the time) with
{code:scala}
import java.sql.Timestamp
import java.util.concurrent.{Executors, Future}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
import scala.util.Random

object Main {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .getOrCreate()
    import sparkSession.implicits._
    val executor = Executors.newFixedThreadPool(1)
    try {
      implicit val xc: ExecutionContext = ExecutionContext.fromExecutorService(executor)
      val futures = new ListBuffer[Future[_]]()
      for (i <- 1 to 3) {
        futures += executor.submit(new Runnable {
          override def run(): Unit = {
            val d = if (Random.nextInt(2) == 0) Some("d value") else None
            val e = if (Random.nextInt(2) == 0) Some(5.0) else None
            val f = if (Random.nextInt(2) == 0) Some(6.0) else None
            println("DEBUG", d, e, f)
            sparkSession.createDataset(Seq(
              MyClass(new Timestamp(1L), "b", "c", d, e, f)
            ))
          }
        })
      }
      futures.foreach(_.get())
    } finally {
      println("SHUTDOWN")
      executor.shutdown()
      sparkSession.stop()
    }
  }

  case class MyClass(
    a: Timestamp,
    b: String,
    c: String,
    d: Option[String],
    e: Option[Double],
    f: Option[Double]
  )
}
{code}
So it will usually come up during
{code:bash}
for i in $(seq 1 200); do
  echo $i
  spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
done
{code}
causing a variety of possible errors, such as
{code}
Exception in thread "main" java.util.concurrent.ExecutionException: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef)
  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210)
{code}
or
{code}
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Double] is not supported
  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
Caused by: java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Double] is not supported
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789)
{code}
was: This can be replicated (~2% of the time) with
{code:scala}
import java.sql.Timestamp
import java.util.concurrent.{Executors, Future}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
import scala.util.Random

object Main {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .getOrCreate()
    import sparkSession.implicits._
    val executor = Executors.newFixedThreadPool(1)
    try {
      implicit val xc: ExecutionContext = ExecutionContext.fromExecutorService(executor)
      val futures = new ListBuffer[Future[_]]()
      for (i <- 1 to 3) {
        futures += executor.submit(new Runnable {
          override def run(): Unit = {
            val d = if (Random.nextInt(2) == 0) Some("d value") else None
            val e = if (Random.nextInt(2) == 0) Some(5.0) else None
            val f = if (Random.nextInt(2) == 0) Some(6.0) else None
            println("DEBUG", d, e, f)
            sparkSession.createDataset(Seq(
              MyClass(new Timestamp(1L), "b", "c", d, e, f)
            ))
          }
        })
      }
      futures.foreach(_.get())
    } finally {
      println("SHUTDOWN")
      executor.shutdown()
      sparkSession.stop()
    }
  }

  case class MyClass(
    a: Timestamp,
    b: String,
    c: String,
    d: Option[String],
    e: Option[Double],
    f: Option[Double]
  )
}
{code}
{code:bash}
for i in $(seq 1 200); do
  echo $i
  spark-submit --master local[4] target/scala-2.11/spark-test_2.11-0.1.jar
done
{code}
causing a variety of possible errors, such as {code}Exception in thread "main" java.util.concurrent.ExecutionException: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef) at java.util.concurrent.FutureTask.report(FutureTask.java:122) Caused by: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$des
[jira] [Created] (SPARK-26555) Thread safety issue causes createDataset to fail with misleading errors
Martin Loncaric created SPARK-26555: --- Summary: Thread safety issue causes createDataset to fail with misleading errors Key: SPARK-26555 URL: https://issues.apache.org/jira/browse/SPARK-26555 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.0 Reporter: Martin Loncaric This can be replicated (~2% of the time) with
{code:scala}
import java.sql.Timestamp
import java.util.concurrent.{Executors, Future}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
import scala.util.Random

object Main {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .getOrCreate()
    import sparkSession.implicits._
    val executor = Executors.newFixedThreadPool(1)
    try {
      implicit val xc: ExecutionContext = ExecutionContext.fromExecutorService(executor)
      val futures = new ListBuffer[Future[_]]()
      for (i <- 1 to 3) {
        futures += executor.submit(new Runnable {
          override def run(): Unit = {
            val d = if (Random.nextInt(2) == 0) Some("d value") else None
            val e = if (Random.nextInt(2) == 0) Some(5.0) else None
            val f = if (Random.nextInt(2) == 0) Some(6.0) else None
            println("DEBUG", d, e, f)
            sparkSession.createDataset(Seq(
              MyClass(new Timestamp(1L), "b", "c", d, e, f)
            ))
          }
        })
      }
      futures.foreach(_.get())
    } finally {
      println("SHUTDOWN")
      executor.shutdown()
      sparkSession.stop()
    }
  }

  case class MyClass(
    a: Timestamp,
    b: String,
    c: String,
    d: Option[String],
    e: Option[Double],
    f: Option[Double]
  )
}
{code}
causing a variety of possible errors, such as {{Exception in thread "main" java.util.concurrent.ExecutionException: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef) at java.util.concurrent.FutureTask.report(FutureTask.java:122) Caused by: scala.MatchError: scala.Option[String] (of class scala.reflect.internal.Types$ClassArgsTypeRef) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:210)}} or {{Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Double] is not supported at java.util.concurrent.FutureTask.report(FutureTask.java:122) Caused by: java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Double] is not supported at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789)}} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
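A common mitigation until the underlying race is fixed is to force encoder derivation to happen serially before handing work to other threads. The sketch below is illustrative only — the helper name and locking strategy are assumptions, not part of the report — and it assumes the race lives in reflection-based encoder derivation, so deriving each encoder once under a lock avoids concurrent calls into {{ScalaReflection}}:

```scala
import org.apache.spark.sql.{Encoder, Encoders}

// Hypothetical workaround sketch: encoder derivation in Spark 2.x walks
// scala-reflect types, which is not thread safe, so serialize that work.
object EncoderGuard {
  private val lock = new Object

  // Run the reflection-heavy encoder construction while holding a lock.
  def derive[T](mk: => Encoder[T]): Encoder[T] = lock.synchronized { mk }
}

// Usage sketch: derive once on the main thread, then share the encoder:
//   val enc: Encoder[MyClass] = EncoderGuard.derive(Encoders.product[MyClass])
//   sparkSession.createDataset(rows)(enc)  // pass explicitly from any thread
```

Passing the pre-derived encoder explicitly to {{createDataset}} keeps the worker threads out of the implicit derivation path entirely.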
[jira] [Commented] (SPARK-26192) MesosClusterScheduler reads options from dispatcher conf instead of submission conf
[ https://issues.apache.org/jira/browse/SPARK-26192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848516#comment-16848516 ] Martin Loncaric commented on SPARK-26192: - [~dongjoon] I made a PR: https://github.com/apache/spark/pull/24713 > MesosClusterScheduler reads options from dispatcher conf instead of > submission conf > --- > > Key: SPARK-26192 > URL: https://issues.apache.org/jira/browse/SPARK-26192 > Project: Spark > Issue Type: Improvement > Components: Mesos >Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0 >Reporter: Martin Loncaric >Assignee: Martin Loncaric >Priority: Minor > Fix For: 3.0.0 > > > There is at least one option accessed in MesosClusterScheduler that should > come from the submission's configuration instead of the dispatcher's: > spark.mesos.fetcherCache.enable > Coincidentally, the spark.mesos.fetcherCache.enable option was previously > misnamed, as referenced in the linked JIRA. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
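The direction of the fix can be sketched as follows. This is an illustrative reconstruction, not the actual patch (the helper and parameter names are assumptions): the scheduler should consult the submitted driver's configuration first and fall back to the dispatcher's.

```scala
// Hypothetical sketch: resolve an option from the submission's configuration
// first, falling back to the dispatcher's configuration.
def resolveOption(submissionConf: Map[String, String],
                  dispatcherConf: Map[String, String],
                  key: String): Option[String] =
  submissionConf.get(key).orElse(dispatcherConf.get(key))

// e.g. fetcher-cache behavior should follow the submission:
//   resolveOption(sub, disp, "spark.mesos.fetcherCache.enable")
//     .exists(_.toBoolean)
```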
[jira] [Commented] (SPARK-27098) Flaky missing file parts when writing to Ceph without error
[ https://issues.apache.org/jira/browse/SPARK-27098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16853245#comment-16853245 ] Martin Loncaric commented on SPARK-27098: - After upgrading to Hadoop 2.9 and using {{spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2}}, the problem is substantially less frequent, but still present. I think this suggests that moving files sometimes quietly fails. > Flaky missing file parts when writing to Ceph without error > --- > > Key: SPARK-27098 > URL: https://issues.apache.org/jira/browse/SPARK-27098 > Project: Spark > Issue Type: Bug > Components: Input/Output >Affects Versions: 2.4.0 >Reporter: Martin Loncaric >Priority: Major > Attachments: sanitized_stdout_1.txt > > > https://stackoverflow.com/questions/54935822/spark-s3a-write-omits-upload-part-without-failure/55031233?noredirect=1#comment96835218_55031233 > Using 2.4.0 with Hadoop 2.7, hadoop-aws 2.7.5, and the Ceph S3 endpoint. > occasionally a file part will be missing; i.e. 
part 3 here: > ``` > > aws s3 ls my-bucket/folder/ > 2019-02-28 13:07:21 0 _SUCCESS > 2019-02-28 13:06:58 79428651 > part-0-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:06:59 79586172 > part-1-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:00 79561910 > part-2-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:01 79192617 > part-4-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:07 79364413 > part-5-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:08 79623254 > part-6-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:10 79445030 > part-7-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:10 79474923 > part-8-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:11 79477310 > part-9-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:12 79331453 > part-00010-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:13 79567600 > part-00011-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:13 79388012 > part-00012-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:14 79308387 > part-00013-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:15 79455483 > part-00014-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:17 79512342 > part-00015-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:18 79403307 > part-00016-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:18 79617769 > part-00017-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:19 79333534 > part-00018-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > 2019-02-28 13:07:20 79543324 > part-00019-5789ebf5-b55d-4715-8bb5-dfc5c4e4b999-c000.snappy.parquet > ``` > However, the write 
succeeds and leaves a _SUCCESS file. > This can be caught by additionally checking afterward whether the number of > written file parts agrees with the number of partitions, but Spark should at > least fail on its own and leave a meaningful stack trace in this case. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
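The post-write check suggested above can be sketched like this. It is illustrative: the helper name is an assumption, and {{df.rdd.getNumPartitions}} is only a reliable expected count if the DataFrame is cached or deterministic between the write and the check.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.DataFrame

// Hypothetical guard: after df.write...save(outputPath), verify that the
// number of part-* files matches the number of partitions that were written.
def assertAllPartsWritten(df: DataFrame, outputPath: String): Unit = {
  val expected = df.rdd.getNumPartitions
  val dir = new Path(outputPath)
  val fs = dir.getFileSystem(df.sparkSession.sparkContext.hadoopConfiguration)
  val actual = fs.listStatus(dir).count(_.getPath.getName.startsWith("part-"))
  if (actual != expected) {
    throw new IllegalStateException(
      s"$outputPath has $actual part files; expected $expected")
  }
}
```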
[jira] [Updated] (SPARK-31356) reduceGroupValues function for KeyValueGroupedDataset
[ https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-31356: Description: Problem: in the Datasets API, it is a very common pattern to do something like this whenever a complex reduce function is needed:
{code:scala}
ds
.groupByKey(_.y)
.reduceGroups((a, b) => out)
.map(_._2)
{code}
However, the optimized plan unfortunately ends up with an unnecessary implicit serialization during the aggregation step, followed by {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use case, there are 2 things we can improve with a specialized {{.reduceGroupValues}}: 1. avoid the extra serialization (baked into AggregationIterator implementations) and deserialization 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by emitting the values only Proposal:
{code:scala}
ds
.groupByKey(_.y)
.reduceGroupValues((a, b) => out)
{code}
Create an {{AggregationIteratorBase}} superclass that can emit general {{InternalRow}}s instead of just {{UnsafeRow}}s. Create a new {{AggregationIteratorBase}} implementation called {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing only the values instead of serializing them into {{UnsafeRow}}s in {{Final}} or {{Complete}} modes. Since we don't need to emit the keys, which are serialized, this is not too complicated. To make use of this, have {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a {{CatalystSerde.serialize}}. was: Problem: in the Datasets API, it is a very common pattern to do something like this whenever a complex reduce function is needed:
{code:scala}
ds
.groupByKey(_.y)
.reduceGroups(((aKey, aVal), (bKey, bVal)) => outVal)
.map(_._2)
{code}
However, the optimized plan unfortunately ends up with an unnecessary implicit serialization during the aggregation step, followed by {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use case, there are 2 things we can improve with a specialized {{.reduceGroupValues}}: 1. avoid the extra serialization (baked into AggregationIterator implementations) and deserialization 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by emitting the values only Proposal:
{code:scala}
ds
.groupByKey(_.y)
.reduceGroupValues((aVal, bVal) => outVal)
{code}
Create an {{AggregationIteratorBase}} superclass that can emit general {{InternalRow}}s instead of just {{UnsafeRow}}s. Create a new {{AggregationIteratorBase}} implementation called {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing only the values instead of serializing them into {{UnsafeRow}}s in {{Final}} or {{Complete}} modes. Since we don't need to emit the keys, which are serialized, this is not too complicated. To make use of this, have {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a {{CatalystSerde.serialize}}.
> reduceGroupValues function for KeyValueGroupedDataset > - > > Key: SPARK-31356 > URL: https://issues.apache.org/jira/browse/SPARK-31356 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 3.0.0 >Reporter: Martin Loncaric >Priority: Major > > Problem: in the Datasets API, it is a very common pattern to do something like > this whenever a complex reduce function is needed:
> {code:scala}
> ds
> .groupByKey(_.y)
> .reduceGroups((a, b) => out)
> .map(_._2)
> {code}
> However, the optimized plan unfortunately ends up with an unnecessary > implicit serialization during the aggregation step, followed by > {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use > case, there are 2 things we can improve with a specialized > {{.reduceGroupValues}}: > 1. avoid the extra serialization (baked into AggregationIterator > implementations) and deserialization > 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by > emitting the values only > Proposal:
> {code:scala}
> ds
> .groupByKey(_.y)
> .reduceGroupValues((a, b) => out)
> {code}
> Create an {{AggregationIteratorBase}} superclass that can emit general > {{InternalRow}}s instead of just {{UnsafeRow}}s. > Create a new {{AggregationIteratorBase}} implementation called > {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing > only the values instead of serializing them into {{UnsafeRow}}s in {{Final}} > or {{Complete}} modes. Since we don't need to emit the keys, which are > serialized, this is not too complicated. To make use of this, have > {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a > {{CatalystSerde.serialize}}. -- This message was sent by Atlassian Jira (v8.3.4#803005) ---
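The extra serialize/deserialize round trip described above can be observed directly in the optimized plan. A minimal, self-contained sketch (class and column names are illustrative, not from the report):

```scala
import org.apache.spark.sql.SparkSession

case class Rec(y: String, n: Int)

object PlanDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    import spark.implicits._

    val ds = Seq(Rec("a", 1), Rec("a", 2), Rec("b", 3)).toDS()
    val reduced = ds
      .groupByKey(_.y)
      .reduceGroups((a, b) => Rec(a.y, a.n + b.n))
      .map(_._2) // drops the key; this step forces the extra
                 // DeserializeToObject + MapElements after the Aggregate

    // Look for SerializeFromObject / DeserializeToObject pairs here:
    println(reduced.queryExecution.optimizedPlan)
    spark.stop()
  }
}
```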
[jira] [Commented] (SPARK-31356) Splitting Aggregate node into separate Aggregate and Serialize for Optimizer
[ https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157128#comment-17157128 ] Martin Loncaric commented on SPARK-31356: - Actually, there seem to be 3 separate performance issues: 1. unnecessary appendColumns when the groupByKey function just returns a subset of columns (though this is hard to get around in a type-safe way) 2. unnecessary serialize + deserialize 3. the RDD API is actually roughly 2x faster. It seems there's a lot of room to improve aggregations
> Splitting Aggregate node into separate Aggregate and Serialize for Optimizer > > > Key: SPARK-31356 > URL: https://issues.apache.org/jira/browse/SPARK-31356 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Martin Loncaric >Priority: Major > > Problem: in Datasets API, it is a very common pattern to do something like > this whenever a complex reduce function is needed:
> {code:scala}
> ds
> .groupByKey(_.y)
> .reduceGroups((a, b) => {...})
> .map(_._2)
> {code}
> However, the .map(_._2) step (taking values and throwing keys away) > unfortunately often ends up as an unnecessary serialization during the > aggregation step, followed by {{DeserializeToObject + MapElements (from (K, > V) => V) + SerializeFromObject}} in the optimized logical plan. In this > example, it would be more ideal to either skip the > deserialization/serialization or {{Project (from (K, V) => V)}}. 
Even > manually doing a {{.select(...).as[T]}} to replace the `.map` is quite > tricky, because > * the columns are complicated, like {{[value, > ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}} > * it breaks the nice type checking of Datasets > Proposal: > Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like > {{KeyValueGroupedDataset.cogroup}}) add both an {{Aggregate}} node and > a {{SerializeFromObject}} node so that the Optimizer can eliminate the > serialization when it is redundant. Change aggregations to emit deserialized > results. > I had 2 ideas for what we could change: either add a new feature to > {{.reduceGroupValues}} that projects to only the necessary columns, or do > this improvement. I thought this would be a better solution because > * it will improve the performance of existing Spark applications with no > modifications > * feature growth is undesirable > Uncertainties: > Affects Version: I'm not sure - if I submit a PR soon, can we get this into > 3.0? Or only 3.1? And I assume we're not adding new features to 2.4? > Complications: Are there any hazards in splitting Aggregation into > Aggregation + SerializeFromObject that I'm not aware of? -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-31356) reduceGroupValues function for KeyValueGroupedDataset
[ https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-31356: Summary: reduceGroupValues function for KeyValueGroupedDataset (was: reduceGroupValues function for Dataset)
> reduceGroupValues function for KeyValueGroupedDataset > - > > Key: SPARK-31356 > URL: https://issues.apache.org/jira/browse/SPARK-31356 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Martin Loncaric >Priority: Major > > Problem: in Datasets API, it is a very common pattern to do something like > this whenever a complex reduce function is needed:
> {code:scala}
> ds
> .groupByKey(_.y)
> .reduceGroups((a, b) => {...})
> .map(_._2)
> {code}
> However, the .map(_._2) step (taking values and throwing keys away) > unfortunately often ends up as an unnecessary serialization during the > aggregation step, followed by {{DeserializeToObject + MapElements (from (K, > V) => V) + SerializeFromObject}} in the optimized logical plan. In this > example, it would be more ideal to either skip the > deserialization/serialization or {{Project (from (K, V) => V)}}. Even > manually doing a {{.select(...).as[T]}} to replace the `.map` is quite > tricky, because > * the columns are complicated, like {{[value, > ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}} > * it breaks the nice type checking of Datasets > Proposal: > Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like > {{KeyValueGroupedDataset.cogroup}}) add both an {{Aggregate}} node and > a {{SerializeFromObject}} node so that the Optimizer can eliminate the > serialization when it is redundant. Change aggregations to emit deserialized > results. > I had 2 ideas for what we could change: either add a new feature to > {{.reduceGroupValues}} that projects to only the necessary columns, or do > this improvement. 
I thought this would be a better solution because > * it will improve the performance of existing Spark applications with no > modifications > * feature growth is undesirable > Uncertainties: > Affects Version: I'm not sure - if I submit a PR soon, can we get this into > 3.0? Or only 3.1? And I assume we're not adding new features to 2.4? > Complications: Are there any hazards in splitting Aggregation into > Aggregation + SerializeFromObject that I'm not aware of? -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-31356) reduceGroupValues function for Dataset
[ https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-31356: Summary: reduceGroupValues function for Dataset (was: Splitting Aggregate node into separate Aggregate and Serialize for Optimizer)
> reduceGroupValues function for Dataset > -- > > Key: SPARK-31356 > URL: https://issues.apache.org/jira/browse/SPARK-31356 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Martin Loncaric >Priority: Major > > Problem: in Datasets API, it is a very common pattern to do something like > this whenever a complex reduce function is needed:
> {code:scala}
> ds
> .groupByKey(_.y)
> .reduceGroups((a, b) => {...})
> .map(_._2)
> {code}
> However, the .map(_._2) step (taking values and throwing keys away) > unfortunately often ends up as an unnecessary serialization during the > aggregation step, followed by {{DeserializeToObject + MapElements (from (K, > V) => V) + SerializeFromObject}} in the optimized logical plan. In this > example, it would be more ideal to either skip the > deserialization/serialization or {{Project (from (K, V) => V)}}. Even > manually doing a {{.select(...).as[T]}} to replace the `.map` is quite > tricky, because > * the columns are complicated, like {{[value, > ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}} > * it breaks the nice type checking of Datasets > Proposal: > Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like > {{KeyValueGroupedDataset.cogroup}}) add both an {{Aggregate}} node and > a {{SerializeFromObject}} node so that the Optimizer can eliminate the > serialization when it is redundant. Change aggregations to emit deserialized > results. > I had 2 ideas for what we could change: either add a new feature to > {{.reduceGroupValues}} that projects to only the necessary columns, or do > this improvement. 
I thought this would be a better solution because > * it will improve the performance of existing Spark applications with no > modifications > * feature growth is undesirable > Uncertainties: > Affects Version: I'm not sure - if I submit a PR soon, can we get this into > 3.0? Or only 3.1? And I assume we're not adding new features to 2.4? > Complications: Are there any hazards in splitting Aggregation into > Aggregation + SerializeFromObject that I'm not aware of? -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-31356) reduceGroupValues function for KeyValueGroupedDataset
[ https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-31356: Description: Problem: in the Datasets API, it is a very common pattern to do something like this whenever a complex reduce function is needed:
{code:scala}
ds
.groupByKey(_.y)
.reduceGroups((a, b) => {...})
.map(_._2)
{code}
However, the optimized plan unfortunately ends up with an unnecessary implicit serialization during the aggregation step, followed by {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use case, there are 2 things we can improve with a specialized {{.reduceGroupValues}}: 1. avoid the extra serialization (baked into AggregationIterator implementations) and deserialization 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by emitting the values only Proposal: Create an {{AggregationIteratorBase}} superclass that can emit general {{InternalRow}}s instead of just {{UnsafeRow}}s. Create a new {{AggregationIteratorBase}} implementation called {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing only the values instead of serializing them into {{UnsafeRow}}s in {{Final}} or {{Complete}} modes. Since we don't need to emit the keys, which are serialized, this is not too complicated. To make use of this, have {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a {{CatalystSerde.serialize}}. was: Problem: in the Datasets API, it is a very common pattern to do something like this whenever a complex reduce function is needed:
{code:scala}
ds
.groupByKey(_.y)
.reduceGroups((a, b) => {...})
.map(_._2)
{code}
However, the .map(_._2) step (taking values and throwing keys away) unfortunately often ends up as an unnecessary serialization during the aggregation step, followed by {{DeserializeToObject + MapElements (from (K, V) => V) + SerializeFromObject}} in the optimized logical plan. 
In this example, it would be more ideal to either skip the deserialization/serialization or {{Project (from (K, V) => V)}}. Even manually doing a {{.select(...).as[T]}} to replace the `.map` is quite tricky, because * the columns are complicated, like {{[value, ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}} * it breaks the nice type checking of Datasets Proposal: Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like {{KeyValueGroupedDataset.cogroup}}) add both an {{Aggregate}} node and a {{SerializeFromObject}} node so that the Optimizer can eliminate the serialization when it is redundant. Change aggregations to emit deserialized results. I had 2 ideas for what we could change: either add a new feature to {{.reduceGroupValues}} that projects to only the necessary columns, or do this improvement. I thought this would be a better solution because * it will improve the performance of existing Spark applications with no modifications * feature growth is undesirable Uncertainties: Affects Version: I'm not sure - if I submit a PR soon, can we get this into 3.0? Or only 3.1? And I assume we're not adding new features to 2.4? Complications: Are there any hazards in splitting Aggregation into Aggregation + SerializeFromObject that I'm not aware of?
> reduceGroupValues function for KeyValueGroupedDataset > - > > Key: SPARK-31356 > URL: https://issues.apache.org/jira/browse/SPARK-31356 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Martin Loncaric >Priority: Major > > Problem: in Datasets API, it is a very common pattern to do something like > this whenever a complex reduce function is needed:
> {code:scala}
> ds
> .groupByKey(_.y)
> .reduceGroups((a, b) => {...})
> .map(_._2)
> {code}
> However, the optimized plan unfortunately ends up with an unnecessary > implicit serialization during the aggregation step, followed by > {{DeserializeToObject + MapElements (from (K, V) => V)}}. 
For this common use > case, there are 2 things we can improve with a specialized > {{.reduceGroupValues}}: > 1. avoid the extra serialization (baked in to AggregationIterator > implementations) and deserialization > 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by > emitting the values only > Proposal: > Create an {{AggregationIteratorBase}} superclass that can emit general > {{InternalRow}}s instead of just {{UnsafeRow}}s. > Create a new {{AggregationIteratorBase}} implementation called > {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing > only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} > or {{Complete}} modes. Since we don't need to emit the keys, which are > serialized, this is not too complicated. To ma
[jira] [Comment Edited] (SPARK-31356) reduceGroupValues function for KeyValueGroupedDataset
[ https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157128#comment-17157128 ] Martin Loncaric edited comment on SPARK-31356 at 7/19/20, 1:07 AM: --- Actually, there seem to be 4 separate performance issues: 1. unnecessary serialize + deserialize 2. unnecessary map 3. unnecessary appendColumns when groupByKey function just returns a subset of columns (though this is hard to get around in a type safe way) 4. actually the RDD's API is roughly a whole 2x faster. There might be even more room to improve aggregations was (Author: mwlon): Actually, there seem to be 3 separate performance issues: 1. unnecessary appendColumns when groupByKey function just returns a subset of columns (though this is hard to get around in a type safe way) 2. unnecessary serialize + deserialize 3. actually the RDD's API is roughly a whole 2x faster. It seems there's a lot of room to improve aggregations > reduceGroupValues function for KeyValueGroupedDataset > - > > Key: SPARK-31356 > URL: https://issues.apache.org/jira/browse/SPARK-31356 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Martin Loncaric >Priority: Major > > Problem: in Datasets API, it is a very common pattern to do something like > this whenever a complex reduce function is needed: > {code:scala} > ds > .groupByKey(_.y) > .reduceGroups((a, b) => {...}) > .map(_._2) > {code} > However, the the optimized plan unfortunately ends up with an unnecessary > implicit serialization during aggregation step, followed by > {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use > case, there are 2 things we can improve with a specialized > {{.reduceGroupValues}}: > 1. avoid the extra serialization (baked in to AggregationIterator > implementations) and deserialization > 2. 
avoid requiring a {{.map}} and the time it takes to unpack tuples by > emitting the values only > Proposal: > Create an {{AggregationIteratorBase}} superclass that can emit general > {{InternalRow}}s instead of just {{UnsafeRow}}s. > Create a new {{AggregationIteratorBase}} implementation called > {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing > only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} > or {{Complete}} modes. Since we don't need to emit the keys, which are > serialized, this is not too complicated. To make use of this, have the > {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a > {{CatalystSerde.serialize}}. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
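The "RDD's API is roughly a whole 2x faster" observation above can be probed with a micro-benchmark along these lines. This is a hedged sketch, not the author's measurement; all names are illustrative and wall-clock timing on a local session is only indicative.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val n = 1000000L
val pairs = spark.sparkContext.parallelize(0L until n).map(i => (s"k${i % 1000}", i))

// Crude timing helper for a single action.
def time[T](label: String)(f: => T): T = {
  val t0 = System.nanoTime()
  val r = f
  println(f"$label: ${(System.nanoTime() - t0) / 1e9}%.2f s")
  r
}

// Dataset path: groupByKey appends key columns, and the object-based reduce
// plus the trailing .map(_._2) add serialize/deserialize steps to the plan.
val ds = pairs.toDS()
time("Dataset reduceGroups + map") {
  ds.groupByKey(_._1).reduceGroups((a, b) => (a._1, a._2 + b._2)).map(_._2).count()
}

// RDD path: reduceByKey operates on JVM objects directly, with no row
// conversion, which is the gap the comment describes.
time("RDD reduceByKey") {
  pairs.reduceByKey(_ + _).count()
}
```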
[jira] [Comment Edited] (SPARK-31356) reduceGroupValues function for KeyValueGroupedDataset
[ https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157128#comment-17157128 ] Martin Loncaric edited comment on SPARK-31356 at 7/19/20, 1:07 AM: --- Actually, there seem to be 4 separate performance issues: 1. unnecessary serialize + deserialize 2. unnecessary map 3. unnecessary appendColumns in the case when the groupByKey function just returns a subset of columns (though this is hard to get around in a type safe way) 4. actually the RDD's API is roughly a whole 2x faster. There might be even more room to improve aggregations was (Author: mwlon): Actually, there seem to be 4 separate performance issues: 1. unnecessary serialize + deserialize 2. unnecessary map 3. unnecessary appendColumns when groupByKey function just returns a subset of columns (though this is hard to get around in a type safe way) 4. actually the RDD's API is roughly a whole 2x faster. There might be even more room to improve aggregations > reduceGroupValues function for KeyValueGroupedDataset > - > > Key: SPARK-31356 > URL: https://issues.apache.org/jira/browse/SPARK-31356 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Martin Loncaric >Priority: Major > > Problem: in Datasets API, it is a very common pattern to do something like > this whenever a complex reduce function is needed: > {code:scala} > ds > .groupByKey(_.y) > .reduceGroups((a, b) => {...}) > .map(_._2) > {code} > However, the the optimized plan unfortunately ends up with an unnecessary > implicit serialization during aggregation step, followed by > {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use > case, there are 2 things we can improve with a specialized > {{.reduceGroupValues}}: > 1. avoid the extra serialization (baked in to AggregationIterator > implementations) and deserialization > 2. 
avoid requiring a {{.map}} and the time it takes to unpack tuples by > emitting the values only > Proposal: > Create an {{AggregationIteratorBase}} superclass that can emit general > {{InternalRow}}s instead of just {{UnsafeRow}}s. > Create a new {{AggregationIteratorBase}} implementation called > {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing > only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} > or {{Complete}} modes. Since we don't need to emit the keys, which are > serialized, this is not too complicated. To make use of this, have the > {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a > {{CatalystSerde.serialize}}.
[jira] [Updated] (SPARK-31356) reduceGroupValues function for KeyValueGroupedDataset
[ https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-31356: Issue Type: New Feature (was: Improvement) > reduceGroupValues function for KeyValueGroupedDataset > - > > Key: SPARK-31356 > URL: https://issues.apache.org/jira/browse/SPARK-31356 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 3.0.0 >Reporter: Martin Loncaric >Priority: Major > > Problem: in Datasets API, it is a very common pattern to do something like > this whenever a complex reduce function is needed: > {code:scala} > ds > .groupByKey(_.y) > .reduceGroups((a, b) => {...}) > .map(_._2) > {code} > However, the optimized plan unfortunately ends up with an unnecessary > implicit serialization during the aggregation step, followed by > {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use > case, there are 2 things we can improve with a specialized > {{.reduceGroupValues}}: > 1. avoid the extra serialization (baked into AggregationIterator > implementations) and deserialization > 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by > emitting the values only > Proposal: > Create an {{AggregationIteratorBase}} superclass that can emit general > {{InternalRow}}s instead of just {{UnsafeRow}}s. > Create a new {{AggregationIteratorBase}} implementation called > {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing > only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} > or {{Complete}} modes. Since we don't need to emit the keys, which are > serialized, this is not too complicated. To make use of this, have the > {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a > {{CatalystSerde.serialize}}.
[jira] [Updated] (SPARK-31356) reduceGroupValues function for KeyValueGroupedDataset
[ https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-31356: Description: Problem: in Datasets API, it is a very common pattern to do something like this whenever a complex reduce function is needed: {code:scala} ds .groupByKey(_.y) .reduceGroups(((aKey, aVal), (bKey, bVal)) => outVal) .map(_._2) {code} However, the the optimized plan unfortunately ends up with an unnecessary implicit serialization during aggregation step, followed by {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use case, there are 2 things we can improve with a specialized {{.reduceGroupValues}}: 1. avoid the extra serialization (baked in to AggregationIterator implementations) and deserialization 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by emitting the values only Proposal: {code:scala} ds .groupByKey(_.y) .reduceGroupValues((aVal, bVal) => outVal) {code} Create an {{AggregationIteratorBase}} superclass that can emit general {{InternalRow}}s instead of just {{UnsafeRow}}s. Create a new {{AggregationIteratorBase}} implementation called {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} or {{Complete}} modes. Since we don't need to emit the keys, which are serialized, this is not too complicated. To make use of this, have the {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a {{CatalystSerde.serialize}}. was: Problem: in Datasets API, it is a very common pattern to do something like this whenever a complex reduce function is needed: {code:scala} ds .groupByKey(_.y) .reduceGroups((a, b) => {...}) .map(_._2) {code} However, the the optimized plan unfortunately ends up with an unnecessary implicit serialization during aggregation step, followed by {{DeserializeToObject + MapElements (from (K, V) => V)}}. 
For this common use case, there are 2 things we can improve with a specialized {{.reduceGroupValues}}: 1. avoid the extra serialization (baked in to AggregationIterator implementations) and deserialization 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by emitting the values only Proposal: Create an {{AggregationIteratorBase}} superclass that can emit general {{InternalRow}}s instead of just {{UnsafeRow}}s. Create a new {{AggregationIteratorBase}} implementation called {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} or {{Complete}} modes. Since we don't need to emit the keys, which are serialized, this is not too complicated. To make use of this, have the {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a {{CatalystSerde.serialize}}. > reduceGroupValues function for KeyValueGroupedDataset > - > > Key: SPARK-31356 > URL: https://issues.apache.org/jira/browse/SPARK-31356 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 3.0.0 >Reporter: Martin Loncaric >Priority: Major > > Problem: in Datasets API, it is a very common pattern to do something like > this whenever a complex reduce function is needed: > {code:scala} > ds > .groupByKey(_.y) > .reduceGroups(((aKey, aVal), (bKey, bVal)) => outVal) > .map(_._2) > {code} > However, the the optimized plan unfortunately ends up with an unnecessary > implicit serialization during aggregation step, followed by > {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use > case, there are 2 things we can improve with a specialized > {{.reduceGroupValues}}: > 1. avoid the extra serialization (baked in to AggregationIterator > implementations) and deserialization > 2. 
avoid requiring a {{.map}} and the time it takes to unpack tuples by > emitting the values only > Proposal: > {code:scala} > ds > .groupByKey(_.y) > .reduceGroupValues((aVal, bVal) => outVal) > {code} > Create an {{AggregationIteratorBase}} superclass that can emit general > {{InternalRow}}s instead of just {{UnsafeRow}}s. > Create a new {{AggregationIteratorBase}} implementation called > {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing > only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} > or {{Complete}} modes. Since we don't need to emit the keys, which are > serialized, this is not too complicated. To make use of this, have the > {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a > {{CatalystSerde.serialize}}.
[jira] [Updated] (SPARK-31356) reduceGroupValues function for KeyValueGroupedDataset
[ https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-31356: Description: Problem: in Datasets API, it is a very common pattern to do something like this whenever a complex reduce function is needed: {code:scala} ds .groupByKey(_.y) .reduceGroups(((aKey, aVal), (bKey, bVal)) => outVal) .map(_._2) {code} However, the the optimized plan unfortunately ends up with an unnecessary implicit serialization during aggregation step, followed by {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use case, there are 2 things we can improve with a specialized {{.reduceGroupValues}}: 1. avoid the extra serialization (baked in to AggregationIterator implementations) and deserialization 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by emitting the values only Proposal: {code:scala} ds .groupByKey(_.y) .reduceGroupValues((aVal, bVal) => outVal) {code} Create an {{AggregationIteratorBase}} superclass that can emit general {{InternalRow}} s instead of just {{UnsafeRow}} s. Create a new {{AggregationIteratorBase}} implementation called {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} or {{Complete}} modes. Since we don't need to emit the keys, which are serialized, this is not too complicated. To make use of this, have the {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a {{CatalystSerde.serialize}}. was: Problem: in Datasets API, it is a very common pattern to do something like this whenever a complex reduce function is needed: {code:scala} ds .groupByKey(_.y) .reduceGroups(((aKey, aVal), (bKey, bVal)) => outVal) .map(_._2) {code} However, the the optimized plan unfortunately ends up with an unnecessary implicit serialization during aggregation step, followed by {{DeserializeToObject + MapElements (from (K, V) => V)}}. 
For this common use case, there are 2 things we can improve with a specialized {{.reduceGroupValues}}: 1. avoid the extra serialization (baked in to AggregationIterator implementations) and deserialization 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by emitting the values only Proposal: {code:scala} ds .groupByKey(_.y) .reduceGroupValues((aVal, bVal) => outVal) {code} Create an {{AggregationIteratorBase}} superclass that can emit general {{InternalRow}}s instead of just {{UnsafeRow}}s. Create a new {{AggregationIteratorBase}} implementation called {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} or {{Complete}} modes. Since we don't need to emit the keys, which are serialized, this is not too complicated. To make use of this, have the {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a {{CatalystSerde.serialize}}. > reduceGroupValues function for KeyValueGroupedDataset > - > > Key: SPARK-31356 > URL: https://issues.apache.org/jira/browse/SPARK-31356 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 3.0.0 >Reporter: Martin Loncaric >Priority: Major > > Problem: in Datasets API, it is a very common pattern to do something like > this whenever a complex reduce function is needed: > {code:scala} > ds > .groupByKey(_.y) > .reduceGroups(((aKey, aVal), (bKey, bVal)) => outVal) > .map(_._2) > {code} > However, the the optimized plan unfortunately ends up with an unnecessary > implicit serialization during aggregation step, followed by > {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use > case, there are 2 things we can improve with a specialized > {{.reduceGroupValues}}: > 1. avoid the extra serialization (baked in to AggregationIterator > implementations) and deserialization > 2. 
avoid requiring a {{.map}} and the time it takes to unpack tuples by > emitting the values only > Proposal: > {code:scala} > ds > .groupByKey(_.y) > .reduceGroupValues((aVal, bVal) => outVal) > {code} > Create an {{AggregationIteratorBase}} superclass that can emit general > {{InternalRow}} s instead of just {{UnsafeRow}} s. > Create a new {{AggregationIteratorBase}} implementation called > {{ObjectValuesAggregationIterator}} that emits {{InternalRow}} s containing > only the values instead of serializing them into {{UnsafeRow}} s on {{Final}} > or {{Complete}} modes. Since we don't need to emit the keys, which are > serialized, this is not too complicated. To make use of this, have the > {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a > {{CatalystSerde.serialize}}.
[jira] [Updated] (SPARK-31356) reduceGroupValues function for KeyValueGroupedDataset
[ https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-31356: Description: Problem: in Datasets API, it is a very common pattern to do something like this whenever a complex reduce function is needed: {code:scala} ds .groupByKey(_.y) .reduceGroups(((aKey, aVal), (bKey, bVal)) => outVal) .map(_._2) {code} However, the the optimized plan unfortunately ends up with an unnecessary implicit serialization during aggregation step, followed by {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use case, there are 2 things we can improve with a specialized {{.reduceGroupValues}}: 1. avoid the extra serialization (baked in to AggregationIterator implementations) and deserialization 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by emitting the values only Proposal: {code:scala} ds .groupByKey(_.y) .reduceGroupValues((aVal, bVal) => outVal) {code} Create an {{AggregationIteratorBase}} superclass that can emit general {{InternalRow}} s instead of just {{UnsafeRow}} s. Create a new {{AggregationIteratorBase}} implementation called {{ObjectValuesAggregationIterator}} that emits {{InternalRow}} s containing only the values instead of serializing them into {{UnsafeRow}} s on {{Final}} or {{Complete}} modes. Since we don't need to emit the keys, which are serialized, this is not too complicated. To make use of this, have the {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a {{CatalystSerde.serialize}}. was: Problem: in Datasets API, it is a very common pattern to do something like this whenever a complex reduce function is needed: {code:scala} ds .groupByKey(_.y) .reduceGroups(((aKey, aVal), (bKey, bVal)) => outVal) .map(_._2) {code} However, the the optimized plan unfortunately ends up with an unnecessary implicit serialization during aggregation step, followed by {{DeserializeToObject + MapElements (from (K, V) => V)}}. 
For this common use case, there are 2 things we can improve with a specialized {{.reduceGroupValues}}: 1. avoid the extra serialization (baked in to AggregationIterator implementations) and deserialization 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by emitting the values only Proposal: {code:scala} ds .groupByKey(_.y) .reduceGroupValues((aVal, bVal) => outVal) {code} Create an {{AggregationIteratorBase}} superclass that can emit general {{InternalRow}} s instead of just {{UnsafeRow}} s. Create a new {{AggregationIteratorBase}} implementation called {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} or {{Complete}} modes. Since we don't need to emit the keys, which are serialized, this is not too complicated. To make use of this, have the {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a {{CatalystSerde.serialize}}. > reduceGroupValues function for KeyValueGroupedDataset > - > > Key: SPARK-31356 > URL: https://issues.apache.org/jira/browse/SPARK-31356 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 3.0.0 >Reporter: Martin Loncaric >Priority: Major > > Problem: in Datasets API, it is a very common pattern to do something like > this whenever a complex reduce function is needed: > {code:scala} > ds > .groupByKey(_.y) > .reduceGroups(((aKey, aVal), (bKey, bVal)) => outVal) > .map(_._2) > {code} > However, the the optimized plan unfortunately ends up with an unnecessary > implicit serialization during aggregation step, followed by > {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use > case, there are 2 things we can improve with a specialized > {{.reduceGroupValues}}: > 1. avoid the extra serialization (baked in to AggregationIterator > implementations) and deserialization > 2. 
avoid requiring a {{.map}} and the time it takes to unpack tuples by > emitting the values only > Proposal: > {code:scala} > ds > .groupByKey(_.y) > .reduceGroupValues((aVal, bVal) => outVal) > {code} > Create an {{AggregationIteratorBase}} superclass that can emit general > {{InternalRow}} s instead of just {{UnsafeRow}} s. > Create a new {{AggregationIteratorBase}} implementation called > {{ObjectValuesAggregationIterator}} that emits {{InternalRow}} s containing > only the values instead of serializing them into {{UnsafeRow}} s on {{Final}} > or {{Complete}} modes. Since we don't need to emit the keys, which are > serialized, this is not too complicated. To make use of this, have the > {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a > {{CatalystSerde.serialize}}.
[jira] [Created] (SPARK-31356) KeyValueGroupedDataset method to reduce and take values only
Martin Loncaric created SPARK-31356: --- Summary: KeyValueGroupedDataset method to reduce and take values only Key: SPARK-31356 URL: https://issues.apache.org/jira/browse/SPARK-31356 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Martin Loncaric Problem: in Datasets API, it is a very common pattern to do something like this whenever a complex reduce function is needed: {code:scala} ds .groupByKey(_.y) .reduceGroups((a, b) => {...}) .map(_._2) {code} However, the .map(_._2) step (taking values and throwing keys away) unfortunately often ends up as an unnecessary serialization during the aggregation step, followed by {{DeserializeToObject + MapElements (from (K, V) => V) + SerializeFromObject}} in the optimized logical plan. In this example, it would be more ideal to either skip the deserialization/serialization or {{Project (from (K, V) => V)}}. Even manually doing a {{.select(...).as[T]}} to replace the {{.map}} is quite tricky, because * the columns are complicated, like {{[value, ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}} * it breaks the nice type checking of Datasets Proposal: Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like {{KeyValueGroupedDataset.cogroup}}) append both an {{Aggregate}} node and a {{SerializeFromObject}} node so that the Optimizer can eliminate the serialization when it is redundant. Change aggregations to emit deserialized results. I had 2 ideas for what we could change: either add a new {{.reduceGroupValues}} method that projects to only the necessary columns, or do this improvement. I thought this would be a better solution because * it will improve the performance of existing Spark applications with no modifications * feature growth is undesirable Uncertainties: Affects Version: I'm not sure - if I submit a PR soon, can we get this into 3.0? Or only 3.1? And I assume we're not adding new features to 2.4? 
Complications: Are there any hazards in splitting Aggregation into Aggregation + SerializeFromObject that I'm not aware of?
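The extra serialization the ticket describes can be seen by printing the optimized logical plan of the reduceGroups + map pattern. A minimal sketch, assuming a local Spark session; the {{Rec}} case class is a hypothetical stand-in and the exact plan text varies by Spark version:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical record type, only for illustration.
case class Rec(y: String, x: Long)

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val reduced = Seq(Rec("a", 1L), Rec("a", 2L), Rec("b", 3L)).toDS()
  .groupByKey(_.y)
  .reduceGroups((a, b) => Rec(a.y, a.x + b.x))
  .map(_._2)

// The optimized plan is expected to show DeserializeToObject, MapElements,
// and SerializeFromObject nodes around the Aggregate described above.
println(reduced.queryExecution.optimizedPlan.treeString)
```

{{Dataset.queryExecution}} and {{optimizedPlan.treeString}} are real Spark APIs; what they print for this pipeline is the plan shape the ticket is complaining about.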
[jira] [Updated] (SPARK-31356) KeyValueGroupedDataset method to reduce and take values only
[ https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-31356: Description: Problem: in Datasets API, it is a very common pattern to do something like this whenever a complex reduce function is needed: {code:scala} ds .groupByKey(_.y) .reduceGroups((a, b) => {...}) .map(_._2) {code} However, the .map(_._2) step (taking values and throwing keys away) unfortunately often ends up as an unnecessary serialization during aggregation step, followed by {{DeserializeToObject + MapElements (from (K, V) => V) + SerializeFromObject}} in the optimized logical plan. In this example, it would be more ideal to either skip the deserialization/serialization or {{Project (from (K, V) => V)}}. Even manually doing a {{.select(...).as[T]}} to replace the `.map` is quite tricky, because * the columns are complicated, like {{[value, ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}} * it breaks the nice type checking of Datasets Proposal: Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like {{KeyValueGroupedDataset.cogroup}} append add both an {{Aggregate node}} and a {{SerializeFromObject}} node so that the Optimizer can eliminate the serialization when it is redundant. Change aggregations to emit deserialized results. I had 2 ideas for what we could change: either add a new feature to {{.reduceGroupValues}} that projects to only the necessary columns, or do this improvement. I thought this would be a better solution because * it will improve the performance of existing Spark applications with no modifications * feature growth is undesirable Uncertainties: Affects Version: I'm not sure - if I submit a PR soon, can we get this into 3.0? Or only 3.1? And I assume we're not adding new features to 2.4? Complications: Are there any hazards in splitting Aggregation into Aggregation + SerializeFromObject that I'm not aware of? 
was: Problem: in Datasets API, it is a very common pattern to do something like this whenever a complex reduce function is needed: {code:scala} ds .groupByKey(_.y) .reduceGroups((a, b) => {...}) .map(_._2) {code} However, the .map(_._2) step (taking values and throwing keys away) unfortunately often ends up as an unnecessary serialization during aggregation step, followed by {{DeserializeToObject + MapElements (from (K, V) => V) + SerializeFromObject}} in the optimized logical plan. In this example, it would be more ideal something like {{Project (from (K, V) => V)}} or . Even manually doing a `.select(...).as[T]` to replace the `.map` is quite tricky, because * the columns are complicated, like {{[value, ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}} * it breaks the nice type checking of Datasets Proposal: Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like {{KeyValueGroupedDataset.cogroup}} append add both an {{Aggregate node}} and a {{SerializeFromObject}} node so that the Optimizer can eliminate the serialization when it is redundant. Change aggregations to emit deserialized results. I had 2 ideas for what we could change: either add a new feature to {{.reduceGroupValues}} that projects to only the necessary columns, or do this improvement. I thought this would be a better solution because * it will improve the performance of existing Spark applications with no modifications * feature growth is undesirable Uncertainties: Affects Version: I'm not sure - if I submit a PR soon, can we get this into 3.0? Or only 3.1? And I assume we're not adding new features to 2.4? Complications: Are there any hazards in splitting Aggregation into Aggregation + SerializeFromObject that I'm not aware of? 
> KeyValueGroupedDataset method to reduce and take values only > > > Key: SPARK-31356 > URL: https://issues.apache.org/jira/browse/SPARK-31356 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Martin Loncaric >Priority: Major > > Problem: in Datasets API, it is a very common pattern to do something like > this whenever a complex reduce function is needed: > {code:scala} > ds > .groupByKey(_.y) > .reduceGroups((a, b) => {...}) > .map(_._2) > {code} > However, the .map(_._2) step (taking values and throwing keys away) > unfortunately often ends up as an unnecessary serialization during > aggregation step, followed by {{DeserializeToObject + MapElements (from (K, > V) => V) + SerializeFromObject}} in the optimized logical plan. In this > example, it would be more ideal to either skip the > deserialization/serialization or {{Project (from (K, V) => V)}}. Even > manually doing a {{.select(...).as[T]}} to replace the `.map` is quite > tricky, beca
[jira] [Updated] (SPARK-31356) KeyValueGroupedDataset method to reduce and take values only
[ https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-31356: Description: Problem: in Datasets API, it is a very common pattern to do something like this whenever a complex reduce function is needed: {code:scala} ds .groupByKey(_.y) .reduceGroups((a, b) => {...}) .map(_._2) {code} However, the .map(_._2) step (taking values and throwing keys away) unfortunately often ends up as an unnecessary serialization during the aggregation step, followed by {{DeserializeToObject + MapElements (from (K, V) => V) + SerializeFromObject}} in the optimized logical plan. In this example, it would be more ideal to either skip the deserialization/serialization or {{Project (from (K, V) => V)}}. Even manually doing a {{.select(...).as[T]}} to replace the {{.map}} is quite tricky, because * the columns are complicated, like {{[value, ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}} * it breaks the nice type checking of Datasets Proposal: Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like {{KeyValueGroupedDataset.cogroup}}) append both an {{Aggregate}} node and a {{SerializeFromObject}} node so that the Optimizer can eliminate the serialization when it is redundant. Change aggregations to emit deserialized results. I had 2 ideas for what we could change: either add a new {{.reduceGroupValues}} method that projects to only the necessary columns, or do this improvement. I thought this would be a better solution because * it will improve the performance of existing Spark applications with no modifications * feature growth is undesirable Uncertainties: Affects Version: I'm not sure - if I submit a PR soon, can we get this into 3.0? Or only 3.1? And I assume we're not adding new features to 2.4? Complications: Are there any hazards in splitting Aggregation into Aggregation + SerializeFromObject that I'm not aware of? 
was: Problem: in Datasets API, it is a very common pattern to do something like this whenever a complex reduce function is needed: {code:scala} ds .groupByKey(_.y) .reduceGroups((a, b) => {...}) .map(_._2) {code} However, the .map(_._2) step (taking values and throwing keys away) unfortunately often ends up as an unnecessary serialization during aggregation step, followed by {{DeserializeToObject + MapElements (from (K, V) => V) + SerializeFromObject}} in the optimized logical plan. In this example, it would be more ideal to either skip the deserialization/serialization or {{Project (from (K, V) => V)}}. Even manually doing a {{.select(...).as[T]}} to replace the `.map` is quite tricky, because * the columns are complicated, like {{[value, ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}} * it breaks the nice type checking of Datasets Proposal: Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like {{KeyValueGroupedDataset.cogroup}} append add both an {{Aggregate node}} and a {{SerializeFromObject}} node so that the Optimizer can eliminate the serialization when it is redundant. Change aggregations to emit deserialized results. I had 2 ideas for what we could change: either add a new feature to {{.reduceGroupValues}} that projects to only the necessary columns, or do this improvement. I thought this would be a better solution because * it will improve the performance of existing Spark applications with no modifications * feature growth is undesirable Uncertainties: Affects Version: I'm not sure - if I submit a PR soon, can we get this into 3.0? Or only 3.1? And I assume we're not adding new features to 2.4? Complications: Are there any hazards in splitting Aggregation into Aggregation + SerializeFromObject that I'm not aware of? 
> KeyValueGroupedDataset method to reduce and take values only > > > Key: SPARK-31356 > URL: https://issues.apache.org/jira/browse/SPARK-31356 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Martin Loncaric >Priority: Major > > Problem: in the Datasets API, it is a very common pattern to do something like > this whenever a complex reduce function is needed: > {code:scala} > ds > .groupByKey(_.y) > .reduceGroups((a, b) => {...}) > .map(_._2) > {code} > However, the .map(_._2) step (taking values and throwing keys away) > unfortunately often ends up as an unnecessary serialization during the > aggregation step, followed by {{DeserializeToObject + MapElements (from (K, > V) => V) + SerializeFromObject}} in the optimized logical plan. In this > example, it would be better to either skip the > deserialization/serialization or {{Project (from (K, V) => V)}}. Even > manually doing a {{.select(...).as[T]}} to replace …
[jira] [Updated] (SPARK-31356) Splitting Aggregate node into separate Aggregate and Serialize for Optimizer
[ https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martin Loncaric updated SPARK-31356: Summary: Splitting Aggregate node into separate Aggregate and Serialize for Optimizer (was: KeyValueGroupedDataset method to reduce and take values only) > Splitting Aggregate node into separate Aggregate and Serialize for Optimizer > > > Key: SPARK-31356 > URL: https://issues.apache.org/jira/browse/SPARK-31356 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Martin Loncaric >Priority: Major > > Problem: in the Datasets API, it is a very common pattern to do something like > this whenever a complex reduce function is needed: > {code:scala} > ds > .groupByKey(_.y) > .reduceGroups((a, b) => {...}) > .map(_._2) > {code} > However, the .map(_._2) step (taking values and throwing keys away) > unfortunately often ends up as an unnecessary serialization during the > aggregation step, followed by {{DeserializeToObject + MapElements (from (K, > V) => V) + SerializeFromObject}} in the optimized logical plan. In this > example, it would be better to either skip the > deserialization/serialization or {{Project (from (K, V) => V)}}. Even > manually doing a {{.select(...).as[T]}} to replace the {{.map}} is quite > tricky, because > * the columns are complicated, like {{[value, > ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}} > * it breaks the nice type checking of Datasets > Proposal: > Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like > {{KeyValueGroupedDataset.cogroup}}) append both an {{Aggregate}} node and > a {{SerializeFromObject}} node so that the Optimizer can eliminate the > serialization when it is redundant. Change aggregations to emit deserialized > results. > I had 2 ideas for what we could change: either add a new feature to > {{.reduceGroupValues}} that projects to only the necessary columns, or do > this improvement. 
I thought this would be a better solution because > * it will improve the performance of existing Spark applications with no > modifications > * feature growth is undesirable > Uncertainties: > Affects Version: I'm not sure - if I submit a PR soon, can we get this into > 3.0? Or only 3.1? And I assume we're not adding new features to 2.4? > Complications: Are there any hazards in splitting Aggregation into > Aggregation + SerializeFromObject that I'm not aware of? -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
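For readers unfamiliar with the pattern the issue describes, here is a minimal sketch of the groupByKey + reduceGroups + map(_._2) semantics using plain Scala collections, so it runs without a Spark session. The {{Event}} case class and the count-summing reduce function are hypothetical stand-ins; this only illustrates what the user-facing pattern computes, not the serialization behavior in Spark's physical plan that the issue is actually about.

```scala
// Hypothetical record type standing in for a Dataset row.
case class Event(y: String, count: Long)

// Collections-based analogue of:
//   ds.groupByKey(_.y).reduceGroups((a, b) => ...).map(_._2)
// i.e. group by key, reduce each group, then discard the keys.
def reduceGroupValues(events: Seq[Event]): Seq[Event] =
  events
    .groupBy(_.y)                 // groupByKey(_.y)
    .map { case (_, group) =>     // reduceGroups(...) followed by map(_._2)
      group.reduce((a, b) => Event(a.y, a.count + b.count))
    }
    .toSeq

val merged = reduceGroupValues(Seq(Event("a", 1), Event("a", 2), Event("b", 5)))
// merged contains Event("a", 3) and Event("b", 5), in no guaranteed order
```

In a real Dataset, the final {{.map(_._2)}} step is where the extra {{DeserializeToObject}} / {{SerializeFromObject}} round-trip appears in the optimized plan; in this collections version the key-dropping is just a projection, which is the behavior the proposal aims to let the Optimizer recover.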