GitHub user bellemare opened an issue: https://github.com/apache/incubator-predictionio/issues/337
Yarn support not working due to "," delimiter instead of "=" delimiter in env("SPARK_YARN_USER_ENV")

The PIO (trunk) Engine run() function populates SPARK_YARN_USER_ENV using the following code. Note that the key-value pairs are joined by commas:

https://github.com/apache/incubator-predictionio/blob/e4a3c0c9fc1251d7355d921acb66168226446b3f/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala#L416

```
"SPARK_YARN_USER_ENV" -> sys.env.filter(kv => kv._1.startsWith("PIO_")).
  map(kv => s"${kv._1}=${kv._2}").mkString(",")
```

(Excerpt; the trailing parentheses and .run() call belong to the enclosing expression.)

Spark's YarnSparkHadoopUtil within the Spark Project (trunk) reads SPARK_YARN_USER_ENV if it is set. Note that it splits the whole string on "," and then strictly expects a "=" inside each resulting fragment:

https://github.com/apache/spark/blob/81e5619ca141a1d3a06547d2b682cbe3f135b360/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala#L148

```
def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit = {
  if (inputString != null && inputString.length() > 0) {
    val childEnvs = inputString.split(",")
    val p = Pattern.compile(environmentVariableRegex)
    for (cEnv <- childEnvs) {
      val parts = cEnv.split("=") // split on '='
      val m = p.matcher(parts(1))
      val sb = new StringBuffer
      while (m.find()) {
        val variable = m.group(1)
        var replace = ""
        if (env.get(variable) != None) {
          replace = env.get(variable).get
        } else {
          // if this key is not configured for the child .. get it from the env
          replace = System.getenv(variable)
          if (replace == null) {
            // the env key is not present anywhere .. simply set it
            replace = ""
          }
        }
        m.appendReplacement(sb, Matcher.quoteReplacement(replace))
      }
      m.appendTail(sb)
      // This treats the environment variable as path variable delimited by `File.pathSeparator`
      // This is kept for backward compatibility and consistency with Hadoop's behavior
      addPathToEnvironment(env, parts(0), sb.toString)
    }
  }
}
```

The end result is that when creating a Yarn client, the SPARK_YARN_USER_ENV built by PIO does not follow the convention Spark expects. Whenever the value of a PIO_ variable itself contains a comma (for example PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=xx.xx.xx.245,xx.xx.xx.156,xx.xx.xx.236 in the log below), Spark's split on "," yields fragments such as "xx.xx.xx.156" that contain no "=". For those fragments the following line produces an array of length 1, so the subsequent parts(1) access throws:

```
val parts = cEnv.split("=") // split on '='
```
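To make the failure mode concrete, here is a minimal, self-contained sketch (not PIO or Spark code; the input string is modeled on the PIO_ variables visible in the log below) that applies the same two-level split as setEnvFromInputString:

```
object SplitDemo {
  def main(args: Array[String]): Unit = {
    // Shaped like the string PIO builds: KEY=VALUE pairs joined by commas,
    // where one value (the Elasticsearch host list) itself contains commas.
    val input = "PIO_HOME=/opt/PredictionIO," +
      "PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=xx.xx.xx.245,xx.xx.xx.156,xx.xx.xx.236"

    // Same splitting strategy as YarnSparkHadoopUtil.setEnvFromInputString.
    for (cEnv <- input.split(",")) {
      val parts = cEnv.split("=") // split on '='
      println(s"fragment '$cEnv' -> ${parts.length} part(s)")
      // The bare host fragments "xx.xx.xx.156" and "xx.xx.xx.236" yield
      // parts.length == 1, so Spark's p.matcher(parts(1)) throws
      // java.lang.ArrayIndexOutOfBoundsException: 1.
    }
  }
}
```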
And you end up getting this exception when trying to launch a Spark Yarn client:

```
2017-01-17 09:56:20,362 INFO main@tools.console.Console$ - Using existing engine manifest JSON at /home/path/to/work/dir/manifest.json
2017-01-17 09:56:20,364 WARN main@tools.console.Template$ - template.json does not exist. Template metadata will not be available. (This is safe to ignore if you are not working on a template.)
2017-01-17 09:56:21,648 INFO main@prediction.tools.Runner$ - Submission command: /opt/spark-current/bin/spark-submit --master yarn --deploy-mode client --driver-memory 5G --conf spark.driver.maxResultSize=2G --conf spark.memory.fraction=0.75 --executor-memory 25G --conf spark.executor.cores=16 --total-executor-cores 45 --conf spark.task.maxFailures=1 --conf spark.ui.retainedJobs=111 --conf spark.ui.retainedStages=1111 --conf spark.eventLog.enabled=true --conf spark.logConf=true --conf spark.executor.extraJavaOptions=-Djava.util.Arrays.useLegacyMergeSort=true --class io.prediction.workflow.CreateWorkflow --jars file:/home/path/to/work/dir/target/scala-2.10/flyer-affinity-assembly-1.20160823.0-deps.jar,file:/home/path/to/work/dir/target/scala-2.10/flyer-affinity_2.10-1.20160823.0.jar --files file:/opt/PredictionIO/conf/log4j.properties,file:/opt/hadoop-current/etc/hadoop/core-site.xml,file:/opt/hbase-current/conf/hbase-site.xml --driver-class-path /opt/PredictionIO/conf:/opt/hadoop-current/etc/hadoop:/opt/hbase-current/conf file:/opt/PredictionIO/lib/pio-assembly-0.9.5.jar --engine-id qJOA3WjMK8L8II8NqEMcRnAf7id45cDd --engine-version 886f3f5ac890f5d82e6be2d0ca82fe1f4ed7a84a --engine-variant file:/home/path/to/work/dir/engine.json --verbosity 0 --json-extractor Both --env PIO_STORAGE_SOURCES_HBASE_TYPE=hbase,PIO_ENV_LOADED=1,PIO_STORAGE_REPOSITORIES_METADATA_NAME=pio_meta,PIO_FS_BASEDIR=/pio-apps,PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=xx.xx.xx.245,xx.xx.xx.156,xx.xx.xx.236,PIO_HOME=/opt/PredictionIO,PIO_FS_ENGINESDIR=/pio-apps/engines,PIO_STORAGE_SOURCES_HDFS_TYPE=hdfs,PIO_STORAGE_SOURCES_HDFS_PATH=/pio-apps/models,PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch,PIO_STORAGE_REPOSITORIES_METADATA_SOURCE=ELASTICSEARCH,PIO_STORAGE_REPOSITORIES_MODELDATA_SOURCE=HDFS,PIO_STORAGE_REPOSITORIES_EVENTDATA_NAME=pio_event,PIO_STORAGE_SOURCES_ELASTICSEARCH_CLUSTERNAME=pio-staging-elasticsearch,PIO_FS_TMPDIR=/pio-apps/tmp,PIO_STORAGE_REPOSITORIES_MODELDATA_NAME=pio_model,PIO_STORAGE_REPOSITORIES_EVENTDATA_SOURCE=HBASE,PIO_CONF_DIR=/opt/PredictionIO/conf,PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9300
2017-01-17 09:56:22,884 WARN main@hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2017-01-17 09:56:23,786 INFO main@prediction.controller.Engine - Extracting datasource params...
2017-01-17 09:56:23,856 INFO main@prediction.workflow.WorkflowUtils$ - No 'name' is found. Default empty String will be used.
2017-01-17 09:56:24,255 INFO main@prediction.controller.Engine - Datasource params: (,DataSourceParams(List(eventA, eventB, eventC),Bar,FooBar,Foo,None,None,Some(1),Some(1),None))
2017-01-17 09:56:24,256 INFO main@prediction.controller.Engine - Extracting preparator params...
2017-01-17 09:56:24,257 INFO main@prediction.controller.Engine - Preparator params: (,Empty)
2017-01-17 09:56:24,262 INFO main@prediction.controller.Engine - Extracting serving params...
2017-01-17 09:56:24,262 INFO main@prediction.controller.Engine - Serving params: (,Empty)
2017-01-17 09:56:26,051 INFO sparkDriverActorSystem-akka.actor.default-dispatcher-3@Remoting - Starting remoting
2017-01-17 09:56:26,173 INFO sparkDriverActorSystem-akka.actor.default-dispatcher-3@Remoting - Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@<IP-Removed>:43037]
2017-01-17 09:56:27,702 ERROR main@apache.spark.SparkContext - Error initializing SparkContext.
java.lang.ArrayIndexOutOfBoundsException: 1
    at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$$anonfun$setEnvFromInputString$1.apply(YarnSparkHadoopUtil.scala:264)
    at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$$anonfun$setEnvFromInputString$1.apply(YarnSparkHadoopUtil.scala:262)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$.setEnvFromInputString(YarnSparkHadoopUtil.scala:262)
    at org.apache.spark.deploy.yarn.Client$$anonfun$setupLaunchEnv$6.apply(Client.scala:640)
    at org.apache.spark.deploy.yarn.Client$$anonfun$setupLaunchEnv$6.apply(Client.scala:638)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.deploy.yarn.Client.setupLaunchEnv(Client.scala:638)
    at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:726)
    at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:142)
    at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
    at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:144)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:530)
    at io.prediction.workflow.WorkflowContext$.apply(WorkflowContext.scala:42)
    at io.prediction.workflow.CoreWorkflow$.runTrain(CoreWorkflow.scala:57)
    at io.prediction.workflow.CreateWorkflow$.main(CreateWorkflow.scala:247)
    at io.prediction.workflow.CreateWorkflow.main(CreateWorkflow.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2017-01-17 09:56:27,787 WARN main@spark.metrics.MetricsSystem - Stopping a MetricsSystem that is not running
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 1
    at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$$anonfun$setEnvFromInputString$1.apply(YarnSparkHadoopUtil.scala:264)
    at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$$anonfun$setEnvFromInputString$1.apply(YarnSparkHadoopUtil.scala:262)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$.setEnvFromInputString(YarnSparkHadoopUtil.scala:262)
    at org.apache.spark.deploy.yarn.Client$$anonfun$setupLaunchEnv$6.apply(Client.scala:640)
    at org.apache.spark.deploy.yarn.Client$$anonfun$setupLaunchEnv$6.apply(Client.scala:638)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.deploy.yarn.Client.setupLaunchEnv(Client.scala:638)
    at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:726)
    at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:142)
    at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
    at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:144)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:530)
    at io.prediction.workflow.WorkflowContext$.apply(WorkflowContext.scala:42)
    at io.prediction.workflow.CoreWorkflow$.runTrain(CoreWorkflow.scala:57)
    at io.prediction.workflow.CreateWorkflow$.main(CreateWorkflow.scala:247)
    at io.prediction.workflow.CreateWorkflow.main(CreateWorkflow.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```
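Since setEnvFromInputString splits the whole string on "," with no escaping mechanism, a fix on the PIO side has to keep raw commas out of the joined string. As a minimal sketch of one possible direction (a hypothetical helper, not actual PIO code), the builder could at least skip values that would corrupt the format instead of emitting a string Spark cannot parse:

```
// Hypothetical helper, not in PIO: build SPARK_YARN_USER_ENV from PIO_*
// variables, skipping any whose value contains a comma, because Spark's
// setEnvFromInputString (quoted above) splits on ',' with no escaping.
def buildSparkYarnUserEnv(env: Map[String, String]): String =
  env.collect {
    case (k, v) if k.startsWith("PIO_") && !v.contains(",") => s"$k=$v"
  }.mkString(",")
```

With the environment from the log above this would silently drop PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS, so a real fix probably needs to pass comma-valued variables through another channel (for example Spark's spark.yarn.appMasterEnv.* configuration) rather than through SPARK_YARN_USER_ENV; the sketch only illustrates why the current concatenation cannot round-trip them.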