I'm still getting an error. Here's my code, which works when tested in
spark-shell:

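      // Read the lookup file from HDFS and collect its lines on the driver
      val badIPs = sc.textFile("/user/sb/badfullIPs.csv").collect
      val badIpSet = badIPs.toSet
      // Ship the set to the executors once, as a read-only broadcast variable
      val badIPsBC = sc.broadcast(badIpSet)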
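The original `fromFile`/`getLines` version quoted further down can also be adapted for yarn-cluster mode without running a Spark job at all. The idea is to read the file on the driver through the Hadoop `FileSystem` API, using the Hadoop configuration the `SparkContext` already carries. This is a sketch assuming Spark 1.x with Hadoop on the classpath. `BadIpLoader`, `toIpSet`, and `load` are hypothetical names, and trimming whitespace and dropping blank lines is an assumed cleanup step. The sketch only changes how the file is read, so on its own it is not a fix for the scheduler warning described below.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import scala.io.Source

// Hypothetical helper: builds the bad-IP set on the driver without a Spark job.
object BadIpLoader {

  // Assumed cleanup: trim each line and skip blank ones before building the set.
  def toIpSet(lines: Iterator[String]): Set[String] =
    lines.map(_.trim).filter(_.nonEmpty).toSet

  // Open the file with the HDFS client, using the SparkContext's Hadoop
  // configuration, so scheme-less paths such as /user/sb/... resolve against
  // the cluster's default filesystem.
  def load(sc: SparkContext, path: String): Set[String] = {
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val source = Source.fromInputStream(fs.open(new Path(path)), "UTF-8")
    try toIpSet(source.getLines()) // toSet consumes the iterator before close
    finally source.close()
  }
}
```

Usage would then be `val badIPsBC = sc.broadcast(BadIpLoader.load(sc, "/user/sb/badfullIPs.csv"))`.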

The job looks OK from my end:

15/02/07 18:59:58 INFO Client: Application report from ASM:
         application identifier: application_1423081782629_3861
         appId: 3861
         clientToAMToken: Token { kind: YARN_CLIENT_TOKEN, service:  }
         appMasterHost: phd40010008.na.com
         appQueue: root.default
         appMasterRpcPort: 0
         appStartTime: 1423353581140
         yarnAppState: RUNNING
         distributedFinalState: UNDEFINED

But the streaming process never actually begins. The full log is below;
scroll to the end for the repeated warning "WARN YarnClusterScheduler:
Initial job has not accepted any resources; check your cluster UI to ensure
that workers are registered and have sufficient memory".

I'll note that I have a different Spark Streaming app, called "dqd", running
successfully for a different job; it uses only a StreamingContext and no
additional SparkContext. This app (called "sbStreamingTv"), however, uses
both a SparkContext and a StreamingContext, with the SparkContext used to
read a lookup file from HDFS for IP filtering. *The references to line #198
in the log below refer to the "val badIPs =
sc.textFile("/user/sb/badfullIPs.csv").collect" line shown above, and it
looks like Spark doesn't get beyond that point in the code.*

Also, this job ("sbStreamingTv") does work in yarn-client mode, even with
both a SparkContext and a StreamingContext. It looks to me like in
yarn-cluster mode it's grabbing resources for the StreamingContext but not
for the SparkContext.
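One possible explanation, offered as a guess: if the app builds its StreamingContext from a SparkConf and also calls `new SparkContext(...)` separately, the driver ends up with two independent SparkContexts. The log below looks consistent with that. "Created YarnClusterScheduler" and "postStartHook done" each appear twice, along with two MemoryStore/BlockManager startups (12:06:16 and 12:06:20), while the ApplicationMaster allocates its 3 executors only once. A commonly used pattern is to create a single SparkContext and build the StreamingContext on top of it. This is a minimal sketch assuming Spark 1.x APIs. The 10-second batch interval, the `/user/sb/incoming` input directory, the "IP is the first comma-separated field" record layout, and the names `SharedContextSketch`, `clientIp`, and `isAllowed` are all hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SharedContextSketch {

  // Hypothetical record layout: the client IP is the first comma-separated field.
  def clientIp(line: String): String = line.split(",", -1)(0).trim

  // Keep a record only if its IP is not in the bad-IP set.
  def isAllowed(line: String, badIps: Set[String]): Boolean =
    !badIps.contains(clientIp(line))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sbStreamingTv")

    // Create exactly one SparkContext...
    val sc = new SparkContext(conf)
    // ...and build the StreamingContext on top of it instead of from conf,
    // so the lookup job and the stream share the same executors.
    val ssc = new StreamingContext(sc, Seconds(10)) // hypothetical interval

    val badIpSet = sc.textFile("/user/sb/badfullIPs.csv").collect().toSet
    val badIPsBC = sc.broadcast(badIpSet)

    // Hypothetical input: new files dropped into an HDFS directory.
    val lines = ssc.textFileStream("/user/sb/incoming")
    lines.filter(line => isAllowed(line, badIPsBC.value)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

If the app already has a StreamingContext, `ssc.sparkContext` returns its underlying SparkContext, which can be used for the `textFile` and `broadcast` calls instead of creating a second context.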

Any ideas?


15/02/10 12:06:16 INFO MemoryStore: MemoryStore started with capacity
1177.8 MB.
15/02/10 12:06:16 INFO ConnectionManager: Bound socket to port 30129 with
id = ConnectionManagerId(phd40010008.na.com,30129)
15/02/10 12:06:16 INFO BlockManagerMaster: Trying to register BlockManager
15/02/10 12:06:16 INFO BlockManagerInfo: Registering block manager
phd40010008.na.com:30129 with 1177.8 MB RAM
15/02/10 12:06:16 INFO BlockManagerMaster: Registered BlockManager
15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
15/02/10 12:06:16 INFO HttpBroadcast: Broadcast server started at
15/02/10 12:06:16 INFO HttpFileServer: HTTP File server directory is
15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
15/02/10 12:06:16 INFO JettyUtils: Adding filter:
15/02/10 12:06:16 INFO SparkUI: Started SparkUI at
15/02/10 12:06:17 INFO EventLoggingListener: Logging events to
15/02/10 12:06:17 INFO YarnClusterScheduler: Created YarnClusterScheduler
15/02/10 12:06:17 INFO ApplicationMaster$$anon$1: Adding shutdown hook for
context org.apache.spark.SparkContext@7f38095d
15/02/10 12:06:17 INFO ApplicationMaster: Registering the ApplicationMaster
15/02/10 12:06:17 INFO ApplicationMaster: Allocating 3 executors.
15/02/10 12:06:17 INFO YarnAllocationHandler: Will Allocate 3 executor
containers, each with 2432 memory
15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any,
priority: 1, capability: <memory:2432, vCores:1>
15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any,
priority: 1, capability: <memory:2432, vCores:1>
15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any,
priority: 1, capability: <memory:2432, vCores:1>
15/02/10 12:06:20 INFO YarnClusterScheduler:
YarnClusterScheduler.postStartHook done
15/02/10 12:06:20 WARN SparkConf: In Spark 1.0 and later spark.local.dir
will be overridden by the value set by the cluster manager (via
SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
15/02/10 12:06:20 INFO SecurityManager: Changing view acls to: jg
15/02/10 12:06:20 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(jg)
15/02/10 12:06:20 INFO Slf4jLogger: Slf4jLogger started
15/02/10 12:06:20 INFO Remoting: Starting remoting
15/02/10 12:06:20 INFO Remoting: Remoting started; listening on addresses
15/02/10 12:06:20 INFO Remoting: Remoting now listens on addresses:
15/02/10 12:06:20 INFO SparkEnv: Registering MapOutputTracker
15/02/10 12:06:20 INFO SparkEnv: Registering BlockManagerMaster
15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
15/02/10 12:06:20 INFO MemoryStore: MemoryStore started with capacity
1177.8 MB.
15/02/10 12:06:20 INFO ConnectionManager: Bound socket to port 55944 with
id = ConnectionManagerId(phd40010008.na.com,55944)
15/02/10 12:06:20 INFO BlockManagerMaster: Trying to register BlockManager
15/02/10 12:06:20 INFO BlockManagerInfo: Registering block manager
phd40010008.na.com:55944 with 1177.8 MB RAM
15/02/10 12:06:20 INFO BlockManagerMaster: Registered BlockManager
15/02/10 12:06:20 INFO HttpFileServer: HTTP File server directory is
15/02/10 12:06:20 INFO HttpServer: Starting HTTP Server
15/02/10 12:06:20 INFO JettyUtils: Adding filter:
15/02/10 12:06:20 INFO SparkUI: Started SparkUI at
15/02/10 12:06:20 INFO EventLoggingListener: Logging events to
15/02/10 12:06:20 INFO YarnClusterScheduler: Created YarnClusterScheduler
15/02/10 12:06:20 INFO YarnClusterScheduler:
YarnClusterScheduler.postStartHook done
15/02/10 12:06:21 INFO MemoryStore: ensureFreeSpace(253715) called with
curMem=0, maxMem=1235012812
15/02/10 12:06:21 INFO MemoryStore: Block broadcast_0 stored as values to
memory (estimated size 247.8 KB, free 1177.6 MB)
15/02/10 12:06:21 INFO FileInputFormat: Total input paths to process : 1
15/02/10 12:06:21 INFO SparkContext: Starting job: collect at
15/02/10 12:06:21 INFO DAGScheduler: Got job 0 (collect at
sbStreamingTv.scala:198) with 2 output partitions (allowLocal=false)
15/02/10 12:06:21 INFO DAGScheduler: Final stage: Stage 0(collect at
15/02/10 12:06:21 INFO DAGScheduler: Parents of final stage: List()
15/02/10 12:06:21 INFO DAGScheduler: Missing parents: List()
15/02/10 12:06:21 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at
textFile at sbStreamingTv.scala:198), which has no missing parents
15/02/10 12:06:21 INFO DAGScheduler: Submitting 2 missing tasks from Stage
0 (MappedRDD[1] at textFile at sbStreamingTv.scala:198)
15/02/10 12:06:21 INFO YarnClusterScheduler: Adding task set 0.0 with 2
15/02/10 12:06:21 INFO AMRMClientImpl: Received new token for :
15/02/10 12:06:21 INFO AMRMClientImpl: Received new token for :
15/02/10 12:06:21 INFO AMRMClientImpl: Received new token for :
15/02/10 12:06:21 INFO RackResolver: Resolved phd40010002.na.com to
15/02/10 12:06:21 INFO RackResolver: Resolved phd40010022.na.com to
15/02/10 12:06:21 INFO RackResolver: Resolved phd40010024.na.com to
15/02/10 12:06:21 INFO YarnAllocationHandler: Launching container
container_1423081782629_7370_01_000003 for on host phd40010002.na.com
15/02/10 12:06:21 INFO YarnAllocationHandler: Launching ExecutorRunnable.
driverUrl: akka.tcp://
 executorHostname: phd40010002.na.com
15/02/10 12:06:21 INFO YarnAllocationHandler: Launching container
container_1423081782629_7370_01_000004 for on host phd40010022.na.com
15/02/10 12:06:21 INFO ExecutorRunnable: Starting Executor Container
15/02/10 12:06:21 INFO YarnAllocationHandler: Launching ExecutorRunnable.
driverUrl: akka.tcp://
 executorHostname: phd40010022.na.com
15/02/10 12:06:21 INFO ExecutorRunnable: Starting Executor Container
15/02/10 12:06:21 INFO YarnAllocationHandler: Launching container
container_1423081782629_7370_01_000002 for on host phd40010024.na.com
15/02/10 12:06:21 INFO YarnAllocationHandler: Launching ExecutorRunnable.
driverUrl: akka.tcp://
 executorHostname: phd40010024.na.com
15/02/10 12:06:21 INFO ContainerManagementProtocolProxy:
yarn.client.max-nodemanagers-proxies : 500
15/02/10 12:06:21 INFO ExecutorRunnable: Starting Executor Container
15/02/10 12:06:21 INFO ContainerManagementProtocolProxy:
yarn.client.max-nodemanagers-proxies : 500
15/02/10 12:06:21 INFO ContainerManagementProtocolProxy:
yarn.client.max-nodemanagers-proxies : 500
15/02/10 12:06:21 INFO ExecutorRunnable: Setting up ContainerLaunchContext
15/02/10 12:06:21 INFO ExecutorRunnable: Setting up ContainerLaunchContext
15/02/10 12:06:21 INFO ExecutorRunnable: Setting up ContainerLaunchContext
15/02/10 12:06:21 INFO ExecutorRunnable: Preparing Local resources
15/02/10 12:06:21 INFO ExecutorRunnable: Preparing Local resources
15/02/10 12:06:21 INFO ExecutorRunnable: Preparing Local resources
15/02/10 12:06:21 INFO ApplicationMaster: All executors have launched.
15/02/10 12:06:21 INFO ApplicationMaster: Started progress reporter thread
- sleep time : 5000
15/02/10 12:06:21 INFO ExecutorRunnable: Prepared Local resources
Map(__spark__.jar -> resource { scheme: "hdfs" host: "nameservice1" port:
-1 file:
} size: 93542713 timestamp: 1423587960750 type: FILE visibility: PRIVATE,
__app__.jar -> resource { scheme: "hdfs" host: "nameservice1" port: -1
} size: 95950353 timestamp: 1423587960370 type: FILE visibility: PRIVATE)
15/02/10 12:06:21 INFO ExecutorRunnable: Prepared Local resources
Map(__spark__.jar -> resource { scheme: "hdfs" host: "nameservice1" port:
-1 file:
} size: 93542713 timestamp: 1423587960750 type: FILE visibility: PRIVATE,
__app__.jar -> resource { scheme: "hdfs" host: "nameservice1" port: -1
} size: 95950353 timestamp: 1423587960370 type: FILE visibility: PRIVATE)
15/02/10 12:06:21 INFO ExecutorRunnable: Prepared Local resources
Map(__spark__.jar -> resource { scheme: "hdfs" host: "nameservice1" port:
-1 file:
} size: 93542713 timestamp: 1423587960750 type: FILE visibility: PRIVATE,
__app__.jar -> resource { scheme: "hdfs" host: "nameservice1" port: -1
} size: 95950353 timestamp: 1423587960370 type: FILE visibility: PRIVATE)
15/02/10 12:06:21 INFO ExecutorRunnable: Setting up executor with commands:
List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill %p',
-Xms2048m -Xmx2048m , -Djava.io.tmpdir=$PWD/tmp,
org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://
sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler, 1,
phd40010002.na.com, 1, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
15/02/10 12:06:21 INFO ExecutorRunnable: Setting up executor with commands:
List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill %p',
-Xms2048m -Xmx2048m , -Djava.io.tmpdir=$PWD/tmp,
org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://
sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler, 3,
phd40010024.na.com, 1, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
15/02/10 12:06:21 INFO ExecutorRunnable: Setting up executor with commands:
List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill %p',
-Xms2048m -Xmx2048m , -Djava.io.tmpdir=$PWD/tmp,
org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://
sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler, 2,
phd40010022.na.com, 1, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
15/02/10 12:06:21 INFO ContainerManagementProtocolProxy: Opening proxy :
15/02/10 12:06:21 INFO ContainerManagementProtocolProxy: Opening proxy :
15/02/10 12:06:21 INFO ContainerManagementProtocolProxy: Opening proxy :
15/02/10 12:06:26 INFO CoarseGrainedSchedulerBackend: Registered executor:
sparkexecu...@phd40010022.na.com:29369/user/Executor#43651774] with ID 2
15/02/10 12:06:26 INFO CoarseGrainedSchedulerBackend: Registered executor:
sparkexecu...@phd40010024.na.com:12969/user/Executor#1711844295] with ID 3
15/02/10 12:06:26 INFO BlockManagerInfo: Registering block manager
phd40010022.na.com:14119 with 1178.1 MB RAM
15/02/10 12:06:26 INFO BlockManagerInfo: Registering block manager
phd40010024.na.com:53284 with 1178.1 MB RAM
15/02/10 12:06:29 INFO CoarseGrainedSchedulerBackend: Registered executor:
sparkexecu...@phd40010002.na.com:35547/user/Executor#-1690254909] with ID 1
15/02/10 12:06:29 INFO BlockManagerInfo: Registering block manager
phd40010002.na.com:62754 with 1178.1 MB RAM
15/02/10 12:06:36 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:06:51 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:07:06 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:07:21 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:07:36 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:07:51 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:08:06 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:08:21 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:08:36 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:08:51 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:09:06 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:09:21 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:09:36 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:09:51 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:10:06 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:10:21 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:10:36 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:10:51 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory

On Fri, Feb 6, 2015 at 3:24 PM, Sandy Ryza <sandy.r...@cloudera.com> wrote:

> You can call collect() to pull in the contents of an RDD into the driver:
>   val badIPsLines = badIPs.collect()
> On Fri, Feb 6, 2015 at 12:19 PM, Jon Gregg <jonrgr...@gmail.com> wrote:
>> OK I tried that, but how do I convert an RDD to a Set that I can then
>> broadcast and cache?
>>       val badIPs = sc.textFile("hdfs:///user/jon/"+ "badfullIPs.csv")
>>       val badIPsLines = badIPs.getLines
>>       val badIpSet = badIPsLines.toSet
>>       val badIPsBC = sc.broadcast(badIpSet)
>> produces the error "value getLines is not a member of
>> org.apache.spark.rdd.RDD[String]".
>> Leaving it as an RDD and then constantly joining I think will be too slow
>> for a streaming job.
>> On Thu, Feb 5, 2015 at 8:06 PM, Sandy Ryza <sandy.r...@cloudera.com>
>> wrote:
>>> Hi Jon,
>>> You'll need to put the file on HDFS (or whatever distributed filesystem
>>> you're running on) and load it from there.
>>> -Sandy
>>> On Thu, Feb 5, 2015 at 3:18 PM, YaoPau <jonrgr...@gmail.com> wrote:
>>>> I have a file "badFullIPs.csv" of bad IP addresses used for filtering. In
>>>> yarn-client mode, I simply read it off the edge node, transform it, and
>>>> then broadcast it:
>>>>       val badIPs = fromFile(edgeDir + "badfullIPs.csv")
>>>>       val badIPsLines = badIPs.getLines
>>>>       val badIpSet = badIPsLines.toSet
>>>>       val badIPsBC = sc.broadcast(badIpSet)
>>>>       badIPs.close
>>>> How can I accomplish this in yarn-cluster mode?
>>>> Jon
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-broadcast-a-variable-read-from-a-file-in-yarn-cluster-mode-tp21524.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
