Re: How to broadcast a variable read from a file in yarn-cluster mode?
Can you give me the whole logs?

TD

On Tue, Feb 10, 2015 at 10:48 AM, Jon Gregg jonrgr...@gmail.com wrote: [quoted message and log trimmed; the full message appears elsewhere in this thread]
Re: How to broadcast a variable read from a file in yarn-cluster mode?
Is the SparkContext you're using the same one that the StreamingContext wraps? If not, I don't think using two is supported.

-Sandy

On Tue, Feb 10, 2015 at 9:58 AM, Jon Gregg jonrgr...@gmail.com wrote: [quoted message and log trimmed; the full message appears elsewhere in this thread]
Re: How to broadcast a variable read from a file in yarn-cluster mode?
I'm still getting an error. Here's my code, which works successfully when tested using spark-shell:

val badIPs = sc.textFile("/user/sb/badfullIPs.csv").collect
val badIpSet = badIPs.toSet
val badIPsBC = sc.broadcast(badIpSet)

The job looks OK from my end:

15/02/07 18:59:58 INFO Client: Application report from ASM:
  application identifier: application_1423081782629_3861
  appId: 3861
  clientToAMToken: Token { kind: YARN_CLIENT_TOKEN, service: }
  appDiagnostics:
  appMasterHost: phd40010008.na.com
  appQueue: root.default
  appMasterRpcPort: 0
  appStartTime: 1423353581140
  yarnAppState: RUNNING
  distributedFinalState: UNDEFINED

But the streaming process never actually begins. The full log is below; scroll to the end for the repeated warning "WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory."

I'll note that I have a different Spark Streaming app called dqd working successfully for a different job; it uses only a StreamingContext and not an additional SparkContext. But this app (called sbStreamingTv) uses both a SparkContext and a StreamingContext, the SparkContext being for grabbing a lookup file in HDFS for IP filtering.

The references to line #198 in the log below refer to the "val badIPs = ..." line shown above, and it looks like Spark doesn't get beyond that point in the code.

Also, this job (sbStreamingTv) does work successfully using yarn-client, even with both a SparkContext and StreamingContext. It looks to me that in yarn-cluster mode it's grabbing resources for the StreamingContext but not for the SparkContext. Any ideas?

Jon

15/02/10 12:06:16 INFO MemoryStore: MemoryStore started with capacity 1177.8 MB.
15/02/10 12:06:16 INFO ConnectionManager: Bound socket to port 30129 with id = ConnectionManagerId(phd40010008.na.com,30129) 15/02/10 12:06:16 INFO BlockManagerMaster: Trying to register BlockManager 15/02/10 12:06:16 INFO BlockManagerInfo: Registering block manager phd40010008.na.com:30129 with 1177.8 MB RAM 15/02/10 12:06:16 INFO BlockManagerMaster: Registered BlockManager 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server 15/02/10 12:06:16 INFO HttpBroadcast: Broadcast server started at http://10.229.16.108:35183 15/02/10 12:06:16 INFO HttpFileServer: HTTP File server directory is /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/container_1423081782629_7370_01_01/tmp/spark-b73a964b-4d91-4af3-8246-48da420c1cec 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server 15/02/10 12:06:16 INFO JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter 15/02/10 12:06:16 INFO SparkUI: Started SparkUI at http://phd40010008.na.com:25869 15/02/10 12:06:17 INFO EventLoggingListener: Logging events to /user/spark/applicationHistory/com.na.scalaspark.sbstreamingtv-1423587976801 15/02/10 12:06:17 INFO YarnClusterScheduler: Created YarnClusterScheduler 15/02/10 12:06:17 INFO ApplicationMaster$$anon$1: Adding shutdown hook for context org.apache.spark.SparkContext@7f38095d 15/02/10 12:06:17 INFO ApplicationMaster: Registering the ApplicationMaster 15/02/10 12:06:17 INFO ApplicationMaster: Allocating 3 executors. 
15/02/10 12:06:17 INFO YarnAllocationHandler: Will Allocate 3 executor containers, each with 2432 memory 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:2432, vCores:1 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:2432, vCores:1 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: memory:2432, vCores:1 15/02/10 12:06:20 INFO YarnClusterScheduler: YarnClusterScheduler.postStartHook done 15/02/10 12:06:20 WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN). 15/02/10 12:06:20 INFO SecurityManager: Changing view acls to: jg 15/02/10 12:06:20 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jg) 15/02/10 12:06:20 INFO Slf4jLogger: Slf4jLogger started 15/02/10 12:06:20 INFO Remoting: Starting remoting 15/02/10 12:06:20 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@phd40010008.na.com:43340] 15/02/10 12:06:20 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@phd40010008.na.com:43340] 15/02/10 12:06:20 INFO SparkEnv: Registering MapOutputTracker 15/02/10 12:06:20 INFO SparkEnv: Registering BlockManagerMaster 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at /hdata/1/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-f6e1 15/02/10 12:06:20 INFO
Re: How to broadcast a variable read from a file in yarn-cluster mode?
They're separate in my code; how can I combine them? Here's what I have:

val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf, Seconds(bucketSecs))
val sc = new SparkContext()

On Tue, Feb 10, 2015 at 1:02 PM, Sandy Ryza sandy.r...@cloudera.com wrote: [quoted messages and log trimmed; the full messages appear elsewhere in this thread]
Re: How to broadcast a variable read from a file in yarn-cluster mode?
You should be able to replace that second line with

val sc = ssc.sparkContext

On Tue, Feb 10, 2015 at 10:04 AM, Jon Gregg jonrgr...@gmail.com wrote: [quoted messages and log trimmed; the full messages appear elsewhere in this thread]
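Sandy's fix, sketched end to end. This is a sketch, not Jon's actual code: the app name and the bucketSecs value are assumed, and the CSV path is taken from earlier in the thread.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("sbStreamingTv") // name assumed
val bucketSecs = 60 // placeholder; Jon's actual batch interval isn't shown

// Create only the StreamingContext, then reuse the SparkContext it wraps
// rather than constructing a second, competing context.
val ssc = new StreamingContext(sparkConf, Seconds(bucketSecs))
val sc = ssc.sparkContext

// The broadcast setup from earlier in the thread then runs against the
// single shared context:
val badIpSet = sc.textFile("/user/sb/badfullIPs.csv").collect().toSet
val badIPsBC = sc.broadcast(badIpSet)
```

In yarn-cluster mode this avoids a second SparkContext sitting with no executors assigned to it, which would be consistent with the repeated "Initial job has not accepted any resources" warning Jon saw.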
Re: How to broadcast a variable read from a file in yarn-cluster mode?
OK that worked and getting close here ... the job ran successfully for a bit and I got output for the first couple buckets before getting a "java.lang.Exception: Could not compute split, block input-0-1423593163000 not found" error.

So I bumped up the memory at the command line from 2 GB to 5 GB and ran it again ... this time I got around 8 successful outputs before erroring. Bumped up the memory from 5 GB to 10 GB ... got around 15 successful outputs before erroring.

I'm not persisting or caching anything except for the broadcast IP table and another small broadcast user-agents list used for the same type of filtering, and both files are tiny. The Hadoop cluster is nearly empty right now and has more than enough available memory to handle this job.

I am connecting to Kafka as well, so there's a lot of data coming through as my index tries to catch up to the current date, but yarn-client mode has several times in the past few weeks been able to catch up to the current date and run successfully for days without issue.

My guess is memory isn't being cleared after each bucket? Relevant portion of the log below.
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593134400 on phd40010023.na.com:1 in memory (size: 50.1 MB, free: 10.2 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593135400 on phd40010023.na.com:1 in memory (size: 24.9 MB, free: 10.2 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593136400 on phd40010023.na.com:1 in memory (size: 129.0 MB, free: 10.3 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137000 on phd40010023.na.com:1 in memory (size: 112.4 MB, free: 10.4 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137200 on phd40010023.na.com:1 in memory (size: 481.0 B, free: 10.4 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137800 on phd40010023.na.com:1 in memory (size: 44.6 MB, free: 10.5 GB) 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 754 from persistence list 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138400 on phd40010023.na.com:1 in memory (size: 95.8 MB, free: 10.6 GB) 15/02/10 13:34:54 INFO BlockManager: Removing RDD 754 15/02/10 13:34:54 INFO MapPartitionsRDD: Removing RDD 753 from persistence list 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138600 on phd40010023.na.com:1 in memory (size: 123.2 MB, free: 10.7 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138800 on phd40010023.na.com:1 in memory (size: 5.2 KB, free: 10.7 GB) 15/02/10 13:34:54 INFO BlockManager: Removing RDD 753 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 750 from persistence list 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593139600 on phd40010023.na.com:1 in memory (size: 106.4 MB, free: 10.8 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593139800 on phd40010023.na.com:1 in memory (size: 107.0 MB, free: 10.9 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-142359314 on phd40010023.na.com:1 in memory (size: 59.5 MB, free: 10.9 GB) 15/02/10 13:34:54 INFO 
BlockManager: Removing RDD 750 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 749 from persistence list 15/02/10 13:34:54 INFO DAGScheduler: Missing parents: List(Stage 117, Stage 114, Stage 115, Stage 116) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140200 on phd40010023.na.com:1 in memory (size: 845.0 B, free: 10.9 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140600 on phd40010023.na.com:1 in memory (size: 19.2 MB, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140800 on phd40010023.na.com:1 in memory (size: 492.0 B, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593141000 on phd40010023.na.com:1 in memory (size: 1018.0 B, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManager: Removing RDD 749 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142400 on phd40010023.na.com:1 in memory (size: 48.6 MB, free: 11.0 GB) 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 767 from persistence list 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142600 on phd40010023.na.com:1 in memory (size: 4.9 KB, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142800 on phd40010023.na.com:1 in memory (size: 780.0 B, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593143800 on phd40010023.na.com:1 in memory (size: 847.0 B, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManager: Removing RDD 767 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593144400 on phd40010023.na.com:1 in memory (size: 43.7 MB, free: 11.1 GB) 15/02/10 13:34:54 INFO MapPartitionsRDD: Removing RDD 766 from persistence list 15/02/10 13:34:54 INFO BlockManager: Removing RDD 766 15/02/10 13:34:54 INFO MappedRDD:
Re: How to broadcast a variable read from a file in yarn-cluster mode?
OK I tried that, but how do I convert an RDD to a Set that I can then broadcast and cache?

val badIPs = sc.textFile("hdfs:///user/jon/" + "badfullIPs.csv")
val badIPsLines = badIPs.getLines
val badIpSet = badIPsLines.toSet
val badIPsBC = sc.broadcast(badIpSet)

produces the error "value getLines is not a member of org.apache.spark.rdd.RDD[String]". Leaving it as an RDD and then constantly joining would, I think, be too slow for a streaming job.

On Thu, Feb 5, 2015 at 8:06 PM, Sandy Ryza sandy.r...@cloudera.com wrote: [quoted messages trimmed; the full messages appear elsewhere in this thread]
Re: How to broadcast a variable read from a file in yarn-cluster mode?
You can call collect() to pull the contents of an RDD into the driver:

val badIPsLines = badIPs.collect()

On Fri, Feb 6, 2015 at 12:19 PM, Jon Gregg jonrgr...@gmail.com wrote: [quoted messages trimmed; the full messages appear elsewhere in this thread]
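Putting that together with Jon's snippet: sc.textFile already yields an RDD[String] with one element per line (hence no getLines, which belongs to scala.io.Source), so collect() is the step that turns it into a local collection. A sketch, with extractIp standing in for whatever field extraction Jon's job actually does:

```scala
val badIPs = sc.textFile("hdfs:///user/jon/badfullIPs.csv") // RDD[String], one line per element
val badIPsLines = badIPs.collect()    // Array[String] pulled to the driver
val badIpSet = badIPsLines.toSet      // small in-memory lookup set
val badIPsBC = sc.broadcast(badIpSet) // shipped once to each executor

// Executors can then filter cheaply against the broadcast value, e.g.:
// stream.filter(rec => !badIPsBC.value.contains(extractIp(rec)))
```

Since the set is broadcast, the membership test happens locally on each executor, avoiding the per-batch join Jon was worried about.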
How to broadcast a variable read from a file in yarn-cluster mode?
I have a file badFullIPs.csv of bad IP addresses used for filtering. In yarn-client mode, I simply read it off the edge node, transform it, and then broadcast it:

val badIPs = fromFile(edgeDir + "badfullIPs.csv")
val badIPsLines = badIPs.getLines
val badIpSet = badIPsLines.toSet
val badIPsBC = sc.broadcast(badIpSet)
badIPs.close

How can I accomplish this in yarn-cluster mode?

Jon

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-broadcast-a-variable-read-from-a-file-in-yarn-cluster-mode-tp21524.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: How to broadcast a variable read from a file in yarn-cluster mode?
Hi Jon,

You'll need to put the file on HDFS (or whatever distributed filesystem you're running on) and load it from there.

-Sandy

On Thu, Feb 5, 2015 at 3:18 PM, YaoPau jonrgr...@gmail.com wrote: [quoted message trimmed; the full message appears elsewhere in this thread]