[jira] [Commented] (SPARK-36321) Do not fail application in kubernetes if name is too long
[ https://issues.apache.org/jira/browse/SPARK-36321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17770619#comment-17770619 ] Wing Yew Poon commented on SPARK-36321: --- [~dongjoon], is this fixed by SPARK-39614? > Do not fail application in kubernetes if name is too long > - > > Key: SPARK-36321 > URL: https://issues.apache.org/jira/browse/SPARK-36321 > Project: Spark > Issue Type: Bug > Components: Kubernetes >Affects Versions: 3.3.0 >Reporter: XiDuo You >Priority: Major > Labels: pull-request-available > > If we have a long Spark app name and run with the k8s master, we will get the > following exception. > {code:java} > java.lang.IllegalArgumentException: > 'a-89fe2f7ae71c3570' in > spark.kubernetes.executor.podNamePrefix is invalid. must conform > https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#dns-label-names > and the value length <= 47 > at > org.apache.spark.internal.config.TypedConfigBuilder.$anonfun$checkValue$1(ConfigBuilder.scala:108) > at > org.apache.spark.internal.config.TypedConfigBuilder.$anonfun$transform$1(ConfigBuilder.scala:101) > at scala.Option.map(Option.scala:230) > at > org.apache.spark.internal.config.OptionalConfigEntry.readFrom(ConfigEntry.scala:239) > at > org.apache.spark.internal.config.OptionalConfigEntry.readFrom(ConfigEntry.scala:214) > at org.apache.spark.SparkConf.get(SparkConf.scala:261) > at > org.apache.spark.deploy.k8s.KubernetesConf.get(KubernetesConf.scala:67) > at > org.apache.spark.deploy.k8s.KubernetesExecutorConf.(KubernetesConf.scala:147) > at > org.apache.spark.deploy.k8s.KubernetesConf$.createExecutorConf(KubernetesConf.scala:231) > at > org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$requestNewExecutors$2(ExecutorPodsAllocator.scala:367) > {code} > Using the app name as the executor pod name is Spark-internal behavior, and it > should not cause the application to fail.
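The shape of a fix can be illustrated with a small sketch. This is not Spark's actual code (SPARK-39614 addressed this inside Spark); the function name is made up, and the 47-character bound is taken from the error message above:

```python
import re

# Illustrative sketch only -- not Spark's implementation. Produces a
# pod-name prefix that satisfies the DNS-1035 label rules referenced in
# the error above (lowercase alphanumerics and '-', starts with a
# letter, ends with an alphanumeric) and the <= 47 length bound.
MAX_PREFIX_LEN = 47

def sanitize_pod_name_prefix(app_name: str) -> str:
    s = re.sub(r"[^a-z0-9-]", "-", app_name.lower())  # replace invalid chars
    s = s[:MAX_PREFIX_LEN]          # enforce the length bound
    s = s.strip("-")                # may not begin or end with '-'
    s = re.sub(r"^[^a-z]+", "", s)  # must start with a letter
    return s or "spark"             # fall back rather than fail

print(sanitize_pod_name_prefix("My Very Long Spark Application Name " * 3))
```

The point mirrors the comment above: a too-long generated prefix should be repaired internally, not surfaced as an IllegalArgumentException that fails the application.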
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-38918) Nested column pruning should filter out attributes that do not belong to the current relation
[ https://issues.apache.org/jira/browse/SPARK-38918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17644041#comment-17644041 ] Wing Yew Poon commented on SPARK-38918: --- It seems that this is fixed in 3.2.2 ([7c0b9e6e|https://github.com/apache/spark/commit/7c0b9e6e6f680db45c1e2602b85753d9b521bb58]), but for some reason, 3.2.2 is not in the Fixed Version/s. Can we please correct this? Probably because of this, this issue does not appear in https://spark.apache.org/releases/spark-release-3-2-2.html. > Nested column pruning should filter out attributes that do not belong to the > current relation > - > > Key: SPARK-38918 > URL: https://issues.apache.org/jira/browse/SPARK-38918 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.0 >Reporter: Allison Wang >Assignee: Allison Wang >Priority: Major > Fix For: 3.1.3, 3.0.4, 3.3.0, 3.4.0 > > > `SchemaPruning` currently does not check if the root field of a nested column > belongs to the current relation. This can happen when the filter contains > correlated subqueries, where the children field can contain attributes from > both the inner and the outer query. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
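The rule stated in the description can be modeled in a few lines. This is a hypothetical sketch (the function and field names are invented for illustration; the real fix lives in Catalyst's SchemaPruning):

```python
# Hypothetical sketch of the pruning rule, not Catalyst's API: before
# computing a pruned schema, discard requested nested fields whose root
# attribute is not produced by the current relation (e.g. outer-query
# attributes leaked in through a correlated subquery's filter).

def prune_root_fields(requested_fields, relation_output):
    # requested_fields: (root_attribute, nested_field_path) pairs
    # relation_output: attribute names the current relation outputs
    return [(root, path) for root, path in requested_fields
            if root in relation_output]

fields = [("item", ("price",)),      # belongs to this relation
          ("outer_ref", ("id",))]    # leaked from the outer query
print(prune_root_fields(fields, {"item", "name"}))  # drops outer_ref
```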
[jira] [Commented] (SPARK-38992) Avoid using bash -c in ShellBasedGroupsMappingProvider
[ https://issues.apache.org/jira/browse/SPARK-38992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568661#comment-17568661 ] Wing Yew Poon commented on SPARK-38992: --- Does this vulnerability not affect Spark 2.4? > Avoid using bash -c in ShellBasedGroupsMappingProvider > -- > > Key: SPARK-38992 > URL: https://issues.apache.org/jira/browse/SPARK-38992 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.3, 3.1.2, 3.2.1, 3.3.0 >Reporter: Hyukjin Kwon >Assignee: Hyukjin Kwon >Priority: Major > Fix For: 3.1.3, 3.0.4, 3.3.0, 3.2.2 > > > Using bash -c can allow arbitrary shell execution from the end user. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
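The general remediation pattern (not Spark's exact patch; the command shown is illustrative) is to pass the program and its arguments as a vector instead of interpolating user input into a shell string:

```python
# Sketch of the vulnerability class and its usual fix. `id -Gn <user>`
# stands in for a group-lookup command; both functions just build the
# argv that would be handed to subprocess.run.

def groups_command_unsafe(user):
    # Vulnerable pattern: the user name is interpolated into a string
    # parsed by a shell, so user = "alice; rm -rf /" injects a command.
    return ["bash", "-c", f"id -Gn {user}"]

def groups_command_safe(user):
    # Safer pattern: argv vector, no shell; `user` is one literal
    # argument no matter what metacharacters it contains.
    return ["id", "-Gn", user]

print(groups_command_safe("alice; echo pwned"))  # the payload stays inert
```

With the safe form, semicolons, backticks, and `$(...)` in the user name are never interpreted, which is the point of avoiding `bash -c`.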
[jira] [Updated] (SPARK-34779) ExecutorMetricsPoller should keep stage entry in stageTCMP until a heartbeat occurs
[ https://issues.apache.org/jira/browse/SPARK-34779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wing Yew Poon updated SPARK-34779: -- Summary: ExecutorMetricsPoller should keep stage entry in stageTCMP until a heartbeat occurs (was: ExecutoMetricsPoller should keep stage entry in stageTCMP until a heartbeat occurs) > ExecutorMetricsPoller should keep stage entry in stageTCMP until a heartbeat > occurs > --- > > Key: SPARK-34779 > URL: https://issues.apache.org/jira/browse/SPARK-34779 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1 >Reporter: Baohe Zhang >Priority: Major > > The current implementation of ExecutorMetricsPoller uses the task count in each > stage to decide whether to keep a stage entry or not. In the case where the > executor has only 1 core, this has two issues: > # Peak metrics missing (due to the stage entry being removed within a heartbeat > interval) > # Unnecessary and frequent hashmap entry removal and insertion. > Assuming an executor with 1 core has 2 tasks (task1 and task2, both belonging to > stage (0,0)) to execute within a heartbeat interval, the workflow in the current > ExecutorMetricsPoller implementation would be: > 1. task1 start -> stage (0, 0) entry created in stageTCMP, task count > incremented to 1 > 2. 1st poll() -> update peak metrics of stage (0, 0) > 3. task1 end -> stage (0, 0) task count decremented to 0, stage (0, 0) entry > removed, peak metrics lost > 4. task2 start -> stage (0, 0) entry created in stageTCMP, task count > incremented to 1 > 5. 2nd poll() -> update peak metrics of stage (0, 0) > 6. task2 end -> stage (0, 0) task count decremented to 0, stage (0, 0) entry > removed, peak metrics lost > 7. heartbeat() -> empty or inaccurate peak metrics for stage (0,0) reported. > We can fix the issue by keeping entries with task count = 0 in the stageTCMP map > until a heartbeat occurs.
At the heartbeat, after reporting the peak metrics > for each stage, we scan each stage in stageTCMP and remove entries with task > count = 0. > After the fix, the workflow would be: > 1. task1 start -> stage (0, 0) entry created in stageTCMP, task count > incremented to 1 > 2. 1st poll() -> update peak metrics of stage (0, 0) > 3. task1 end -> stage (0, 0) task count decremented to 0, but the entry (0,0) > still remains. > 4. task2 start -> task count of stage (0,0) incremented to 1 > 5. 2nd poll() -> update peak metrics of stage (0, 0) > 6. task2 end -> stage (0, 0) task count decremented to 0, but the entry (0,0) > still remains. > 7. heartbeat() -> accurate peak metrics for stage (0, 0) reported. Remove the > entry for stage (0,0) from stageTCMP because its task count is 0. > > How to verify the behavior? > Submit a job with a custom polling interval (e.g., 2s) and > spark.executor.cores=1 and check the debug logs of ExecutorMetricsPoller. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
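The keep-until-heartbeat bookkeeping described above can be sketched as follows. This is an illustrative Python model, not the Scala class; the names mirror the description:

```python
# Illustrative model of the fixed bookkeeping: stage entries whose task
# count drops to 0 survive until the next heartbeat, so poll() can still
# record peaks in the gap between one task ending and the next starting.
class StageCounters:
    def __init__(self):
        self.stage_tcmp = {}  # (stage_id, attempt) -> running task count

    def on_task_start(self, stage):
        self.stage_tcmp[stage] = self.stage_tcmp.get(stage, 0) + 1

    def on_task_end(self, stage):
        # Decrement, but do NOT remove the entry when it hits zero.
        self.stage_tcmp[stage] -= 1

    def on_heartbeat(self):
        # Report peaks for every tracked stage, then drop idle entries.
        reported = list(self.stage_tcmp)
        self.stage_tcmp = {s: c for s, c in self.stage_tcmp.items() if c > 0}
        return reported

c = StageCounters()
c.on_task_start((0, 0)); c.on_task_end((0, 0))   # task1
c.on_task_start((0, 0)); c.on_task_end((0, 0))   # task2: entry still present
print(c.on_heartbeat())  # reports stage (0, 0), then removes its entry
```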
[jira] [Commented] (SPARK-34779) ExecutoMetricsPoller should keep stage entry in stageTCMP until a heartbeat occurs
[ https://issues.apache.org/jira/browse/SPARK-34779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17311068#comment-17311068 ] Wing Yew Poon commented on SPARK-34779: --- I don't think there is a bug as claimed in the description; in the scenario outlined in the description, no metric peaks are lost. Consider 1. task1 start -> stage (0, 0) entry created in stageTCMP, task count incremented to 1 2. 1st poll() -> update peak metrics of stage (0, 0) 3. task1 end -> stage (0, 0) task count decremented to 0, stage (0, 0) entry removed, peak metrics lost. In 2., ExecutorMetricsPoller#poll() updates stageTCMP and taskMetricPeaks. When the task ends in 3., the task end event will cause the task metric peaks to be posted to the EventLoggingListener, and the peaks will be aggregated in the liveStageExecutorMetrics, just as when a heartbeat happens and executor metric updates get posted to the EventLoggingListener. The peak metrics for task 1 will be used to update the peaks for the executor for stage (0, 0) in the liveStageExecutorMetrics in EventLoggingListener; they are not lost. However, keeping the entry for a stage in stageTCMP when the task count reaches zero, and only removing it on the executor heartbeat if the task count is still zero, is an improvement, in that it reduces removal and insertion of entries. > ExecutoMetricsPoller should keep stage entry in stageTCMP until a heartbeat > occurs > -- > > Key: SPARK-34779 > URL: https://issues.apache.org/jira/browse/SPARK-34779 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1 >Reporter: Baohe Zhang >Priority: Major > > The current implementation of ExecutorMetricsPoller uses the task count in each > stage to decide whether to keep a stage entry or not.
In the case where the > executor has only 1 core, this has two issues: > # Peak metrics missing (due to the stage entry being removed within a heartbeat > interval) > # Unnecessary and frequent hashmap entry removal and insertion. > Assuming an executor with 1 core has 2 tasks (task1 and task2, both belonging to > stage (0,0)) to execute within a heartbeat interval, the workflow in the current > ExecutorMetricsPoller implementation would be: > 1. task1 start -> stage (0, 0) entry created in stageTCMP, task count > incremented to 1 > 2. 1st poll() -> update peak metrics of stage (0, 0) > 3. task1 end -> stage (0, 0) task count decremented to 0, stage (0, 0) entry > removed, peak metrics lost. > 4. task2 start -> stage (0, 0) entry created in stageTCMP, task count > incremented to 1 > 5. 2nd poll() -> update peak metrics of stage (0, 0) > 6. task2 end -> stage (0, 0) task count decremented to 0, stage (0, 0) entry > removed, peak metrics lost > 7. heartbeat() -> empty or inaccurate peak metrics for stage (0,0) reported. > We can fix the issue by keeping entries with task count = 0 in the stageTCMP map > until a heartbeat occurs. At the heartbeat, after reporting the peak metrics > for each stage, we scan each stage in stageTCMP and remove entries with task > count = 0. > After the fix, the workflow would be: > 1. task1 start -> stage (0, 0) entry created in stageTCMP, task count > incremented to 1 > 2. 1st poll() -> update peak metrics of stage (0, 0) > 3. task1 end -> stage (0, 0) task count decremented to 0, but the entry (0,0) > still remains. > 4. task2 start -> task count of stage (0,0) incremented to 1 > 5. 2nd poll() -> update peak metrics of stage (0, 0) > 6. task2 end -> stage (0, 0) task count decremented to 0, but the entry (0,0) > still remains. > 7. heartbeat() -> accurate peak metrics for stage (0, 0) reported. Remove the > entry for stage (0,0) from stageTCMP because its task count is 0. > > How to verify the behavior?
> Submit a job with a custom polling interval (e.g., 2s) and > spark.executor.cores=1 and check the debug logs of ExecutorMetricsPoller. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-33001) Why am I receiving this warning?
[ https://issues.apache.org/jira/browse/SPARK-33001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17232192#comment-17232192 ] Wing Yew Poon commented on SPARK-33001: --- I may have been the last to touch ProcfsMetricsGetter.scala, but it was authored by [~rezasafi]. [~xorz57] and [~dannylee8], are you encountering the warning when running Spark on Windows? The warning is harmless. ProcfsMetricsGetter is only meant to be run on Linux machines with a /proc filesystem. The warning happened because the command "getconf PAGESIZE" was run, and since it is not a valid command on Windows, an exception was caught. ProcfsMetricsGetter is actually only used when spark.executor.processTreeMetrics.enabled=true. However, the class is instantiated and the warning occurs then, even though after that the class is not used. Ideally, you should not see this warning: isProcfsAvailable should be checked before computePageSize() is called (the latter should not be called if procfs is not available, and it is not available on Windows). So it is a minor bug that you see this warning. But it can be safely ignored. > Why am I receiving this warning? > > > Key: SPARK-33001 > URL: https://issues.apache.org/jira/browse/SPARK-33001 > Project: Spark > Issue Type: Question > Components: Spark Core >Affects Versions: 3.0.1 >Reporter: George Fotopoulos >Priority: Major > > I am running Apache Spark Core using Scala 2.12.12 on IntelliJ IDEA 2020.2 > with Docker 2.3.0.5 > I am running Windows 10 build 2004 > Can somebody explain why I am receiving this warning and what I can do > about it? > I tried googling this warning, but all I found was people asking about it and > no answers. > [screenshot|https://user-images.githubusercontent.com/1548352/94319642-c8102c80-ff93-11ea-9fea-f58de8da2268.png] > {code:scala} > WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a > result reporting of ProcessTree metrics is stopped > {code} > Thanks in advance!
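The ordering suggested in the comment, checking procfs availability before probing the page size, can be sketched like this. It is illustrative Python, not the actual Scala class; the function names mirror ProcfsMetricsGetter but are assumptions:

```python
import os
import sys

# Illustrative ordering fix, not the actual Scala implementation: check
# that a procfs is available before probing the page size, so the probe
# (and its warning) never runs on Windows or macOS.
def is_procfs_available():
    return sys.platform.startswith("linux") and os.path.isdir("/proc")

def compute_page_size():
    # On Linux this corresponds to running `getconf PAGESIZE`.
    return os.sysconf("SC_PAGE_SIZE")

def init_process_tree_metrics():
    if not is_procfs_available():
        return None  # quietly disable ProcessTree metrics, no warning
    return compute_page_size()

print(init_process_tree_metrics())
```

With the guard in front, non-Linux platforms never attempt the probe, so no exception is caught and no warning is logged.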
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-31693) Investigate AmpLab Jenkins server network issue
[ https://issues.apache.org/jira/browse/SPARK-31693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163987#comment-17163987 ] Wing Yew Poon commented on SPARK-31693: --- I'm seeing a problem with the .m2 cache on amp-jenkins-worker-06. In https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126370/console {noformat} [EnvInject] - Loading node environment variables. Building remotely on amp-jenkins-worker-06 (centos spark-test) in workspace /home/jenkins/workspace/SparkPullRequestBuilder ... Running build tests exec: curl -s -L https://downloads.lightbend.com/zinc/0.3.15/zinc-0.3.15.tgz exec: curl -s -L https://downloads.lightbend.com/scala/2.12.10/scala-2.12.10.tgz Using `mvn` from path: /home/jenkins/tools/hudson.tasks.Maven_MavenInstallation/Maven_3.6.3/bin/mvn Using `mvn` from path: /home/jenkins/tools/hudson.tasks.Maven_MavenInstallation/Maven_3.6.3/bin/mvn Performing Maven install for hadoop-2.7-hive-1.2 Using `mvn` from path: /home/jenkins/tools/hudson.tasks.Maven_MavenInstallation/Maven_3.6.3/bin/mvn [ERROR] Failed to execute goal org.apache.maven.plugins:maven-install-plugin:3.0.0-M1:install (default-cli) on project spark-yarn_2.12: ArtifactInstallerException: Failed to install metadata org.apache.spark:spark-yarn_2.12/maven-metadata.xml: Could not parse metadata /home/jenkins/.m2/repository/org/apache/spark/spark-yarn_2.12/maven-metadata-local.xml: in epilog non whitespace content is not allowed but got t (position: END_TAG seen ...\nt... @13:2) -> [Help 1] {noformat} > Investigate AmpLab Jenkins server network issue > --- > > Key: SPARK-31693 > URL: https://issues.apache.org/jira/browse/SPARK-31693 > Project: Spark > Issue Type: Bug > Components: Project Infra >Affects Versions: 2.4.6, 3.0.0, 3.1.0 >Reporter: Dongjoon Hyun >Assignee: Shane Knapp >Priority: Critical > > Given the series of failures in the Spark packaging Jenkins job, it seems that > there is a network issue in the AmpLab Jenkins cluster.
> - > https://amplab.cs.berkeley.edu/jenkins/view/Spark%20Packaging/job/spark-master-maven-snapshots/ > - The node failed to talk to GitBox. (SPARK-31687) -> GitHub is okay. > - The node failed to download the maven mirror. (SPARK-31691) -> The primary > host is okay. > - The node failed to communicate repository.apache.org. (Current master > branch Jenkins job failure) > {code} > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-deploy-plugin:3.0.0-M1:deploy (default-deploy) > on project spark-parent_2.12: ArtifactDeployerException: Failed to retrieve > remote metadata > org.apache.spark:spark-parent_2.12:3.1.0-SNAPSHOT/maven-metadata.xml: Could > not transfer metadata > org.apache.spark:spark-parent_2.12:3.1.0-SNAPSHOT/maven-metadata.xml from/to > apache.snapshots.https > (https://repository.apache.org/content/repositories/snapshots): Transfer > failed for > https://repository.apache.org/content/repositories/snapshots/org/apache/spark/spark-parent_2.12/3.1.0-SNAPSHOT/maven-metadata.xml: > Connect to repository.apache.org:443 [repository.apache.org/207.244.88.140] > failed: Connection timed out (Connection timed out) -> [Help 1] > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-32054) Flaky test: org.apache.spark.sql.connector.FileDataSourceV2FallBackSuite.Fallback Parquet V2 to V1
[ https://issues.apache.org/jira/browse/SPARK-32054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163976#comment-17163976 ] Wing Yew Poon commented on SPARK-32054: --- org.apache.spark.sql.connector.FileDataSourceV2FallBackSuite.Fallback Parquet V2 to V1 failed in https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126425; however, earlier, it passed in https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126354/ for the same PR (no changes between the runs). > Flaky test: > org.apache.spark.sql.connector.FileDataSourceV2FallBackSuite.Fallback Parquet > V2 to V1 > -- > > Key: SPARK-32054 > URL: https://issues.apache.org/jira/browse/SPARK-32054 > Project: Spark > Issue Type: Bug > Components: SQL, Tests >Affects Versions: 3.1.0 >Reporter: Gabor Somogyi >Priority: Major > > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124364/testReport/org.apache.spark.sql.connector/FileDataSourceV2FallBackSuite/Fallback_Parquet_V2_to_V1/ > {code:java} > Error Message > org.scalatest.exceptions.TestFailedException: > ArrayBuffer((collect,Relation[id#387495L] parquet ), > (save,InsertIntoHadoopFsRelationCommand > file:/home/jenkins/workspace/SparkPullRequestBuilder@3/target/tmp/spark-fe4d8028-b7c5-406d-9c5a-59c96e98f776, > false, Parquet, Map(path -> > /home/jenkins/workspace/SparkPullRequestBuilder@3/target/tmp/spark-fe4d8028-b7c5-406d-9c5a-59c96e98f776), > ErrorIfExists, [id] +- Range (0, 10, step=1, splits=Some(2)) )) had length 2 > instead of expected length 1 > Stacktrace > sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: > ArrayBuffer((collect,Relation[id#387495L] parquet > ), (save,InsertIntoHadoopFsRelationCommand > file:/home/jenkins/workspace/SparkPullRequestBuilder@3/target/tmp/spark-fe4d8028-b7c5-406d-9c5a-59c96e98f776, > false, Parquet, Map(path -> > /home/jenkins/workspace/SparkPullRequestBuilder@3/target/tmp/spark-fe4d8028-b7c5-406d-9c5a-59c96e98f776), > ErrorIfExists, [id] 
> +- Range (0, 10, step=1, splits=Some(2)) > )) had length 2 instead of expected length 1 > at > org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) > at > org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) > at > org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560) > at > org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:503) > at > org.apache.spark.sql.connector.FileDataSourceV2FallBackSuite.$anonfun$new$22(FileDataSourceV2FallBackSuite.scala:180) > at > org.apache.spark.sql.connector.FileDataSourceV2FallBackSuite.$anonfun$new$22$adapted(FileDataSourceV2FallBackSuite.scala:176) > at > org.apache.spark.sql.catalyst.plans.SQLHelper.withTempPath(SQLHelper.scala:69) > at > org.apache.spark.sql.catalyst.plans.SQLHelper.withTempPath$(SQLHelper.scala:66) > at org.apache.spark.sql.QueryTest.withTempPath(QueryTest.scala:34) > at > org.apache.spark.sql.connector.FileDataSourceV2FallBackSuite.$anonfun$new$21(FileDataSourceV2FallBackSuite.scala:176) > at > org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54) > at > org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38) > at > org.apache.spark.sql.connector.FileDataSourceV2FallBackSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(FileDataSourceV2FallBackSuite.scala:85) > at > org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:246) > at > org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:244) > at > org.apache.spark.sql.connector.FileDataSourceV2FallBackSuite.withSQLConf(FileDataSourceV2FallBackSuite.scala:85) > at > org.apache.spark.sql.connector.FileDataSourceV2FallBackSuite.$anonfun$new$20(FileDataSourceV2FallBackSuite.scala:158) > at > org.apache.spark.sql.connector.FileDataSourceV2FallBackSuite.$anonfun$new$20$adapted(FileDataSourceV2FallBackSuite.scala:157) > at scala.collection.immutable.List.foreach(List.scala:392) > at 
> org.apache.spark.sql.connector.FileDataSourceV2FallBackSuite.$anonfun$new$19(FileDataSourceV2FallBackSuite.scala:157) > at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) > at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) > at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) > at org.scalatest.Transformer.apply(Transformer.scala:22) > at org.scalatest.Transformer.apply(Transformer.scala:20) > at
[jira] [Updated] (SPARK-32003) Shuffle files for lost executor are not unregistered if fetch failure occurs after executor is lost
[ https://issues.apache.org/jira/browse/SPARK-32003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wing Yew Poon updated SPARK-32003: -- Affects Version/s: 3.0.0 > Shuffle files for lost executor are not unregistered if fetch failure occurs > after executor is lost > --- > > Key: SPARK-32003 > URL: https://issues.apache.org/jira/browse/SPARK-32003 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 2.4.6, 3.0.0 >Reporter: Wing Yew Poon >Priority: Major > > A customer's cluster has a node that goes down while a Spark application is > running. (They are running Spark on YARN with the external shuffle service > enabled.) An executor is lost (apparently the only one running on the node). > This executor lost event is handled in the DAGScheduler, which removes the > executor from its BlockManagerMaster. At this point, there is no > unregistering of shuffle files for the executor or the node. Soon after, > tasks trying to fetch shuffle files output by that executor fail with > FetchFailed (because the node is down, there is no NodeManager available to > serve shuffle files). By right, such fetch failures should cause the shuffle > files for the executor to be unregistered, but they do not. > Due to task failure, the stage is re-attempted. Tasks continue to fail due to > fetch failure from the lost executor's shuffle output. This time, since the > failed epoch for the executor is higher, the executor is removed again (this > doesn't really do anything, the executor was already removed when it was > lost) and this time the shuffle output is unregistered. > So it takes two stage attempts instead of one to clear the shuffle output. We > get 4 attempts by default. The customer was unlucky and two nodes went down > during the stage, i.e., the same problem happened twice. So they used up 4 > stage attempts and the stage failed and thus the job.
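The failure mode above, where the epoch check swallows the cleanup on the first fetch failure, can be modeled abstractly. This is a hypothetical sketch, not the actual DAGScheduler patch; the class and method names are invented:

```python
# Hypothetical model of the epoch-gated cleanup described above. On the
# first fetch failure the executor's failed epoch was already bumped by
# the executor-lost event, so the old epoch comparison would skip the
# shuffle-output cleanup; decoupling unregistration from that comparison
# clears the files on the first stage attempt instead of the second.

class SchedulerModel:
    def __init__(self):
        self.epoch = 0
        self.failed_epoch = {}     # executor_id -> epoch at last failure
        self.shuffle_outputs = {}  # executor_id -> set of map outputs

    def on_executor_lost(self, exec_id):
        self.epoch += 1
        self.failed_epoch[exec_id] = self.epoch
        # Bug being modeled: shuffle outputs are NOT unregistered here
        # (the external shuffle service is assumed to still serve them).

    def on_fetch_failed(self, exec_id, failure_epoch):
        # Old behavior: clean up only if failure_epoch exceeded the
        # recorded failed epoch -- false on the first re-attempt:
        #   if failure_epoch > self.failed_epoch.get(exec_id, -1): ...
        # Modeled fix: always unregister the lost executor's outputs.
        self.shuffle_outputs.pop(exec_id, None)

m = SchedulerModel()
m.shuffle_outputs["exec-1"] = {"map_0", "map_1"}
m.on_executor_lost("exec-1")
m.on_fetch_failed("exec-1", failure_epoch=m.epoch)
print(m.shuffle_outputs)  # exec-1's outputs gone after the first attempt
```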
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-32003) Shuffle files for lost executor are not unregistered if fetch failure occurs after executor is lost
[ https://issues.apache.org/jira/browse/SPARK-32003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17136792#comment-17136792 ] Wing Yew Poon commented on SPARK-32003: --- I will open a PR soon with a solution. > Shuffle files for lost executor are not unregistered if fetch failure occurs > after executor is lost > --- > > Key: SPARK-32003 > URL: https://issues.apache.org/jira/browse/SPARK-32003 > Project: Spark > Issue Type: Bug > Components: Scheduler >Affects Versions: 2.4.6 >Reporter: Wing Yew Poon >Priority: Major > > A customer's cluster has a node that goes down while a Spark application is > running. (They are running Spark on YARN with the external shuffle service > enabled.) An executor is lost (apparently the only one running on the node). > This executor lost event is handled in the DAGScheduler, which removes the > executor from its BlockManagerMaster. At this point, there is no > unregistering of shuffle files for the executor or the node. Soon after, > tasks trying to fetch shuffle files output by that executor fail with > FetchFailed (because the node is down, there is no NodeManager available to > serve shuffle files). By right, such fetch failures should cause the shuffle > files for the executor to be unregistered, but they do not. > Due to task failure, the stage is re-attempted. Tasks continue to fail due to > fetch failure from the lost executor's shuffle output. This time, since the > failed epoch for the executor is higher, the executor is removed again (this > doesn't really do anything, the executor was already removed when it was > lost) and this time the shuffle output is unregistered. > So it takes two stage attempts instead of one to clear the shuffle output. We > get 4 attempts by default. The customer was unlucky and two nodes went down > during the stage, i.e., the same problem happened twice. So they used up 4 > stage attempts and the stage failed and thus the job.
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-32003) Shuffle files for lost executor are not unregistered if fetch failure occurs after executor is lost
Wing Yew Poon created SPARK-32003: - Summary: Shuffle files for lost executor are not unregistered if fetch failure occurs after executor is lost Key: SPARK-32003 URL: https://issues.apache.org/jira/browse/SPARK-32003 Project: Spark Issue Type: Bug Components: Scheduler Affects Versions: 2.4.6 Reporter: Wing Yew Poon A customer's cluster has a node that goes down while a Spark application is running. (They are running Spark on YARN with the external shuffle service enabled.) An executor is lost (apparently the only one running on the node). This executor lost event is handled in the DAGScheduler, which removes the executor from its BlockManagerMaster. At this point, there is no unregistering of shuffle files for the executor or the node. Soon after, tasks trying to fetch shuffle files output by that executor fail with FetchFailed (because the node is down, there is no NodeManager available to serve shuffle files). By right, such fetch failures should cause the shuffle files for the executor to be unregistered, but they do not. Due to task failure, the stage is re-attempted. Tasks continue to fail due to fetch failure from the lost executor's shuffle output. This time, since the failed epoch for the executor is higher, the executor is removed again (this doesn't really do anything, the executor was already removed when it was lost) and this time the shuffle output is unregistered. So it takes two stage attempts instead of one to clear the shuffle output. We get 4 attempts by default. The customer was unlucky and two nodes went down during the stage, i.e., the same problem happened twice. So they used up 4 stage attempts and the stage failed and thus the job. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-28626) Spark leaves unencrypted data on local disk, even with encryption turned on (CVE-2019-10099)
[ https://issues.apache.org/jira/browse/SPARK-28626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17062230#comment-17062230 ] Wing Yew Poon commented on SPARK-28626: --- For the record, to assist folks who need to backport this: From branch-2.3, we also need [https://github.com/apache/spark/commit/323dc3ad02e63a7c99b5bd6da618d6020657ecba] [PYSPARK] Update py4j to version 0.10.7. For the SPARKR change, there is a preceding change that is needed [https://github.com/apache/spark/commit/dad5c48b2a229bf6f9e6b8548f9335f04a15c818] [MINOR][PYTHON] Use a helper in `PythonUtils` instead of direct accessing Scala package > Spark leaves unencrypted data on local disk, even with encryption turned on > (CVE-2019-10099) > > > Key: SPARK-28626 > URL: https://issues.apache.org/jira/browse/SPARK-28626 > Project: Spark > Issue Type: Bug > Components: Security >Affects Versions: 2.3.2 >Reporter: Imran Rashid >Priority: Major > Fix For: 2.3.3, 2.4.0 > > > Severity: Important > > Vendor: The Apache Software Foundation > > Versions affected: > All Spark 1.x, Spark 2.0.x, Spark 2.1.x, and 2.2.x versions > Spark 2.3.0 to 2.3.2 > > Description: > Prior to Spark 2.3.3, in certain situations Spark would write user data to > local disk unencrypted, even if spark.io.encryption.enabled=true. This > includes cached blocks that are fetched to disk (controlled by > spark.maxRemoteBlockSizeFetchToMem); in SparkR, using parallelize; in > Pyspark, using broadcast and parallelize; and use of python udfs. > > > Mitigation: > 1.x, 2.0.x, 2.1.x, 2.2.x, 2.3.x users should upgrade to 2.3.3 or newer, > including 2.4.x > > Credit: > This issue was reported by Thomas Graves of NVIDIA. > > References: > [https://spark.apache.org/security.html] > > The following commits were used to fix this issue, in branch-2.3 (there may > be other commits in master / branch-2.4, that are equivalent.)
> {noformat} > commit 575fea120e25249716e3f680396580c5f9e26b5b > Author: Imran Rashid > Date: Wed Aug 22 16:38:28 2018 -0500 > [CORE] Updates to remote cache reads > Covered by tests in DistributedSuite > > commit 6d742d1bd71aa3803dce91a830b37284cb18cf70 > Author: Imran Rashid > Date: Thu Sep 6 12:11:47 2018 -0500 > [PYSPARK][SQL] Updates to RowQueue > Tested with updates to RowQueueSuite > > commit 09dd34cb1706f2477a89174d6a1a0f17ed5b0a65 > Author: Imran Rashid > Date: Mon Aug 13 21:35:34 2018 -0500 > [PYSPARK] Updates to pyspark broadcast > > commit 12717ba0edfa5459c9ac2085f46b1ecc0ee759aa > Author: hyukjinkwon > Date: Mon Sep 24 19:25:02 2018 +0800 > [SPARKR] Match pyspark features in SparkR communication protocol > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Reopened] (SPARK-17398) Failed to query on external JSon Partitioned table
[ https://issues.apache.org/jira/browse/SPARK-17398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wing Yew Poon reopened SPARK-17398: --- This issue was never actually fixed. Evidently the problem still exists. I'll create a PR with a fix. > Failed to query on external JSon Partitioned table > -- > > Key: SPARK-17398 > URL: https://issues.apache.org/jira/browse/SPARK-17398 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: pin_zhang >Priority: Major > Fix For: 2.0.1 > > Attachments: screenshot-1.png > > > 1. Create an external JSON partitioned table > with the SerDe in hive-hcatalog-core-1.2.1.jar, downloaded from > https://mvnrepository.com/artifact/org.apache.hive.hcatalog/hive-hcatalog-core/1.2.1 > 2. Querying the table meets an exception, which worked in Spark 1.5.2 > Exception in thread "main" org.apache.spark.SparkException: Job aborted due > to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: > Lost task > 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: > java.util.ArrayList cannot be cast to org.apache.hive.hcatalog.data.HCatRecord > at > org.apache.hive.hcatalog.data.HCatRecordObjectInspector.getStructFieldData(HCatRecordObjectInspector.java:45) > at > org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:430) > at > org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:426) > > 3.
Test Code > import org.apache.spark.SparkConf > import org.apache.spark.SparkContext > import org.apache.spark.sql.hive.HiveContext > object JsonBugs { > def main(args: Array[String]): Unit = { > val table = "test_json" > val location = "file:///g:/home/test/json" > val create = s"""CREATE EXTERNAL TABLE ${table} > (id string, seq string ) > PARTITIONED BY(index int) > ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' > LOCATION "${location}" > """ > val add_part = s""" > ALTER TABLE ${table} ADD > PARTITION (index=1)LOCATION '${location}/index=1' > """ > val conf = new SparkConf().setAppName("scala").setMaster("local[2]") > conf.set("spark.sql.warehouse.dir", "file:///g:/home/warehouse") > val ctx = new SparkContext(conf) > val hctx = new HiveContext(ctx) > val exist = hctx.tableNames().map { x => x.toLowerCase() }.contains(table) > if (!exist) { > hctx.sql(create) > hctx.sql(add_part) > } else { > hctx.sql("show partitions " + table).show() > } > hctx.sql("select * from test_json").show() > } > }
[jira] [Commented] (SPARK-28770) Flaky Tests: Test ReplayListenerSuite.End-to-end replay with compression failed
[ https://issues.apache.org/jira/browse/SPARK-28770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16922925#comment-16922925 ] Wing Yew Poon commented on SPARK-28770: --- As [~irashid] noted, following an offline discussion, we agree with [~kabhwan] that the existing implementation of EventMonster, extending EventLoggingListener, does not serve the purposes of ReplayListenerSuite. I have updated my PR to fix EventMonster instead of adding filtering logic in testApplicationReplay. > Flaky Tests: Test ReplayListenerSuite.End-to-end replay with compression > failed > --- > > Key: SPARK-28770 > URL: https://issues.apache.org/jira/browse/SPARK-28770 > Project: Spark > Issue Type: Test > Components: Spark Core >Affects Versions: 2.4.3 > Environment: Community jenkins and our arm testing instance. >Reporter: huangtianhua >Priority: Major > > The test > org.apache.spark.scheduler.ReplayListenerSuite.End-to-end replay with > compression fails; see > [https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-3.2/267/testReport/junit/org.apache.spark.scheduler/ReplayListenerSuite/End_to_end_replay_with_compression/] > > The test also fails on our ARM instance. I sent an email to spark-dev > earlier, and we suspect it is related to the commit > [https://github.com/apache/spark/pull/23767]; we tried reverting it and the > tests pass: > ReplayListenerSuite: > - ... > - End-to-end replay *** FAILED *** > "[driver]" did not equal "[1]" (JsonProtocolSuite.scala:622) > - End-to-end replay with compression *** FAILED *** > "[driver]" did not equal "[1]" (JsonProtocolSuite.scala:622) > > Not sure what's wrong; hope someone can help to figure it out, thanks very > much.
[jira] [Commented] (SPARK-28770) Flaky Tests: Test ReplayListenerSuite.End-to-end replay with compression failed
[ https://issues.apache.org/jira/browse/SPARK-28770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16921955#comment-16921955 ] Wing Yew Poon commented on SPARK-28770: --- My earlier analysis of why the test failure occurs was deficient. The real reason is that ReplayListenerSuite.testApplicationReplay fails if the application runs long enough for the driver to send an executor metrics update. That update causes stage executor metrics to be written for the driver. However, executor metrics updates are not logged, and thus not replayed, so no stage executor metrics for the driver are logged on replay. In this scenario, there are more events in the original than are logged by the replay listener, and the events do not line up. I was able to engineer this failure by reducing the heartbeat interval at which the driver sends executor metrics updates.
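The mechanism described in the comment above can be illustrated with a small, self-contained sketch. The event types below are toy stand-ins for Spark's listener classes (not the real API): an extra driver-attributed metrics event in the original log means a pairwise comparison pairs the "driver" event against executor "1"'s event, which is exactly the shape of the `"[driver]" did not equal "[1]"` assertion failure.

```scala
// Toy stand-ins for Spark listener events; names are illustrative only.
sealed trait Event
case class StageCompleted(stageId: Int) extends Event
case class StageExecutorMetrics(execId: String, stageId: Int) extends Event

object ReplayMismatch {
  // Pair the two logs up entry by entry and return the first pair that differs.
  def firstMismatch(original: Seq[Event], replayed: Seq[Event]): Option[(Event, Event)] =
    original.zip(replayed).find { case (a, b) => a != b }

  def main(args: Array[String]): Unit = {
    // Original log: the driver sent an executor metrics update, so a
    // driver-attributed metrics event was written at stage completion.
    val original = Seq(
      StageExecutorMetrics("driver", 0),
      StageExecutorMetrics("1", 0),
      StageCompleted(0))
    // Replay: executor metrics updates are not logged, hence not replayed,
    // so the driver entry is missing and the sequences no longer line up.
    val replayed = Seq(
      StageExecutorMetrics("1", 0),
      StageCompleted(0))
    // The "driver" event pairs against executor "1"'s event.
    println(firstMismatch(original, replayed))
  }
}
```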
[jira] [Commented] (SPARK-28770) Flaky Tests: Test ReplayListenerSuite.End-to-end replay with compression failed
[ https://issues.apache.org/jira/browse/SPARK-28770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16920234#comment-16920234 ] Wing Yew Poon commented on SPARK-28770: --- I looked into the issue further. In EventLoggingListener, almost all calls to logEvent (which writes serialized JSON to the event log) are a direct result of an onXXX method being called. The exception is that within onStageCompleted, before calling logEvent with the SparkListenerStageCompleted event, if we are logging stage executor metrics, there is a bulk call to logEvent with SparkListenerStageExecutorMetrics events via a Map.foreach. This bulk operation may not log the events in the same order from run to run. It is also the only place where SparkListenerStageExecutorMetrics events get logged. For this reason, I think the affected tests ("End-to-end replay" and "End-to-end replay with compression", both implemented by calling testApplicationReplay) should not compare the SparkListenerStageExecutorMetrics events. That should eliminate the indeterminacy of the tests.
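As a side note on the ordering problem above: iteration order over a general Scala Map is not guaranteed, so emitting one event per entry via Map.foreach can produce different orders across runs. The self-contained sketch below (toy data, not the actual EventLoggingListener code) shows the obvious way to make the emitted order stable: sort the entries before emitting.

```scala
object DeterministicLogging {
  // Toy per-executor peak values as they might exist at stage completion.
  val peaks: Map[String, Long] =
    Map("driver" -> 100L, "1" -> 250L, "10" -> 180L, "2" -> 90L)

  // Iteration order of a general Map is unspecified in the language spec,
  // so an emission like this may differ between runs or JVM versions.
  def loggedUnordered: Seq[String] =
    peaks.map { case (id, v) => s"$id=$v" }.toSeq

  // Sorting by executor id first makes the emitted event order stable.
  def loggedSorted: Seq[String] =
    peaks.toSeq.sortBy(_._1).map { case (id, v) => s"$id=$v" }
}
```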
[jira] [Commented] (SPARK-28770) Flaky Tests: Test ReplayListenerSuite.End-to-end replay with compression failed
[ https://issues.apache.org/jira/browse/SPARK-28770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16920212#comment-16920212 ] Wing Yew Poon commented on SPARK-28770: --- On my branch from which [https://github.com/apache/spark/pull/23767] was merged into master, I modified ReplayListenerSuite following [https://gist.github.com/dwickern/6ba9c5c505d2325d3737ace059302922], and ran "End-to-end replay with compression" 100 times. I encountered no failures. I ran this on my MacBook Pro. The instance of failure that Jungtaek cited appears to be due to a comparison of two SparkListenerStageExecutorMetrics events (one from the original, the other from the replay) failing. One event came from the driver and the other came from executor "1". SparkListenerStageExecutorMetrics events are logged at stage completion if spark.eventLog.logStageExecutorMetrics.enabled is set to true. The failure could be due to these events being in a different order in the replay than in the original. The commit that first introduced these events included some code to filter them out in the testApplicationReplay method of ReplayListenerSuite. (That code filtered out the events from the original, not from the replay, which I didn't understand.) Maybe we could filter out the SparkListenerStageExecutorMetrics events (from both the original and the replay) in testApplicationReplay (which is called by "End-to-end replay" and "End-to-end replay with compression") to avoid this flakiness.
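The filtering suggested above could look roughly like the following self-contained sketch. The event types are toy stand-ins, not Spark's classes, and the real change would live in testApplicationReplay; this only shows the shape of the idea: drop the non-deterministically ordered metrics events from both sides before comparing.

```scala
// Toy stand-ins for Spark listener events; names are illustrative only.
sealed trait Event
case class StageExecutorMetrics(execId: String) extends Event
case class StageCompleted(stageId: Int) extends Event

object FilteredCompare {
  // Remove the metrics events, whose relative order is not deterministic.
  def comparable(events: Seq[Event]): Seq[Event] =
    events.filterNot(_.isInstanceOf[StageExecutorMetrics])

  // Compare original and replayed logs modulo the filtered events,
  // from BOTH the original and the replay, as proposed above.
  def sameModuloMetrics(original: Seq[Event], replayed: Seq[Event]): Boolean =
    comparable(original) == comparable(replayed)
}
```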
[jira] [Commented] (SPARK-27324) document configurations related to executor metrics
[ https://issues.apache.org/jira/browse/SPARK-27324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16806297#comment-16806297 ] Wing Yew Poon commented on SPARK-27324: --- I'd hold off on this until SPARK-26329 is resolved. > document configurations related to executor metrics > --- > > Key: SPARK-27324 > URL: https://issues.apache.org/jira/browse/SPARK-27324 > Project: Spark > Issue Type: Improvement > Components: Documentation >Affects Versions: 3.0.0 >Reporter: Wing Yew Poon >Priority: Major > > SPARK-23429 introduced executor memory metrics, and the configuration, > spark.eventLog.logStageExecutorMetrics.enabled, that determines if per-stage > per-executor metric peaks get written to the event log. (The metrics are > polled and sent in the heartbeat, and this is always done; the configuration > is only to determine if aggregated metric peaks are written to the event log.) > SPARK-24958 added proc fs based metrics to the executor memory metrics, and > the configuration, spark.eventLog.logStageExecutorProcessTreeMetrics.enabled, > to determine if these additional (more expensive) metrics are collected when > metrics are polled. > SPARK-26329 will introduce a configuration, > spark.executor.metrics.pollingInterval, to allow polling at more frequent > intervals than the executor heartbeat. > These configurations and how they relate to each other should be documented > in the Configuration page.
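For reference, the three settings named in the description would appear in a spark-defaults-style configuration along these lines. The values shown are purely illustrative, not recommendations, and spark.executor.metrics.pollingInterval did not yet exist when this issue was filed.

```
spark.eventLog.logStageExecutorMetrics.enabled             true
spark.eventLog.logStageExecutorProcessTreeMetrics.enabled  true
# proposed by SPARK-26329; value shown is illustrative
spark.executor.metrics.pollingInterval                     1s
```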
[jira] [Updated] (SPARK-27324) document configurations related to executor metrics
[ https://issues.apache.org/jira/browse/SPARK-27324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wing Yew Poon updated SPARK-27324: -- Description: SPARK-23429 introduced executor memory metrics, and the configuration, spark.eventLog.logStageExecutorMetrics.enabled, that determines if per-stage per-executor metric peaks get written to the event log. (The metrics are polled and sent in the heartbeat, and this is always done; the configuration is only to determine if aggregated metric peaks are written to the event log.) SPARK-24958 added proc fs based metrics to the executor memory metrics, and the configuration, spark.eventLog.logStageExecutorProcessTreeMetrics.enabled, to determine if these additional (more expensive) metrics are collected when metrics are polled. SPARK-26329 will introduce a configuration, spark.executor.metrics.pollingInterval, to allow polling at more frequent intervals than the executor heartbeat. These configurations and how they relate to each other should be documented in the Configuration page. was: SPARK-23429 introduced executor memory metrics, and the configuration, spark.eventLog.logStageExecutorMetrics.enabled, that determines if per-stage per-executor metric peaks get written to the event log. (The metrics are polled and sent in the heartbeat, and this is always done; the configuration is only to determine if aggregated metric peaks are written to the event log.) SPARK-24958 added proc fs based metrics to the executor memory metrics, and the configuration, spark.eventLog.logStageExecutorProcessTreeMetrics.enabled, to determine if these additional (more expensive) metrics are collected when metrics are polled. SPARK-26329 will introduce a configuration, spark.executor.metrics.pollingInterval, to allow polling at more frequent intervals than the executor heartbeat. These configurations and how to relate to each other should be documented in the Configuration page. 
[jira] [Updated] (SPARK-27324) document configurations related to executor metrics
[ https://issues.apache.org/jira/browse/SPARK-27324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wing Yew Poon updated SPARK-27324: -- Description: SPARK-23429 introduced executor memory metrics, and the configuration, spark.eventLog.logStageExecutorMetrics.enabled, that determines if per-stage per-executor metric peaks get written to the event log. (The metrics are polled and sent in the heartbeat, and this is always done; the configuration is only to determine if aggregated metric peaks are written to the event log.) SPARK-24958 added proc fs based metrics to the executor memory metrics, and the configuration, spark.eventLog.logStageExecutorProcessTreeMetrics.enabled, to determine if these additional (more expensive) metrics are collected when metrics are polled. SPARK-26329 will introduce a configuration, spark.executor.metrics.pollingInterval, to allow polling at more frequent intervals than the executor heartbeat. These configurations and how to relate to each other should be documented in the Configuration page. was: SPARK-23429 introduced executor memory metrics, and the configuration, spark.eventLog.logStageExecutorMetrics.enabled, that determines if per-stage per-executor metric peaks get written to the event log. (The metrics are polled and sent in the heartbeat, and this is always done; the configuration is only to determine if aggregated metric peaks are written to the event log.) SPARK-26357 added proc fs based metrics to the executor memory metrics, and the configuration, spark.eventLog.logStageExecutorProcessTreeMetrics.enabled, to determine if these additional (more expensive) metrics are collected when metrics are polled. SPARK-26329 will introduce a configuration, spark.executor.metrics.pollingInterval, to allow polling at more frequent intervals than the executor heartbeat. These configurations and how to relate to each other should be documented in the Configuration page. 
> document configurations related to executor metrics > --- > > Key: SPARK-27324 > URL: https://issues.apache.org/jira/browse/SPARK-27324 > Project: Spark > Issue Type: Improvement > Components: Documentation >Affects Versions: 3.0.0 >Reporter: Wing Yew Poon >Priority: Major > > SPARK-23429 introduced executor memory metrics, and the configuration, > spark.eventLog.logStageExecutorMetrics.enabled, that determines if per-stage > per-executor metric peaks get written to the event log. (The metrics are > polled and sent in the heartbeat, and this is always done; the configuration > is only to determine if aggregated metric peaks are written to the event log.) > SPARK-24958 added proc fs based metrics to the executor memory metrics, and > the configuration, spark.eventLog.logStageExecutorProcessTreeMetrics.enabled, > to determine if these additional (more expensive) metrics are collected when > metrics are polled. > SPARK-26329 will introduce a configuration, > spark.executor.metrics.pollingInterval, to allow polling at more frequent > intervals than the executor heartbeat. > These configurations and how to relate to each other should be documented in > the Configuration page.
[jira] [Created] (SPARK-27324) document configurations related to executor metrics
Wing Yew Poon created SPARK-27324: - Summary: document configurations related to executor metrics Key: SPARK-27324 URL: https://issues.apache.org/jira/browse/SPARK-27324 Project: Spark Issue Type: Improvement Components: Documentation Affects Versions: 3.0.0 Reporter: Wing Yew Poon SPARK-23429 introduced executor memory metrics, and the configuration, spark.eventLog.logStageExecutorMetrics.enabled, that determines if per-stage per-executor metric peaks get written to the event log. (The metrics are polled and sent in the heartbeat, and this is always done; the configuration is only to determine if aggregated metric peaks are written to the event log.) SPARK-26357 added proc fs based metrics to the executor memory metrics, and the configuration, spark.eventLog.logStageExecutorProcessTreeMetrics.enabled, to determine if these additional (more expensive) metrics are collected when metrics are polled. SPARK-26329 will introduce a configuration, spark.executor.metrics.pollingInterval, to allow polling at more frequent intervals than the executor heartbeat. These configurations and how to relate to each other should be documented in the Configuration page.
[jira] [Commented] (SPARK-26329) ExecutorMetrics should poll faster than heartbeats
[ https://issues.apache.org/jira/browse/SPARK-26329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745313#comment-16745313 ] Wing Yew Poon commented on SPARK-26329: --- I am working on this. In the executor, we can track what stages have a positive number of running tasks (when a task starts, we can get its stage id and attempt number). When polling the executor memory metrics, we attribute the memory to the active stage(s), and update the peaks. In a heartbeat, we send the per-stage peaks (for stages active at that time), and then reset the peaks. The semantics would be that the per-stage peaks sent in each heartbeat are the peaks since the last heartbeat. In addition, we keep a map of task ids to metrics (for running tasks), which tracks the peaks of the metrics and the polling thread updates this as well. At task end, we send the peak values associated with the task in the task result. These are the peak values of the executor metrics during the lifetime of the task. (Of course, this does not mean that that task alone contributed to those peaks, only that those were the peak memory values seen while that task was running.) If between heartbeats, a stage completes, so there are no more running tasks for that stage, then in the next heartbeat, there are no metrics sent for that stage; however, at the end of a task that belonged to that stage, the metrics would have been sent in the task result, so we do not lose those peaks. We continue to do the stage-level aggregation in the EventLoggingListener. For the driver, I do not plan to poll more frequently. I think this is ok since most memory issues are with executors rather than with the driver. We will still poll when the driver heartbeats. What the driver sends will be the current values of the metrics in the driver at the time of the heartbeat. This is semantically the same as before. 
> ExecutorMetrics should poll faster than heartbeats > -- > > Key: SPARK-26329 > URL: https://issues.apache.org/jira/browse/SPARK-26329 > Project: Spark > Issue Type: Improvement > Components: Spark Core, Web UI >Affects Versions: 3.0.0 >Reporter: Imran Rashid >Priority: Major > > We should allow faster polling of the executor memory metrics (SPARK-23429 / > SPARK-23206) without requiring a faster heartbeat rate. We've seen the > memory usage of executors spike over 1 GB in less than a second, but > heartbeats are only every 10 seconds (by default). Spark needs to enable > fast polling to capture these peaks, without causing too much strain on the > system. > In the current implementation, the metrics are polled along with the > heartbeat, but this leads to a slow rate of polling metrics by default. If > users were to increase the rate of the heartbeat, they risk overloading the > driver on a large cluster, with too many messages and too much work to > aggregate the metrics. But, the executor could poll the metrics more > frequently, and still only send the *max* since the last heartbeat for each > metric. This keeps the load on the driver the same, and only introduces a > small overhead on the executor to grab the metrics and keep the max. > The downside of this approach is that we still need to wait for the next > heartbeat for the driver to be aware of the new peak. If the executor dies > or is killed before then, then we won't find out. A potential future > enhancement would be to send an update *anytime* there is an increase by some > percentage, but we'll leave that out for now. > Another possibility would be to change the metrics themselves to track peaks > for us, so we don't have to fine-tune the polling rate. For example, some > jvm metrics provide a usage threshold, and notification: > https://docs.oracle.com/javase/7/docs/api/java/lang/management/MemoryPoolMXBean.html#UsageThreshold > But, that is not available on all metrics. 
This proposal gives us a generic > way to get a more accurate peak memory usage for *all* metrics.
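The bookkeeping sketched in the comment above (track which stages have running tasks, attribute each polled sample to the active stages, and report-then-reset per-stage peaks on heartbeat) can be modeled with a small, self-contained class. This is a toy, not Spark's actual polling code; a single Long stands in for the full set of executor metrics.

```scala
import scala.collection.mutable

// Toy model of the polling scheme described above.
class PeakTracker {
  private val runningTasks = mutable.Map.empty[Int, Int]  // stageId -> running task count
  private val stagePeaks   = mutable.Map.empty[Int, Long] // stageId -> peak since last heartbeat

  def taskStart(stageId: Int): Unit =
    runningTasks(stageId) = runningTasks.getOrElse(stageId, 0) + 1

  def taskEnd(stageId: Int): Unit = {
    val n = runningTasks.getOrElse(stageId, 0) - 1
    if (n <= 0) runningTasks.remove(stageId) else runningTasks(stageId) = n
  }

  // Called on each poll: attribute the sampled value to every active stage.
  def poll(sample: Long): Unit =
    runningTasks.keys.foreach { sid =>
      stagePeaks(sid) = math.max(stagePeaks.getOrElse(sid, 0L), sample)
    }

  // Called on heartbeat: report peaks since the last heartbeat, then reset,
  // matching the "peaks since the last heartbeat" semantics described above.
  def heartbeat(): Map[Int, Long] = {
    val out = stagePeaks.toMap
    stagePeaks.clear()
    out
  }
}
```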
[jira] [Commented] (SPARK-22403) StructuredKafkaWordCount example fails in YARN cluster mode
[ https://issues.apache.org/jira/browse/SPARK-22403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16227298#comment-16227298 ] Wing Yew Poon commented on SPARK-22403: --- I realize that in a production application, one would set checkpointLocation and avoid this issue. However, there is evidently a problem in the code that handles the case when checkpointLocation is not set and a temporary checkpoint location is created. Also, the StructuredKafkaWordCount example does not accept a parameter for setting the checkpointLocation. > StructuredKafkaWordCount example fails in YARN cluster mode > --- > > Key: SPARK-22403 > URL: https://issues.apache.org/jira/browse/SPARK-22403 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Wing Yew Poon > > When I run the StructuredKafkaWordCount example in YARN client mode, it runs > fine. However, when I run it in YARN cluster mode, the application errors > during initialization, and dies after the default number of YARN application > attempts. 
In the AM log, I see > {noformat} > 17/10/30 11:34:52 INFO execution.SparkSqlParser: Parsing command: CAST(value > AS STRING) > 17/10/30 11:34:53 ERROR streaming.StreamMetadata: Error writing stream > metadata StreamMetadata(b71ca714-a7a1-467f-96aa-023375964429) to > /data/yarn/nm/usercache/systest/appcache/application_1508800814252_0047/container_1508800814252_0047_01_01/tmp/temporary-b5ced4ae-32e0-4725-b905-aad679aec9b5/metadata > org.apache.hadoop.security.AccessControlException: Permission denied: > user=systest, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x > at > org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:397) > at > org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:256) > at > org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194) > at > org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1842) > at > org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1826) > at > org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1785) > at > org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.resolvePathForStartFile(FSDirWriteFileOp.java:315) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2313) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2257) > ... 
> at > org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:280) > at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1235) > at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1214) > at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1152) > at > org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:458) > at > org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:455) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:469) > at > org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:396) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1103) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1083) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:972) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:960) > at > org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:76) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:116) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:114) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.spark.sql.execution.streaming.StreamExecution.(StreamExecution.scala:114) > at > org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:240) > at > org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278) > at > org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282) > at > org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:79) > at > org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKafkaWordCount.scala) > {noformat} > 
Looking at
[jira] [Updated] (SPARK-22403) StructuredKafkaWordCount example fails in YARN cluster mode
[ https://issues.apache.org/jira/browse/SPARK-22403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wing Yew Poon updated SPARK-22403:
----------------------------------
Description:
When I run the StructuredKafkaWordCount example in YARN client mode, it runs fine. However, when I run it in YARN cluster mode, the application errors during initialization, and dies after the default number of YARN application attempts. In the AM log, I see
{noformat}
17/10/30 11:34:52 INFO execution.SparkSqlParser: Parsing command: CAST(value AS STRING)
17/10/30 11:34:53 ERROR streaming.StreamMetadata: Error writing stream metadata StreamMetadata(b71ca714-a7a1-467f-96aa-023375964429) to /data/yarn/nm/usercache/systest/appcache/application_1508800814252_0047/container_1508800814252_0047_01_01/tmp/temporary-b5ced4ae-32e0-4725-b905-aad679aec9b5/metadata
org.apache.hadoop.security.AccessControlException: Permission denied: user=systest, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:397)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:256)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1842)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1826)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1785)
    at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.resolvePathForStartFile(FSDirWriteFileOp.java:315)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2313)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2257)
    ...
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:280)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1235)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1214)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1152)
    at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:458)
    at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:455)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:469)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:396)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1103)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1083)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:972)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:960)
    at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:76)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:116)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:114)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:114)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:240)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
    at org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:79)
    at org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKafkaWordCount.scala)
{noformat}
Looking at StreamingQueryManager#createQuery, we have
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L198
{code}
val checkpointLocation = userSpecifiedCheckpointLocation.map { ...
  ...
}.orElse {
  ...
}.getOrElse {
  if (useTempCheckpointLocation) {
    // Delete the temp checkpoint when a query is being stopped without errors.
    deleteCheckpointOnStop = true
    Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
  } else {
    ...
  }
}
{code}
And Utils.createTempDir has
{code}
def createTempDir(
    root: String = System.getProperty("java.io.tmpdir"),
    namePrefix: String = "spark"): File = {
  val dir = createDirectory(root, namePrefix)
  ShutdownHookManager.registerShutdownDeleteDir(dir)
  dir
}
{code}
In
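The failure mode the stack trace points at can be illustrated outside Spark: Utils.createTempDir returns a scheme-less local path (under the YARN container's java.io.tmpdir), and when StreamMetadata$.write later creates the metadata file, Hadoop qualifies that scheme-less path against fs.defaultFS, which on a YARN cluster is typically HDFS. A minimal Java sketch of the resolution step; the hdfs://namenode:8020 default filesystem and the shortened container path are hypothetical stand-ins:

```java
import java.net.URI;

public class ChkptPathResolution {
    public static void main(String[] args) {
        // Hypothetical fs.defaultFS on a YARN cluster.
        URI defaultFs = URI.create("hdfs://namenode:8020/");

        // A scheme-less temp-checkpoint path, as produced under java.io.tmpdir
        // inside a YARN container (shortened here for readability).
        String tempCheckpoint =
            "/data/yarn/nm/usercache/systest/appcache/app_0047/tmp/temporary-xyz/metadata";

        // Qualifying the scheme-less path against the default filesystem sends
        // the write to HDFS, where /data/... does not exist and "/" is not
        // writable by the job user -- hence the AccessControlException.
        URI qualified = defaultFs.resolve(tempCheckpoint);
        System.out.println(qualified);
        // hdfs://namenode:8020/data/yarn/nm/usercache/systest/appcache/app_0047/tmp/temporary-xyz/metadata
    }
}
```

In YARN client mode the same path is resolved on the submitting host, where the directory is a genuinely local, writable temp dir, which is why only cluster mode fails.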
[jira] [Commented] (SPARK-22403) StructuredKafkaWordCount example fails in YARN cluster mode
[ https://issues.apache.org/jira/browse/SPARK-22403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16225913#comment-16225913 ]

Wing Yew Poon commented on SPARK-22403:
---------------------------------------
The simplest change that will solve the problem in this particular scenario is to change the Utils.createTempDir(namePrefix = s"temporary") call in https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L208 to Utils.createTempDir(root = "/tmp", namePrefix = "temporary"). In my view, using "/tmp" is no worse, and in fact is better, than using System.getProperty("java.io.tmpdir"). However, others may know of better solutions.

> StructuredKafkaWordCount example fails in YARN cluster mode
> ------------------------------------------------------------
>
>                 Key: SPARK-22403
>                 URL: https://issues.apache.org/jira/browse/SPARK-22403
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Wing Yew Poon
>
> When I run the StructuredKafkaWordCount example in YARN client mode, it runs fine. However, when I run it in YARN cluster mode, the application errors during initialization, and dies after the default number of YARN application attempts. In the AM log, I see
> {noformat}
> 17/10/30 11:34:52 INFO execution.SparkSqlParser: Parsing command: CAST(value AS STRING)
> 17/10/30 11:34:53 ERROR streaming.StreamMetadata: Error writing stream metadata StreamMetadata(b71ca714-a7a1-467f-96aa-023375964429) to /data/yarn/nm/usercache/systest/appcache/application_1508800814252_0047/container_1508800814252_0047_01_01/tmp/temporary-b5ced4ae-32e0-4725-b905-aad679aec9b5/metadata
> org.apache.hadoop.security.AccessControlException: Permission denied: user=systest, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:397)
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:256)
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
> at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1842)
> at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1826)
> at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1785)
> at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.resolvePathForStartFile(FSDirWriteFileOp.java:315)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2313)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2257)
> ...
> at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:280)
> at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1235)
> at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1214)
> at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1152)
> at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:458)
> at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:455)
> at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:469)
> at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:396)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1103)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1083)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:972)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:960)
> at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:76)
> at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:116)
> at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:114)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:114)
> at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:240)
> at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
> at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
> at org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:79)
> at
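The fix proposed in the comment amounts to pinning the temp-checkpoint root to an explicit directory instead of java.io.tmpdir (which YARN redirects into the container's working directory). A hedged Java sketch of the idea; createTempDir below is a simplified stand-in for Spark's Utils.createTempDir, which additionally registers a shutdown hook to delete the directory:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class TempCheckpointDir {
    // Simplified stand-in for Utils.createTempDir(root, namePrefix);
    // the real Spark helper also registers a shutdown-delete hook.
    static File createTempDir(String root, String namePrefix) throws IOException {
        return Files.createTempDirectory(new File(root).toPath(), namePrefix + "-").toFile();
    }

    public static void main(String[] args) throws IOException {
        // Before (in effect): root = System.getProperty("java.io.tmpdir"),
        // which YARN points inside the container directory under /data/...
        // Proposed: an explicit root.
        File dir = createTempDir("/tmp", "temporary");
        System.out.println(dir.getAbsolutePath()); // e.g. /tmp/temporary-<random>
        dir.delete();
    }
}
```

The resulting path is still scheme-less, so it is still qualified against the default filesystem; the improvement in the commenter's scenario is that /tmp, unlike the container path under /data, typically exists and is writable on HDFS as well.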
[jira] [Created] (SPARK-22403) StructuredKafkaWordCount example fails in YARN cluster mode
Wing Yew Poon created SPARK-22403:
----------------------------------

             Summary: StructuredKafkaWordCount example fails in YARN cluster mode
                 Key: SPARK-22403
                 URL: https://issues.apache.org/jira/browse/SPARK-22403
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.2.0
            Reporter: Wing Yew Poon

When I run the StructuredKafkaWordCount example in YARN client mode, it runs fine. However, when I run it in YARN cluster mode, the application errors during initialization, and dies after the default number of YARN application attempts. In the AM log, I see
{noformat}
17/10/30 11:34:52 INFO execution.SparkSqlParser: Parsing command: CAST(value AS STRING)
17/10/30 11:34:53 ERROR streaming.StreamMetadata: Error writing stream metadata StreamMetadata(b71ca714-a7a1-467f-96aa-023375964429) to /data/yarn/nm/usercache/systest/appcache/application_1508800814252_0047/container_1508800814252_0047_01_01/tmp/temporary-b5ced4ae-32e0-4725-b905-aad679aec9b5/metadata
org.apache.hadoop.security.AccessControlException: Permission denied: user=systest, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:397)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:256)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1842)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1826)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1785)
    at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.resolvePathForStartFile(FSDirWriteFileOp.java:315)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2313)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2257)
    ...
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:280)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1235)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1214)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1152)
    at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:458)
    at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:455)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:469)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:396)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1103)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1083)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:972)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:960)
    at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:76)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:116)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:114)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:114)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:240)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
    at org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:79)
    at org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKafkaWordCount.scala)
{noformat}
Looking at StreamingQueryManager#createQuery, we have
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L198
{code}
val checkpointLocation = userSpecifiedCheckpointLocation.map { ...
  ...
}.orElse {
  ...
}.getOrElse {
  if (useTempCheckpointLocation) {
    // Delete the temp checkpoint when a query is being stopped without errors.
    deleteCheckpointOnStop = true
    Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
  } else {
    ...
  }
}
{code}
And Utils.createTempDir has
{code}
def createTempDir(
    root: String =
[jira] [Commented] (SPARK-6325) YarnAllocator crash with dynamic allocation on
[ https://issues.apache.org/jira/browse/SPARK-6325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14360894#comment-14360894 ]

Wing Yew Poon commented on SPARK-6325:
--------------------------------------
The testcase actually came from a similar example in a presentation by Andrew Or and Aaron Davidson on elastic scaling. :-)

YarnAllocator crash with dynamic allocation on
----------------------------------------------

                Key: SPARK-6325
                URL: https://issues.apache.org/jira/browse/SPARK-6325
            Project: Spark
         Issue Type: Bug
         Components: Spark Core, YARN
   Affects Versions: 1.3.0
           Reporter: Marcelo Vanzin
           Priority: Critical

Run spark-shell like this:
{noformat}
spark-shell --conf spark.shuffle.service.enabled=true \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=20 \
    --conf spark.dynamicAllocation.executorIdleTimeout=10 \
    --conf spark.dynamicAllocation.schedulerBacklogTimeout=5 \
    --conf spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=5
{noformat}
Then run this simple test:
{code}
scala> val verySmallRdd = sc.parallelize(1 to 10, 10).map { i =>
     |   if (i % 2 == 0) { Thread.sleep(30 * 1000); i } else 0
     | }
verySmallRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:21

scala> verySmallRdd.collect()
{code}
When Spark starts ramping down the number of allocated executors, it will hit an assert in YarnAllocator.scala:
{code}
assert(targetNumExecutors >= 0, "Allocator killed more executors than are allocated!")
{code}
This assert will cause the akka backend to die, but not the AM itself. So the app will be in a zombie-like state, where the driver is alive but can't talk to the AM. Sadness ensues. I have a working fix, just need to add unit tests. Stay tuned. Thanks to [~wypoon] for finding the problem, and for the test case.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
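For context on why this crash wedges the application: the assert enforces an allocator invariant, so once kill requests outpace actual allocations the decrement drives the target negative and the failed assert takes down the messaging backend rather than returning an error. A toy Java sketch contrasting the asserting shape with a defensive clamp; this illustrates the failure pattern only and is not the actual YarnAllocator fix:

```java
public class AllocatorTarget {
    private int targetNumExecutors;

    AllocatorTarget(int initial) { targetNumExecutors = initial; }

    // Buggy shape: blindly decrement, then assert the invariant. If more
    // kill requests arrive than executors were ever allocated, the assert
    // fires and (in Spark 1.3) tore down the akka backend.
    void killExecutorsAssert(int n) {
        targetNumExecutors -= n;
        assert targetNumExecutors >= 0
            : "Allocator killed more executors than are allocated!";
    }

    // Defensive shape: clamp at zero so a stray extra kill request cannot
    // violate the invariant and wedge the application master.
    void killExecutorsClamped(int n) {
        targetNumExecutors = Math.max(0, targetNumExecutors - n);
    }

    int target() { return targetNumExecutors; }

    public static void main(String[] args) {
        AllocatorTarget a = new AllocatorTarget(2);
        a.killExecutorsClamped(3); // one more kill than allocated
        System.out.println(a.target()); // prints 0, not -1
    }
}
```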
[jira] [Created] (SPARK-5485) typo in spark streaming configuration parameter
Wing Yew Poon created SPARK-5485:
---------------------------------

             Summary: typo in spark streaming configuration parameter
                 Key: SPARK-5485
                 URL: https://issues.apache.org/jira/browse/SPARK-5485
             Project: Spark
          Issue Type: Bug
          Components: Documentation
    Affects Versions: 1.2.0
            Reporter: Wing Yew Poon

In https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#deploying-applications, under Requirements, the bullet point on configuring write-ahead logs says: "This can be enabled by setting the configuration parameter spark.streaming.receiver.writeAheadLogs.enable to true." There is an unfortunate typo in the name of the parameter, which I copied and pasted into my deployment where I was testing it out, and saw data loss as a result. The same typo occurs in https://spark.apache.org/docs/1.2.0/configuration.html, which is even more unfortunate. Documentation should not have typos like this for configuration parameters. I later found the correct parameter on http://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5485) typo in spark streaming configuration parameter
[ https://issues.apache.org/jira/browse/SPARK-5485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14297449#comment-14297449 ]

Wing Yew Poon commented on SPARK-5485:
--------------------------------------
It should be spark.streaming.receiver.writeAheadLog.enable (no 's' in Log).

typo in spark streaming configuration parameter
-----------------------------------------------

                Key: SPARK-5485
                URL: https://issues.apache.org/jira/browse/SPARK-5485
            Project: Spark
         Issue Type: Bug
         Components: Documentation
   Affects Versions: 1.2.0
           Reporter: Wing Yew Poon

In https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#deploying-applications, under Requirements, the bullet point on Configuring write ahead logs says This can be enabled by setting the configuration parameter spark.streaming.receiver.writeAheadLogs.enable to true. There is an unfortunate typo in the name of the parameter, which I copied-and-pasted into my deployment where I was testing it out and seeing data loss as a result. The same typo occurs in https://spark.apache.org/docs/1.2.0/configuration.html, which is even more unfortunate. Documentation should not have typos like this for configuration parameters. I later found the correct parameter on http://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
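Spark configuration keys are free-form strings, so the misspelled name from the docs is accepted and simply never read, which is why the typo silently disabled the write-ahead log instead of failing fast. A toy Java sketch of that behavior, using a plain Map as a stand-in for SparkConf:

```java
import java.util.HashMap;
import java.util.Map;

public class WalConfig {
    public static void main(String[] args) {
        // Stand-in for SparkConf: setting the typo'd key from the 1.2.0 docs
        // is accepted without complaint.
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.streaming.receiver.writeAheadLogs.enable", "true"); // typo: extra 's'

        // The parameter Spark actually reads (per the comment above):
        String correct = "spark.streaming.receiver.writeAheadLog.enable";
        boolean walEnabled = Boolean.parseBoolean(conf.getOrDefault(correct, "false"));
        System.out.println(walEnabled); // prints false -- the WAL stays off
    }
}
```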