Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-12 Thread Tathagata Das
Can you give me the full logs?

TD

On Tue, Feb 10, 2015 at 10:48 AM, Jon Gregg jonrgr...@gmail.com wrote:

 OK, that worked and we're getting close here ... the job ran successfully for a
 bit and I got output for the first couple of buckets before hitting a
 "java.lang.Exception: Could not compute split, block input-0-1423593163000
 not found" error.

 So I bumped up the memory at the command line from 2 GB to 5 GB and ran it
 again ... this time I got around 8 successful outputs before erroring out.

 Bumped the memory up from 5 GB to 10 GB ... got around 15 successful
 outputs before erroring out.


 I'm not persisting or caching anything except for the broadcast IP table
 and another small broadcast user-agents list used for the same type of
 filtering, and both files are tiny.  The Hadoop cluster is nearly empty
 right now and has more than enough available memory to handle this job.  I
 am also connecting to Kafka, so there's a lot of data coming through as my
 index tries to catch up to the current date, but in yarn-client mode this
 job has several times in the past few weeks caught up to the current date
 and run successfully for days without issue.

 My guess is that memory isn't being cleared after each bucket?  The relevant
 portion of the log is below.


 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593134400 on
 phd40010023.na.com:1 in memory (size: 50.1 MB, free: 10.2 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593135400 on
 phd40010023.na.com:1 in memory (size: 24.9 MB, free: 10.2 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593136400 on
 phd40010023.na.com:1 in memory (size: 129.0 MB, free: 10.3 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137000 on
 phd40010023.na.com:1 in memory (size: 112.4 MB, free: 10.4 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137200 on
 phd40010023.na.com:1 in memory (size: 481.0 B, free: 10.4 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137800 on
 phd40010023.na.com:1 in memory (size: 44.6 MB, free: 10.5 GB)
 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 754 from persistence list
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138400 on
 phd40010023.na.com:1 in memory (size: 95.8 MB, free: 10.6 GB)
 15/02/10 13:34:54 INFO BlockManager: Removing RDD 754
 15/02/10 13:34:54 INFO MapPartitionsRDD: Removing RDD 753 from persistence
 list
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138600 on
 phd40010023.na.com:1 in memory (size: 123.2 MB, free: 10.7 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138800 on
 phd40010023.na.com:1 in memory (size: 5.2 KB, free: 10.7 GB)
 15/02/10 13:34:54 INFO BlockManager: Removing RDD 753
 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 750 from persistence list
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593139600 on
 phd40010023.na.com:1 in memory (size: 106.4 MB, free: 10.8 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593139800 on
 phd40010023.na.com:1 in memory (size: 107.0 MB, free: 10.9 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-142359314 on
 phd40010023.na.com:1 in memory (size: 59.5 MB, free: 10.9 GB)
 15/02/10 13:34:54 INFO BlockManager: Removing RDD 750
 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 749 from persistence list
 15/02/10 13:34:54 INFO DAGScheduler: Missing parents: List(Stage 117,
 Stage 114, Stage 115, Stage 116)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140200 on
 phd40010023.na.com:1 in memory (size: 845.0 B, free: 10.9 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140600 on
 phd40010023.na.com:1 in memory (size: 19.2 MB, free: 11.0 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140800 on
 phd40010023.na.com:1 in memory (size: 492.0 B, free: 11.0 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593141000 on
 phd40010023.na.com:1 in memory (size: 1018.0 B, free: 11.0 GB)
 15/02/10 13:34:54 INFO BlockManager: Removing RDD 749
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142400 on
 phd40010023.na.com:1 in memory (size: 48.6 MB, free: 11.0 GB)
 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 767 from persistence list
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142600 on
 phd40010023.na.com:1 in memory (size: 4.9 KB, free: 11.0 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142800 on
 phd40010023.na.com:1 in memory (size: 780.0 B, free: 11.0 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593143800 on
 phd40010023.na.com:1 in memory (size: 847.0 B, free: 11.0 GB)
 15/02/10 13:34:54 INFO BlockManager: Removing RDD 767
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593144400 on
 phd40010023.na.com:1 in memory (size: 43.7 MB, 

Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-10 Thread Sandy Ryza
Is the SparkContext you're using the same one that the StreamingContext
wraps?  If not, I don't think using two is supported.

-Sandy

On Tue, Feb 10, 2015 at 9:58 AM, Jon Gregg jonrgr...@gmail.com wrote:

 I'm still getting an error.  Here's my code, which works successfully when
 tested using spark-shell:

   val badIPs = sc.textFile("/user/sb/badfullIPs.csv").collect
   val badIpSet = badIPs.toSet
   val badIPsBC = sc.broadcast(badIpSet)


 The job looks OK from my end:

 15/02/07 18:59:58 INFO Client: Application report from ASM:

  application identifier: application_1423081782629_3861

  appId: 3861

 * clientToAMToken: Token { kind: YARN_CLIENT_TOKEN, service:  }*

  appDiagnostics:

  appMasterHost: phd40010008.na.com

  appQueue: root.default

  appMasterRpcPort: 0

  appStartTime: 1423353581140

 * yarnAppState: RUNNING*

  distributedFinalState: UNDEFINED


 But the streaming process never actually begins.  The full log is below;
 scroll to the end for the repeated warning "WARN YarnClusterScheduler:
 Initial job has not accepted any resources; check your cluster UI to ensure
 that workers are registered and have sufficient memory".

 I'll note that I have a different Spark Streaming app called dqd working
 successfully for a different job that uses only a StreamingContext and not
 an additional SparkContext.  But this app (called sbStreamingTv) uses
 both a SparkContext and a StreamingContext for grabbing a lookup file in
 HDFS for IP filtering. * The references to line #198 in the log below
 refer to the val badIPs = sc.textFile("/user/sb/badfullIPs.csv").collect
 line shown above, and it looks like Spark doesn't get beyond that point in
 the code.*

 Also, this job (sbStreamingTv) does work successfully using yarn-client,
 even with both a SparkContext and StreamingContext.  It looks to me that in
 yarn-cluster mode it's grabbing resources for the StreamingContext but not
 for the SparkContext.

 Any ideas?

 Jon


 15/02/10 12:06:16 INFO MemoryStore: MemoryStore started with capacity
 1177.8 MB.
 15/02/10 12:06:16 INFO ConnectionManager: Bound socket to port 30129 with
 id = ConnectionManagerId(phd40010008.na.com,30129)
 15/02/10 12:06:16 INFO BlockManagerMaster: Trying to register BlockManager
 15/02/10 12:06:16 INFO BlockManagerInfo: Registering block manager
 phd40010008.na.com:30129 with 1177.8 MB RAM
 15/02/10 12:06:16 INFO BlockManagerMaster: Registered BlockManager
 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
 15/02/10 12:06:16 INFO HttpBroadcast: Broadcast server started at
 http://10.229.16.108:35183
 15/02/10 12:06:16 INFO HttpFileServer: HTTP File server directory is
 /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/container_1423081782629_7370_01_01/tmp/spark-b73a964b-4d91-4af3-8246-48da420c1cec
 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
 15/02/10 12:06:16 INFO JettyUtils: Adding filter:
 org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
 15/02/10 12:06:16 INFO SparkUI: Started SparkUI at
 http://phd40010008.na.com:25869
 15/02/10 12:06:17 INFO EventLoggingListener: Logging events to
 /user/spark/applicationHistory/com.na.scalaspark.sbstreamingtv-1423587976801
 15/02/10 12:06:17 INFO YarnClusterScheduler: Created YarnClusterScheduler
 15/02/10 12:06:17 INFO ApplicationMaster$$anon$1: Adding shutdown hook for
 context org.apache.spark.SparkContext@7f38095d
 15/02/10 12:06:17 INFO ApplicationMaster: Registering the ApplicationMaster
 15/02/10 12:06:17 INFO ApplicationMaster: Allocating 3 executors.
 15/02/10 12:06:17 INFO YarnAllocationHandler: Will Allocate 3 executor
 containers, each with 2432 memory
 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
 Any, priority: 1, capability: memory:2432, vCores:1
 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
 Any, priority: 1, capability: memory:2432, vCores:1
 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
 Any, priority: 1, capability: memory:2432, vCores:1
 15/02/10 12:06:20 INFO YarnClusterScheduler:
 YarnClusterScheduler.postStartHook done
 15/02/10 12:06:20 WARN SparkConf: In Spark 1.0 and later spark.local.dir
 will be overridden by the value set by the cluster manager (via
 SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
 15/02/10 12:06:20 INFO SecurityManager: Changing view acls to: jg
 15/02/10 12:06:20 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(jg)
 15/02/10 12:06:20 INFO Slf4jLogger: Slf4jLogger started
 15/02/10 12:06:20 INFO Remoting: Starting remoting
 15/02/10 12:06:20 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sp...@phd40010008.na.com:43340]
 15/02/10 12:06:20 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://sp...@phd40010008.na.com:43340]
 15/02/10 12:06:20 INFO SparkEnv: 

Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-10 Thread Jon Gregg
I'm still getting an error.  Here's my code, which works successfully when
tested using spark-shell:

  val badIPs = sc.textFile("/user/sb/badfullIPs.csv").collect
  val badIpSet = badIPs.toSet
  val badIPsBC = sc.broadcast(badIpSet)


The job looks OK from my end:

15/02/07 18:59:58 INFO Client: Application report from ASM:

 application identifier: application_1423081782629_3861

 appId: 3861

* clientToAMToken: Token { kind: YARN_CLIENT_TOKEN, service:  }*

 appDiagnostics:

 appMasterHost: phd40010008.na.com

 appQueue: root.default

 appMasterRpcPort: 0

 appStartTime: 1423353581140

* yarnAppState: RUNNING*

 distributedFinalState: UNDEFINED


But the streaming process never actually begins.  The full log is below;
scroll to the end for the repeated warning "WARN YarnClusterScheduler:
Initial job has not accepted any resources; check your cluster UI to ensure
that workers are registered and have sufficient memory".

I'll note that I have a different Spark Streaming app called dqd working
successfully for a different job that uses only a StreamingContext and not
an additional SparkContext.  But this app (called sbStreamingTv) uses
both a SparkContext and a StreamingContext for grabbing a lookup file in
HDFS for IP filtering. * The references to line #198 in the log below
refer to the val badIPs = sc.textFile("/user/sb/badfullIPs.csv").collect
line shown above, and it looks like Spark doesn't get beyond that point in
the code.*

Also, this job (sbStreamingTv) does work successfully using yarn-client,
even with both a SparkContext and StreamingContext.  It looks to me that in
yarn-cluster mode it's grabbing resources for the StreamingContext but not
for the SparkContext.

Any ideas?

Jon


15/02/10 12:06:16 INFO MemoryStore: MemoryStore started with capacity
1177.8 MB.
15/02/10 12:06:16 INFO ConnectionManager: Bound socket to port 30129 with
id = ConnectionManagerId(phd40010008.na.com,30129)
15/02/10 12:06:16 INFO BlockManagerMaster: Trying to register BlockManager
15/02/10 12:06:16 INFO BlockManagerInfo: Registering block manager
phd40010008.na.com:30129 with 1177.8 MB RAM
15/02/10 12:06:16 INFO BlockManagerMaster: Registered BlockManager
15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
15/02/10 12:06:16 INFO HttpBroadcast: Broadcast server started at
http://10.229.16.108:35183
15/02/10 12:06:16 INFO HttpFileServer: HTTP File server directory is
/hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/container_1423081782629_7370_01_01/tmp/spark-b73a964b-4d91-4af3-8246-48da420c1cec
15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
15/02/10 12:06:16 INFO JettyUtils: Adding filter:
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
15/02/10 12:06:16 INFO SparkUI: Started SparkUI at
http://phd40010008.na.com:25869
15/02/10 12:06:17 INFO EventLoggingListener: Logging events to
/user/spark/applicationHistory/com.na.scalaspark.sbstreamingtv-1423587976801
15/02/10 12:06:17 INFO YarnClusterScheduler: Created YarnClusterScheduler
15/02/10 12:06:17 INFO ApplicationMaster$$anon$1: Adding shutdown hook for
context org.apache.spark.SparkContext@7f38095d
15/02/10 12:06:17 INFO ApplicationMaster: Registering the ApplicationMaster
15/02/10 12:06:17 INFO ApplicationMaster: Allocating 3 executors.
15/02/10 12:06:17 INFO YarnAllocationHandler: Will Allocate 3 executor
containers, each with 2432 memory
15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any,
priority: 1, capability: memory:2432, vCores:1
15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any,
priority: 1, capability: memory:2432, vCores:1
15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any,
priority: 1, capability: memory:2432, vCores:1
15/02/10 12:06:20 INFO YarnClusterScheduler:
YarnClusterScheduler.postStartHook done
15/02/10 12:06:20 WARN SparkConf: In Spark 1.0 and later spark.local.dir
will be overridden by the value set by the cluster manager (via
SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
15/02/10 12:06:20 INFO SecurityManager: Changing view acls to: jg
15/02/10 12:06:20 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(jg)
15/02/10 12:06:20 INFO Slf4jLogger: Slf4jLogger started
15/02/10 12:06:20 INFO Remoting: Starting remoting
15/02/10 12:06:20 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sp...@phd40010008.na.com:43340]
15/02/10 12:06:20 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sp...@phd40010008.na.com:43340]
15/02/10 12:06:20 INFO SparkEnv: Registering MapOutputTracker
15/02/10 12:06:20 INFO SparkEnv: Registering BlockManagerMaster
15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
/hdata/1/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-f6e1
15/02/10 12:06:20 INFO 

Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-10 Thread Jon Gregg
They're separate in my code; how can I combine them?  Here's what I have:

  val sparkConf = new SparkConf()
  val ssc =  new StreamingContext(sparkConf, Seconds(bucketSecs))

  val sc = new SparkContext()

On Tue, Feb 10, 2015 at 1:02 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Is the SparkContext you're using the same one that the StreamingContext
 wraps?  If not, I don't think using two is supported.

 -Sandy

 On Tue, Feb 10, 2015 at 9:58 AM, Jon Gregg jonrgr...@gmail.com wrote:

 I'm still getting an error.  Here's my code, which works successfully
 when tested using spark-shell:

   val badIPs = sc.textFile("/user/sb/badfullIPs.csv").collect
   val badIpSet = badIPs.toSet
   val badIPsBC = sc.broadcast(badIpSet)


 The job looks OK from my end:

 15/02/07 18:59:58 INFO Client: Application report from ASM:

  application identifier: application_1423081782629_3861

  appId: 3861

 * clientToAMToken: Token { kind: YARN_CLIENT_TOKEN, service:  }*

  appDiagnostics:

  appMasterHost: phd40010008.na.com

  appQueue: root.default

  appMasterRpcPort: 0

  appStartTime: 1423353581140

 * yarnAppState: RUNNING*

  distributedFinalState: UNDEFINED


 But the streaming process never actually begins.  The full log is below;
 scroll to the end for the repeated warning "WARN YarnClusterScheduler:
 Initial job has not accepted any resources; check your cluster UI to ensure
 that workers are registered and have sufficient memory".

 I'll note that I have a different Spark Streaming app called dqd
 working successfully for a different job that uses only a StreamingContext
 and not an additional SparkContext.  But this app (called sbStreamingTv)
 uses both a SparkContext and a StreamingContext for grabbing a lookup file
 in HDFS for IP filtering. * The references to line #198 in the log
 below refer to the val badIPs =
 sc.textFile("/user/sb/badfullIPs.csv").collect line shown above, and it
 looks like Spark doesn't get beyond that point in the code.*

 Also, this job (sbStreamingTv) does work successfully using
 yarn-client, even with both a SparkContext and StreamingContext.  It looks
 to me that in yarn-cluster mode it's grabbing resources for the
 StreamingContext but not for the SparkContext.

 Any ideas?

 Jon


 15/02/10 12:06:16 INFO MemoryStore: MemoryStore started with capacity
 1177.8 MB.
 15/02/10 12:06:16 INFO ConnectionManager: Bound socket to port 30129 with
 id = ConnectionManagerId(phd40010008.na.com,30129)
 15/02/10 12:06:16 INFO BlockManagerMaster: Trying to register BlockManager
 15/02/10 12:06:16 INFO BlockManagerInfo: Registering block manager
 phd40010008.na.com:30129 with 1177.8 MB RAM
 15/02/10 12:06:16 INFO BlockManagerMaster: Registered BlockManager
 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
 15/02/10 12:06:16 INFO HttpBroadcast: Broadcast server started at
 http://10.229.16.108:35183
 15/02/10 12:06:16 INFO HttpFileServer: HTTP File server directory is
 /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/container_1423081782629_7370_01_01/tmp/spark-b73a964b-4d91-4af3-8246-48da420c1cec
 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
 15/02/10 12:06:16 INFO JettyUtils: Adding filter:
 org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
 15/02/10 12:06:16 INFO SparkUI: Started SparkUI at
 http://phd40010008.na.com:25869
 15/02/10 12:06:17 INFO EventLoggingListener: Logging events to
 /user/spark/applicationHistory/com.na.scalaspark.sbstreamingtv-1423587976801
 15/02/10 12:06:17 INFO YarnClusterScheduler: Created YarnClusterScheduler
 15/02/10 12:06:17 INFO ApplicationMaster$$anon$1: Adding shutdown hook
 for context org.apache.spark.SparkContext@7f38095d
 15/02/10 12:06:17 INFO ApplicationMaster: Registering the
 ApplicationMaster
 15/02/10 12:06:17 INFO ApplicationMaster: Allocating 3 executors.
 15/02/10 12:06:17 INFO YarnAllocationHandler: Will Allocate 3 executor
 containers, each with 2432 memory
 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
 Any, priority: 1, capability: memory:2432, vCores:1
 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
 Any, priority: 1, capability: memory:2432, vCores:1
 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
 Any, priority: 1, capability: memory:2432, vCores:1
 15/02/10 12:06:20 INFO YarnClusterScheduler:
 YarnClusterScheduler.postStartHook done
 15/02/10 12:06:20 WARN SparkConf: In Spark 1.0 and later spark.local.dir
 will be overridden by the value set by the cluster manager (via
 SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
 15/02/10 12:06:20 INFO SecurityManager: Changing view acls to: jg
 15/02/10 12:06:20 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(jg)
 15/02/10 12:06:20 INFO Slf4jLogger: Slf4jLogger started
 15/02/10 12:06:20 INFO 

Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-10 Thread Sandy Ryza
You should be able to replace that second line with

val sc = ssc.sparkContext
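
Spelled out, that change might look something like the sketch below (an
illustration only: bucketSecs comes from your earlier snippet, and a socket
stream stands in for the real Kafka source):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val bucketSecs = 60  // placeholder value
  val sparkConf = new SparkConf().setAppName("sbStreamingTv")
  val ssc = new StreamingContext(sparkConf, Seconds(bucketSecs))
  val sc = ssc.sparkContext  // reuse the SparkContext the StreamingContext wraps

  // Build the broadcast lookup once on the driver
  val badIpSet = sc.textFile("/user/sb/badfullIPs.csv").collect().toSet
  val badIPsBC = sc.broadcast(badIpSet)

  // The real job reads from Kafka; a socket stream stands in here
  val lines = ssc.socketTextStream("localhost", 9999)
  // Assume the client IP is the first comma-separated field of each record
  val clean = lines.filter(line => !badIPsBC.value.contains(line.split(",")(0)))
  clean.print()

  ssc.start()
  ssc.awaitTermination()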

On Tue, Feb 10, 2015 at 10:04 AM, Jon Gregg jonrgr...@gmail.com wrote:

 They're separate in my code; how can I combine them?  Here's what I have:

   val sparkConf = new SparkConf()
   val ssc =  new StreamingContext(sparkConf, Seconds(bucketSecs))

   val sc = new SparkContext()

 On Tue, Feb 10, 2015 at 1:02 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Is the SparkContext you're using the same one that the StreamingContext
 wraps?  If not, I don't think using two is supported.

 -Sandy

 On Tue, Feb 10, 2015 at 9:58 AM, Jon Gregg jonrgr...@gmail.com wrote:

 I'm still getting an error.  Here's my code, which works successfully
 when tested using spark-shell:

   val badIPs = sc.textFile("/user/sb/badfullIPs.csv").collect
   val badIpSet = badIPs.toSet
   val badIPsBC = sc.broadcast(badIpSet)


 The job looks OK from my end:

 15/02/07 18:59:58 INFO Client: Application report from ASM:

  application identifier: application_1423081782629_3861

  appId: 3861

 * clientToAMToken: Token { kind: YARN_CLIENT_TOKEN, service:  }*

  appDiagnostics:

  appMasterHost: phd40010008.na.com

  appQueue: root.default

  appMasterRpcPort: 0

  appStartTime: 1423353581140

 * yarnAppState: RUNNING*

  distributedFinalState: UNDEFINED


 But the streaming process never actually begins.  The full log is below;
 scroll to the end for the repeated warning "WARN YarnClusterScheduler:
 Initial job has not accepted any resources; check your cluster UI to ensure
 that workers are registered and have sufficient memory".

 I'll note that I have a different Spark Streaming app called dqd
 working successfully for a different job that uses only a StreamingContext
 and not an additional SparkContext.  But this app (called sbStreamingTv)
 uses both a SparkContext and a StreamingContext for grabbing a lookup file
 in HDFS for IP filtering. * The references to line #198 in the log
 below refer to the val badIPs =
 sc.textFile("/user/sb/badfullIPs.csv").collect line shown above, and it
 looks like Spark doesn't get beyond that point in the code.*

 Also, this job (sbStreamingTv) does work successfully using
 yarn-client, even with both a SparkContext and StreamingContext.  It looks
 to me that in yarn-cluster mode it's grabbing resources for the
 StreamingContext but not for the SparkContext.

 Any ideas?

 Jon


 15/02/10 12:06:16 INFO MemoryStore: MemoryStore started with capacity
 1177.8 MB.
 15/02/10 12:06:16 INFO ConnectionManager: Bound socket to port 30129
 with id = ConnectionManagerId(phd40010008.na.com,30129)
 15/02/10 12:06:16 INFO BlockManagerMaster: Trying to register
 BlockManager
 15/02/10 12:06:16 INFO BlockManagerInfo: Registering block manager
 phd40010008.na.com:30129 with 1177.8 MB RAM
 15/02/10 12:06:16 INFO BlockManagerMaster: Registered BlockManager
 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
 15/02/10 12:06:16 INFO HttpBroadcast: Broadcast server started at
 http://10.229.16.108:35183
 15/02/10 12:06:16 INFO HttpFileServer: HTTP File server directory is
 /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/container_1423081782629_7370_01_01/tmp/spark-b73a964b-4d91-4af3-8246-48da420c1cec
 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
 15/02/10 12:06:16 INFO JettyUtils: Adding filter:
 org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
 15/02/10 12:06:16 INFO SparkUI: Started SparkUI at
 http://phd40010008.na.com:25869
 15/02/10 12:06:17 INFO EventLoggingListener: Logging events to
 /user/spark/applicationHistory/com.na.scalaspark.sbstreamingtv-1423587976801
 15/02/10 12:06:17 INFO YarnClusterScheduler: Created YarnClusterScheduler
 15/02/10 12:06:17 INFO ApplicationMaster$$anon$1: Adding shutdown hook
 for context org.apache.spark.SparkContext@7f38095d
 15/02/10 12:06:17 INFO ApplicationMaster: Registering the
 ApplicationMaster
 15/02/10 12:06:17 INFO ApplicationMaster: Allocating 3 executors.
 15/02/10 12:06:17 INFO YarnAllocationHandler: Will Allocate 3 executor
 containers, each with 2432 memory
 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
 Any, priority: 1, capability: memory:2432, vCores:1
 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
 Any, priority: 1, capability: memory:2432, vCores:1
 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
 Any, priority: 1, capability: memory:2432, vCores:1
 15/02/10 12:06:20 INFO YarnClusterScheduler:
 YarnClusterScheduler.postStartHook done
 15/02/10 12:06:20 WARN SparkConf: In Spark 1.0 and later spark.local.dir
 will be overridden by the value set by the cluster manager (via
 SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
 15/02/10 12:06:20 INFO SecurityManager: Changing view acls to: jg
 15/02/10 12:06:20 INFO SecurityManager: SecurityManager: 

Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-10 Thread Jon Gregg
OK, that worked and we're getting close here ... the job ran successfully for a
bit and I got output for the first couple of buckets before hitting a
"java.lang.Exception: Could not compute split, block input-0-1423593163000
not found" error.

So I bumped up the memory at the command line from 2 GB to 5 GB and ran it
again ... this time I got around 8 successful outputs before erroring out.

Bumped the memory up from 5 GB to 10 GB ... got around 15 successful
outputs before erroring out.


I'm not persisting or caching anything except for the broadcast IP table
and another small broadcast user-agents list used for the same type of
filtering, and both files are tiny.  The Hadoop cluster is nearly empty
right now and has more than enough available memory to handle this job.  I
am also connecting to Kafka, so there's a lot of data coming through as my
index tries to catch up to the current date, but in yarn-client mode this
job has several times in the past few weeks caught up to the current date
and run successfully for days without issue.

My guess is that memory isn't being cleared after each bucket?  The relevant
portion of the log is below.


15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593134400 on
phd40010023.na.com:1 in memory (size: 50.1 MB, free: 10.2 GB)
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593135400 on
phd40010023.na.com:1 in memory (size: 24.9 MB, free: 10.2 GB)
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593136400 on
phd40010023.na.com:1 in memory (size: 129.0 MB, free: 10.3 GB)
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137000 on
phd40010023.na.com:1 in memory (size: 112.4 MB, free: 10.4 GB)
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137200 on
phd40010023.na.com:1 in memory (size: 481.0 B, free: 10.4 GB)
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137800 on
phd40010023.na.com:1 in memory (size: 44.6 MB, free: 10.5 GB)
15/02/10 13:34:54 INFO MappedRDD: Removing RDD 754 from persistence list
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138400 on
phd40010023.na.com:1 in memory (size: 95.8 MB, free: 10.6 GB)
15/02/10 13:34:54 INFO BlockManager: Removing RDD 754
15/02/10 13:34:54 INFO MapPartitionsRDD: Removing RDD 753 from persistence
list
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138600 on
phd40010023.na.com:1 in memory (size: 123.2 MB, free: 10.7 GB)
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138800 on
phd40010023.na.com:1 in memory (size: 5.2 KB, free: 10.7 GB)
15/02/10 13:34:54 INFO BlockManager: Removing RDD 753
15/02/10 13:34:54 INFO MappedRDD: Removing RDD 750 from persistence list
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593139600 on
phd40010023.na.com:1 in memory (size: 106.4 MB, free: 10.8 GB)
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593139800 on
phd40010023.na.com:1 in memory (size: 107.0 MB, free: 10.9 GB)
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-142359314 on
phd40010023.na.com:1 in memory (size: 59.5 MB, free: 10.9 GB)
15/02/10 13:34:54 INFO BlockManager: Removing RDD 750
15/02/10 13:34:54 INFO MappedRDD: Removing RDD 749 from persistence list
15/02/10 13:34:54 INFO DAGScheduler: Missing parents: List(Stage 117, Stage
114, Stage 115, Stage 116)
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140200 on
phd40010023.na.com:1 in memory (size: 845.0 B, free: 10.9 GB)
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140600 on
phd40010023.na.com:1 in memory (size: 19.2 MB, free: 11.0 GB)
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140800 on
phd40010023.na.com:1 in memory (size: 492.0 B, free: 11.0 GB)
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593141000 on
phd40010023.na.com:1 in memory (size: 1018.0 B, free: 11.0 GB)
15/02/10 13:34:54 INFO BlockManager: Removing RDD 749
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142400 on
phd40010023.na.com:1 in memory (size: 48.6 MB, free: 11.0 GB)
15/02/10 13:34:54 INFO MappedRDD: Removing RDD 767 from persistence list
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142600 on
phd40010023.na.com:1 in memory (size: 4.9 KB, free: 11.0 GB)
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142800 on
phd40010023.na.com:1 in memory (size: 780.0 B, free: 11.0 GB)
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593143800 on
phd40010023.na.com:1 in memory (size: 847.0 B, free: 11.0 GB)
15/02/10 13:34:54 INFO BlockManager: Removing RDD 767
15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593144400 on
phd40010023.na.com:1 in memory (size: 43.7 MB, free: 11.1 GB)
15/02/10 13:34:54 INFO MapPartitionsRDD: Removing RDD 766 from persistence
list
15/02/10 13:34:54 INFO BlockManager: Removing RDD 766
15/02/10 13:34:54 INFO MappedRDD: 

Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-06 Thread Jon Gregg
OK I tried that, but how do I convert an RDD to a Set that I can then
broadcast and cache?

  val badIPs = sc.textFile("hdfs:///user/jon/" + "badfullIPs.csv")
  val badIPsLines = badIPs.getLines
  val badIpSet = badIPsLines.toSet
  val badIPsBC = sc.broadcast(badIpSet)

produces the error "value getLines is not a member of
org.apache.spark.rdd.RDD[String]".

I think leaving it as an RDD and constantly joining will be too slow
for a streaming job.

On Thu, Feb 5, 2015 at 8:06 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Jon,

 You'll need to put the file on HDFS (or whatever distributed filesystem
 you're running on) and load it from there.

 -Sandy

 On Thu, Feb 5, 2015 at 3:18 PM, YaoPau jonrgr...@gmail.com wrote:

 I have a file badFullIPs.csv of bad IP addresses used for filtering.  In
 yarn-client mode, I simply read it off the edge node, transform it, and
 then broadcast it:

   val badIPs = fromFile(edgeDir + "badfullIPs.csv")
   val badIPsLines = badIPs.getLines
   val badIpSet = badIPsLines.toSet
   val badIPsBC = sc.broadcast(badIpSet)
   badIPs.close

 How can I accomplish this in yarn-cluster mode?

 Jon



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-broadcast-a-variable-read-from-a-file-in-yarn-cluster-mode-tp21524.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-06 Thread Sandy Ryza
You can call collect() to pull the contents of an RDD into the driver:

  val badIPsLines = badIPs.collect()
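
Putting it together, a minimal standalone sketch (assuming the
hdfs:///user/jon/ path from your snippet; the SparkContext here is created
only to make the example self-contained):

  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("broadcast-bad-ips"))

  // Read the HDFS file as an RDD, pull it to the driver, and broadcast the set
  val badIPs = sc.textFile("hdfs:///user/jon/badfullIPs.csv")
  val badIpSet = badIPs.collect().toSet
  val badIPsBC = sc.broadcast(badIpSet)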

On Fri, Feb 6, 2015 at 12:19 PM, Jon Gregg jonrgr...@gmail.com wrote:

 OK I tried that, but how do I convert an RDD to a Set that I can then
 broadcast and cache?

   val badIPs = sc.textFile("hdfs:///user/jon/" + "badfullIPs.csv")
   val badIPsLines = badIPs.getLines
   val badIpSet = badIPsLines.toSet
   val badIPsBC = sc.broadcast(badIpSet)

 produces the error "value getLines is not a member of
 org.apache.spark.rdd.RDD[String]".

 I think leaving it as an RDD and constantly joining will be too slow
 for a streaming job.

 On Thu, Feb 5, 2015 at 8:06 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Jon,

 You'll need to put the file on HDFS (or whatever distributed filesystem
 you're running on) and load it from there.

 -Sandy

 On Thu, Feb 5, 2015 at 3:18 PM, YaoPau jonrgr...@gmail.com wrote:

 I have a file badFullIPs.csv of bad IP addresses used for filtering.  In
 yarn-client mode, I simply read it off the edge node, transform it, and
 then broadcast it:

   val badIPs = fromFile(edgeDir + "badfullIPs.csv")
   val badIPsLines = badIPs.getLines
   val badIpSet = badIPsLines.toSet
   val badIPsBC = sc.broadcast(badIpSet)
   badIPs.close

 How can I accomplish this in yarn-cluster mode?

 Jon



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-broadcast-a-variable-read-from-a-file-in-yarn-cluster-mode-tp21524.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-05 Thread YaoPau
I have a file badFullIPs.csv of bad IP addresses used for filtering.  In
yarn-client mode, I simply read it off the edge node, transform it, and then
broadcast it:

  val badIPs = fromFile(edgeDir + "badfullIPs.csv")
  val badIPsLines = badIPs.getLines
  val badIpSet = badIPsLines.toSet
  val badIPsBC = sc.broadcast(badIpSet)
  badIPs.close
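
(fromFile above is scala.io.Source.fromFile, reading a file that lives on the
edge node's local filesystem; a sketch of the assumed local setup, with a
made-up edgeDir value:)

  import scala.io.Source.fromFile
  val edgeDir = "/home/jon/lookups/"  // hypothetical local directory on the edge node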

How can I accomplish this in yarn-cluster mode?

Jon



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-broadcast-a-variable-read-from-a-file-in-yarn-cluster-mode-tp21524.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-05 Thread Sandy Ryza
Hi Jon,

You'll need to put the file on HDFS (or whatever distributed filesystem
you're running on) and load it from there.
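
Concretely, that might mean copying the file into HDFS once (for example with
hdfs dfs -put badfullIPs.csv /user/jon/) and then reading it through Spark
instead of off the local filesystem. A rough sketch, assuming sc is the job's
SparkContext and a /user/jon/ destination path:

  // Comes back as an RDD[String], one element per line,
  // rather than as a local scala.io.Source
  val badIPs = sc.textFile("hdfs:///user/jon/badfullIPs.csv")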

-Sandy

On Thu, Feb 5, 2015 at 3:18 PM, YaoPau jonrgr...@gmail.com wrote:

 I have a file badFullIPs.csv of bad IP addresses used for filtering.  In
 yarn-client mode, I simply read it off the edge node, transform it, and
 then broadcast it:

   val badIPs = fromFile(edgeDir + "badfullIPs.csv")
   val badIPsLines = badIPs.getLines
   val badIpSet = badIPsLines.toSet
   val badIPsBC = sc.broadcast(badIpSet)
   badIPs.close

 How can I accomplish this in yarn-cluster mode?

 Jon



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-broadcast-a-variable-read-from-a-file-in-yarn-cluster-mode-tp21524.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org