GitHub user bellemare opened an issue:

    https://github.com/apache/incubator-predictionio/issues/337

    Yarn support not working due to "," delimiter instead of "=" delimiter in env("SPARK_YARN_USER_ENV")

    The PIO (trunk) Engine run() function populates SPARK_YARN_USER_ENV using the following code. Note that the key-value pairs are joined by a comma:
    
https://github.com/apache/incubator-predictionio/blob/e4a3c0c9fc1251d7355d921acb66168226446b3f/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala#L416
    ```
    "SPARK_YARN_USER_ENV" -> sys.env.filter(kv => kv._1.startsWith("PIO_")).
    map(kv => s"${kv._1}=${kv._2}").mkString(",")).run())
    ```
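
To make the resulting format concrete, here is a minimal sketch of the same filter/map/mkString chain applied to an explicit map instead of `sys.env`. The object name `PioEnvString`, the `build` helper, and the sorting step are assumptions made for illustration; they are not part of PredictionIO.

```scala
object PioEnvString {
  // Hypothetical helper mirroring the snippet above: keep only PIO_* entries,
  // render each one as KEY=VALUE, and join the entries with commas.
  def build(env: Map[String, String]): String =
    env.toList
      .filter { case (key, _) => key.startsWith("PIO_") }
      .sortBy { case (key, _) => key } // sorted only to make the output deterministic
      .map { case (key, value) => s"$key=$value" }
      .mkString(",")

  def main(args: Array[String]): Unit = {
    // Illustrative environment; PATH is dropped because it lacks the PIO_ prefix.
    val env = Map(
      "PIO_HOME" -> "/opt/PredictionIO",
      "PIO_ENV_LOADED" -> "1",
      "PATH" -> "/usr/bin"
    )
    println(build(env)) // prints PIO_ENV_LOADED=1,PIO_HOME=/opt/PredictionIO
  }
}
```

Nothing in this chain escapes or quotes the values, so a value that itself contains a comma ends up in the joined string unchanged.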
    
    The YarnSparkHadoopUtil class in the Spark project (trunk) reads SPARK_YARN_USER_ENV if it is set. Note that it strictly expects a "=" between the key and the value of each comma-separated entry.
    
https://github.com/apache/spark/blob/81e5619ca141a1d3a06547d2b682cbe3f135b360/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala#L148
    
    ```
    def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit = {
        if (inputString != null && inputString.length() > 0) {
          val childEnvs = inputString.split(",")
          val p = Pattern.compile(environmentVariableRegex)
          for (cEnv <- childEnvs) {
            val parts = cEnv.split("=") // split on '='
            val m = p.matcher(parts(1))
            val sb = new StringBuffer
            while (m.find()) {
              val variable = m.group(1)
              var replace = ""
              if (env.get(variable) != None) {
                replace = env.get(variable).get
              } else {
                // if this key is not configured for the child .. get it from the env
                replace = System.getenv(variable)
                if (replace == null) {
                // the env key is note present anywhere .. simply set it
                  replace = ""
                }
              }
              m.appendReplacement(sb, Matcher.quoteReplacement(replace))
            }
            m.appendTail(sb)
            // This treats the environment variable as path variable delimited by `File.pathSeparator`
            // This is kept for backward compatibility and consistency with Hadoop's behavior
            addPathToEnvironment(env, parts(0), sb.toString)
          }
        }
    }
    ```
    
    
    The end result is that, when creating a YARN client, SPARK_YARN_USER_ENV does not follow the format that Spark expects. The following line produces an array of length 1 (since there is no "=" to split on):
    ```
    val parts = cEnv.split("=") // split on '='
    ```
    And you end up getting this exception when trying to launch a Spark YARN client:
    
    ```
    2017-01-17 09:56:20,362 INFO  main@tools.console.Console$ - Using existing engine manifest JSON at /home/path/to/work/dir/manifest.json
    2017-01-17 09:56:20,364 WARN  main@tools.console.Template$ - template.json does not exist. Template metadata will not be available. (This is safe to ignore if you are not working on a template.)
    2017-01-17 09:56:21,648 INFO  main@prediction.tools.Runner$ - Submission command: /opt/spark-current/bin/spark-submit --master yarn --deploy-mode client --driver-memory 5G --conf spark.driver.maxResultSize=2G --conf spark.memory.fraction=0.75 --executor-memory 25G --conf spark.executor.cores=16 --total-executor-cores 45 --conf spark.task.maxFailures=1 --conf spark.ui.retainedJobs=111 --conf spark.ui.retainedStages=1111 --conf spark.eventLog.enabled=true --conf spark.logConf=true --conf spark.executor.extraJavaOptions=-Djava.util.Arrays.useLegacyMergeSort=true --class io.prediction.workflow.CreateWorkflow --jars file:/home/path/to/work/dir/target/scala-2.10/flyer-affinity-assembly-1.20160823.0-deps.jar,file:/home/path/to/work/dir/target/scala-2.10/flyer-affinity_2.10-1.20160823.0.jar --files file:/opt/PredictionIO/conf/log4j.properties,file:/opt/hadoop-current/etc/hadoop/core-site.xml,file:/opt/hbase-current/conf/hbase-site.xml --driver-class-path /opt/PredictionIO/conf:/opt/hadoop-current/etc/hadoop:/opt/hbase-current/conf file:/opt/PredictionIO/lib/pio-assembly-0.9.5.jar --engine-id qJOA3WjMK8L8II8NqEMcRnAf7id45cDd --engine-version 886f3f5ac890f5d82e6be2d0ca82fe1f4ed7a84a --engine-variant file:/home/path/to/work/dir/engine.json --verbosity 0 --json-extractor Both --env PIO_STORAGE_SOURCES_HBASE_TYPE=hbase,PIO_ENV_LOADED=1,PIO_STORAGE_REPOSITORIES_METADATA_NAME=pio_meta,PIO_FS_BASEDIR=/pio-apps,PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=xx.xx.xx.245,xx.xx.xx.156,xx.xx.xx.236,PIO_HOME=/opt/PredictionIO,PIO_FS_ENGINESDIR=/pio-apps/engines,PIO_STORAGE_SOURCES_HDFS_TYPE=hdfs,PIO_STORAGE_SOURCES_HDFS_PATH=/pio-apps/models,PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch,PIO_STORAGE_REPOSITORIES_METADATA_SOURCE=ELASTICSEARCH,PIO_STORAGE_REPOSITORIES_MODELDATA_SOURCE=HDFS,PIO_STORAGE_REPOSITORIES_EVENTDATA_NAME=pio_event,PIO_STORAGE_SOURCES_ELASTICSEARCH_CLUSTERNAME=pio-staging-elasticsearch,PIO_FS_TMPDIR=/pio-apps/tmp,PIO_STORAGE_REPOSITORIES_MODELDATA_NAME=pio_model,PIO_STORAGE_REPOSITORIES_EVENTDATA_SOURCE=HBASE,PIO_CONF_DIR=/opt/PredictionIO/conf,PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9300
    2017-01-17 09:56:22,884 WARN  m...@hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    2017-01-17 09:56:23,786 INFO  main@prediction.controller.Engine - Extracting datasource params...
    2017-01-17 09:56:23,856 INFO  main@prediction.workflow.WorkflowUtils$ - No 'name' is found. Default empty String will be used.
    2017-01-17 09:56:24,255 INFO  main@prediction.controller.Engine - Datasource params: (,DataSourceParams(List(eventA, eventB, eventC),Bar,FooBar,Foo,None,None,Some(1),Some(1),None))
    2017-01-17 09:56:24,256 INFO  main@prediction.controller.Engine - Extracting preparator params...
    2017-01-17 09:56:24,257 INFO  main@prediction.controller.Engine - Preparator params: (,Empty)
    2017-01-17 09:56:24,262 INFO  main@prediction.controller.Engine - Extracting serving params...
    2017-01-17 09:56:24,262 INFO  main@prediction.controller.Engine - Serving params: (,Empty)
    2017-01-17 09:56:26,051 INFO  sparkDriverActorSystem-akka.actor.default-dispatcher-3@Remoting - Starting remoting
    2017-01-17 09:56:26,173 INFO  sparkDriverActorSystem-akka.actor.default-dispatcher-3@Remoting - Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@<IP-Removed>:43037]
    2017-01-17 09:56:27,702 ERROR main@apache.spark.SparkContext - Error initializing SparkContext.
    java.lang.ArrayIndexOutOfBoundsException: 1
            at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$$anonfun$setEnvFromInputString$1.apply(YarnSparkHadoopUtil.scala:264)
            at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$$anonfun$setEnvFromInputString$1.apply(YarnSparkHadoopUtil.scala:262)
            at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
            at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
            at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$.setEnvFromInputString(YarnSparkHadoopUtil.scala:262)
            at org.apache.spark.deploy.yarn.Client$$anonfun$setupLaunchEnv$6.apply(Client.scala:640)
            at org.apache.spark.deploy.yarn.Client$$anonfun$setupLaunchEnv$6.apply(Client.scala:638)
            at scala.Option.foreach(Option.scala:236)
            at org.apache.spark.deploy.yarn.Client.setupLaunchEnv(Client.scala:638)
            at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:726)
            at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:142)
            at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
            at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:144)
            at org.apache.spark.SparkContext.<init>(SparkContext.scala:530)
            at io.prediction.workflow.WorkflowContext$.apply(WorkflowContext.scala:42)
            at io.prediction.workflow.CoreWorkflow$.runTrain(CoreWorkflow.scala:57)
            at io.prediction.workflow.CreateWorkflow$.main(CreateWorkflow.scala:247)
            at io.prediction.workflow.CreateWorkflow.main(CreateWorkflow.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:606)
            at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
            at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
            at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    2017-01-17 09:56:27,787 WARN  main@spark.metrics.MetricsSystem - Stopping a MetricsSystem that is not running
    Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 1
            at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$$anonfun$setEnvFromInputString$1.apply(YarnSparkHadoopUtil.scala:264)
            at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$$anonfun$setEnvFromInputString$1.apply(YarnSparkHadoopUtil.scala:262)
            at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
            at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
            at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$.setEnvFromInputString(YarnSparkHadoopUtil.scala:262)
            at org.apache.spark.deploy.yarn.Client$$anonfun$setupLaunchEnv$6.apply(Client.scala:640)
            at org.apache.spark.deploy.yarn.Client$$anonfun$setupLaunchEnv$6.apply(Client.scala:638)
            at scala.Option.foreach(Option.scala:236)
            at org.apache.spark.deploy.yarn.Client.setupLaunchEnv(Client.scala:638)
            at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:726)
            at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:142)
            at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
            at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:144)
            at org.apache.spark.SparkContext.<init>(SparkContext.scala:530)
            at io.prediction.workflow.WorkflowContext$.apply(WorkflowContext.scala:42)
            at io.prediction.workflow.CoreWorkflow$.runTrain(CoreWorkflow.scala:57)
            at io.prediction.workflow.CreateWorkflow$.main(CreateWorkflow.scala:247)
            at io.prediction.workflow.CreateWorkflow.main(CreateWorkflow.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:606)
            at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
            at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
            at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    ```
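
The failure mode described above can be reproduced in isolation. The sketch below is a simplified stand-in for the first two steps of `setEnvFromInputString` (split on ",", then split each entry on "="); `SplitRepro` and `parse` are hypothetical names, and the host values are made up. It assumes, judging from the `--env` value in the log, that an entry without "=" can arise when a value such as the one for PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS itself contains commas; the issue text does not state this explicitly.

```scala
import scala.util.Try

object SplitRepro {
  // Simplified stand-in for the splitting logic quoted in the issue:
  // entries are separated by ',' and each entry is expected to be KEY=VALUE.
  def parse(input: String): List[(String, String)] =
    input.split(",").toList.map { entry =>
      val parts = entry.split("=")
      (parts(0), parts(1)) // throws ArrayIndexOutOfBoundsException if there is no '='
    }

  def main(args: Array[String]): Unit = {
    // Well-formed input: every comma-separated entry contains '='.
    println(parse("PIO_ENV_LOADED=1,PIO_HOME=/opt/PredictionIO"))

    // A value containing a comma yields the fragment "hostB", which has no '='.
    val withCommaInValue =
      "PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=hostA,hostB,PIO_ENV_LOADED=1"
    println(Try(parse(withCommaInValue)).isFailure) // prints true
  }
}
```

Under that assumption, the `parts(1)` access on the fragment is what raises the `ArrayIndexOutOfBoundsException: 1` seen in the stack trace.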

