I tried to file a bug in the git repo, but I don't see a link to "open issues".

On Fri, Mar 27, 2015 at 10:55 AM, Mohit Anchlia <mohitanch...@gmail.com> wrote:

> I checked the ports using netstat and don't see any connections
> established on that port. Logs show only this:
>
> 15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount
> 15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with ID app-20150327135048-0002
>
> Spark ui shows:
>
> Running Applications
> ID: app-20150327135048-0002 <http://54.69.225.94:8080/app?appId=app-20150327135048-0002>
> Name: NetworkWordCount <http://ip-10-241-251-232.us-west-2.compute.internal:4040/>
> Cores: 0
> Memory per Node: 512.0 MB
> Submitted Time: 2015/03/27 13:50:48
> User: ec2-user
> State: WAITING
> Duration: 33 s
>
> Code looks like is being executed:
>
> java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077
>
> public static void doWork(String masterUrl){
>   SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount");
>   JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
>   JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
>   System.out.println("Successfully created connection");
>   mapAndReduce(lines);
>   jssc.start(); // Start the computation
>   jssc.awaitTermination(); // Wait for the computation to terminate
> }
>
> public static void main(String ...args){
>   doWork(args[0]);
> }
>
> And output of the java program after submitting the task:
>
> java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077
> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to: ec2-user
> 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to: ec2-user
> 15/03/27 13:50:46 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ec2-user); users with modify permissions: Set(ec2-user)
> 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started
> 15/03/27 13:50:46 INFO Remoting: Starting remoting
> 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@ip-10-241-251-232.us-west-2.compute.internal:60184]
> 15/03/27 13:50:47 INFO Utils: Successfully started service 'sparkDriver' on port 60184.
> 15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker
> 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster
> 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150327135047-5399
> 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity 3.5 GB
> 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b
> 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server
> 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file server' on port 57955.
> 15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI' on port 4040.
> 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at http://ip-10-241-251-232.us-west-2.compute.internal:4040
> 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master spark://ip-10-241-251-232:7077...
> 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150327135048-0002
> 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on 58358
> 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register BlockManager
> 15/03/27 13:50:48 INFO BlockManagerMasterActor: Registering block manager ip-10-241-251-232.us-west-2.compute.internal:58358 with 3.5 GB RAM, BlockManagerId(<driver>, ip-10-241-251-232.us-west-2.compute.internal, 58358)
> 15/03/27 13:50:48 INFO BlockManagerMaster: Registered BlockManager
> 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
> 15/03/27 13:50:48 INFO ReceiverTracker: ReceiverTracker started
> 15/03/27 13:50:48 INFO ForEachDStream: metadataCleanupDelay = -1
> 15/03/27 13:50:48 INFO ShuffledDStream: metadataCleanupDelay = -1
> 15/03/27 13:50:48 INFO MappedDStream: metadataCleanupDelay = -1
> 15/03/27 13:50:48 INFO FlatMappedDStream: metadataCleanupDelay = -1
> 15/03/27 13:50:48 INFO SocketInputDStream: metadataCleanupDelay = -1
> 15/03/27 13:50:48 INFO SocketInputDStream: Slide time = 1000 ms
> 15/03/27 13:50:48 INFO SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 15/03/27 13:50:48 INFO SocketInputDStream: Checkpoint interval = null
> 15/03/27 13:50:48 INFO SocketInputDStream: Remember duration = 1000 ms
> 15/03/27 13:50:48 INFO SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@75efa13d
> 15/03/27 13:50:48 INFO FlatMappedDStream: Slide time = 1000 ms
> 15/03/27 13:50:48 INFO FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 15/03/27 13:50:48 INFO FlatMappedDStream: Checkpoint interval = null
> 15/03/27 13:50:48 INFO FlatMappedDStream: Remember duration = 1000 ms
> 15/03/27 13:50:48 INFO FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@65ce9dc5
> 15/03/27 13:50:48 INFO MappedDStream: Slide time = 1000 ms
> 15/03/27 13:50:48 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 15/03/27 13:50:48 INFO MappedDStream: Checkpoint interval = null
> 15/03/27 13:50:48 INFO MappedDStream: Remember duration = 1000 ms
> 15/03/27 13:50:48 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@5ae2740f
> 15/03/27 13:50:48 INFO ShuffledDStream: Slide time = 1000 ms
> 15/03/27 13:50:48 INFO ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 15/03/27 13:50:48 INFO ShuffledDStream: Checkpoint interval = null
> 15/03/27 13:50:48 INFO ShuffledDStream: Remember duration = 1000 ms
> 15/03/27 13:50:48 INFO ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@4931b366
> 15/03/27 13:50:48 INFO ForEachDStream: Slide time = 1000 ms
> 15/03/27 13:50:48 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 15/03/27 13:50:48 INFO ForEachDStream: Checkpoint interval = null
> 15/03/27 13:50:48 INFO ForEachDStream: Remember duration = 1000 ms
> 15/03/27 13:50:48 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@5df91314
> 15/03/27 13:50:48 INFO SparkContext: Starting job: start at WordCount.java:26
> 15/03/27 13:50:48 INFO RecurringTimer: Started timer for JobGenerator at time 1427478649000
> 15/03/27 13:50:48 INFO JobGenerator: Started JobGenerator at 1427478649000 ms
> 15/03/27 13:50:48 INFO JobScheduler: Started JobScheduler
> 15/03/27 13:50:48 INFO DAGScheduler: Registering RDD 2 (start at WordCount.java:26)
> 15/03/27 13:50:48 INFO DAGScheduler: Got job 0 (start at WordCount.java:26) with 20 output partitions (allowLocal=false)
> 15/03/27 13:50:48 INFO DAGScheduler: Final stage: Stage 1(start at WordCount.java:26)
> 15/03/27 13:50:48 INFO DAGScheduler: Parents of final stage: List(Stage 0)
> 15/03/27 13:50:48 INFO DAGScheduler: Missing parents: List(Stage 0)
> 15/03/27 13:50:48 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[2] at start at WordCount.java:26), which has no missing parents
> 15/03/27 13:50:48 INFO MemoryStore: ensureFreeSpace(2720) called with curMem=0, maxMem=3771948072
> 15/03/27 13:50:48 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.7 KB, free 3.5 GB)
> 15/03/27 13:50:48 INFO MemoryStore: ensureFreeSpace(1943) called with curMem=2720, maxMem=3771948072
> 15/03/27 13:50:48 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1943.0 B, free 3.5 GB)
> 15/03/27 13:50:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-10-241-251-232.us-west-2.compute.internal:58358 (size: 1943.0 B, free: 3.5 GB)
> 15/03/27 13:50:48 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
> 15/03/27 13:50:48 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:838
> 15/03/27 13:50:48 INFO DAGScheduler: Submitting 50 missing tasks from Stage 0 (MappedRDD[2] at start at WordCount.java:26)
> 15/03/27 13:50:48 INFO TaskSchedulerImpl: Adding task set 0.0 with 50 tasks
> 15/03/27 13:50:49 INFO JobScheduler: Added jobs for time 1427478649000 ms
> 15/03/27 13:50:49 INFO JobScheduler: Starting job streaming job 1427478649000 ms.0 from job set of time 1427478649000 ms
> 15/03/27 13:50:49 INFO SparkContext: Starting job: print at WordCount.java:53
> 15/03/27 13:50:49 INFO DAGScheduler: Registering RDD 6 (mapToPair at WordCount.java:39)
> 15/03/27 13:50:49 INFO DAGScheduler: Got job 1 (print at WordCount.java:53) with 1 output partitions (allowLocal=true)
> 15/03/27 13:50:49 INFO DAGScheduler: Final stage: Stage 3(print at WordCount.java:53)
> 15/03/27 13:50:49 INFO DAGScheduler: Parents of final stage: List(Stage 2)
> 15/03/27 13:50:49 INFO DAGScheduler: Missing parents: List()
> 15/03/27 13:50:49 INFO DAGScheduler: Submitting Stage 3 (ShuffledRDD[7] at reduceByKey at WordCount.java:46), which has no missing parents
> 15/03/27 13:50:49 INFO MemoryStore: ensureFreeSpace(2264) called with curMem=4663, maxMem=3771948072
> 15/03/27 13:50:49 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.2 KB, free 3.5 GB)
> 15/03/27 13:50:49 INFO MemoryStore: ensureFreeSpace(1688) called with curMem=6927, maxMem=3771948072
> 15/03/27 13:50:49 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1688.0 B, free 3.5 GB)
> 15/03/27 13:50:49 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on ip-10-241-251-232.us-west-2.compute.internal:58358 (size: 1688.0 B, free: 3.5 GB)
> 15/03/27 13:50:49 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
> 15/03/27 13:50:49 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
> 15/03/27 13:50:49 INFO DAGScheduler: Submitting 1 missing tasks from Stage 3 (ShuffledRDD[7] at reduceByKey at WordCount.java:46)
> 15/03/27 13:50:49 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
> 15/03/27 13:50:50 INFO JobScheduler: Added jobs for time 1427478650000 ms
> 15/03/27 13:50:51 INFO JobScheduler: Added jobs for time 1427478651000 ms
> 15/03/27 13:50:52 INFO JobScheduler: Added jobs for time 1427478652000 ms
> 15/03/27 13:50:53 IN
>
> On Thu, Mar 26, 2015 at 6:50 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>
>> Hi,
>>
>> Did you run the word count example in Spark local mode or other mode, in
>> local mode you have to set Local[n], where n >=2. For other mode, make sure
>> available cores larger than 1. Because the receiver inside Spark Streaming
>> wraps as a long-running task, which will at least occupy one core.
>>
>> Besides using lsof -p <pid> or netstat to make sure Spark executor
>> backend is connected to the nc process. Also grep the executor's log to see
>> if there's log like "Connecting to <host> <port>" and "Connected to <host>
>> <port>" which shows that receiver is correctly connected to nc process.
>>
>> Thanks
>> Jerry
>>
>> 2015-03-27 8:45 GMT+08:00 Mohit Anchlia <mohitanch...@gmail.com>:
>>
>>> What's the best way to troubleshoot inside spark to see why Spark is not
>>> connecting to nc on port 9999? I don't see any errors either.
>>>
>>> On Thu, Mar 26, 2015 at 2:38 PM, Mohit Anchlia <mohitanch...@gmail.com>
>>> wrote:
>>>
>>>> I am trying to run the word count example but for some reason it's not
>>>> working as expected. I start "nc" server on port 9999 and then submit the
>>>> spark job to the cluster. Spark job gets successfully submitting but I
>>>> never see any connection from spark getting established. I also tried to
>>>> type words on the console where "nc" is listening and waiting on the
>>>> prompt, however I don't see any output. I also don't see any errors.
>>>>
>>>> Here is the conf:
>>>>
>>>> SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount");
>>>> JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
>>>> JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
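
For reference, the point quoted above about local mode (the receiver holds one core as a long-running task, so the master needs at least two) can be checked before the streaming context is created. The following is only a minimal sketch under stated assumptions: the class `MasterUrlCheck` and the method `starvesReceiver` are hypothetical names and not part of Spark, and the check inspects only the `local` and `local[N]` master URL forms. It cannot say anything about a `spark://` master; there, the Cores column of the master UI (0 in the output quoted above) is the thing to look at.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Spark: flags local master URLs that leave
// no thread for processing once a single receiver has taken one.
class MasterUrlCheck {
    // Matches local[N] and local[*]; other master URL forms are not inspected.
    private static final Pattern LOCAL_N = Pattern.compile("local\\[(\\d{1,9}|\\*)\\]");

    static boolean starvesReceiver(String masterUrl) {
        if ("local".equals(masterUrl)) {
            return true; // plain "local" runs with a single thread
        }
        Matcher m = LOCAL_N.matcher(masterUrl);
        if (m.matches() && !"*".equals(m.group(1))) {
            return Integer.parseInt(m.group(1)) < 2; // local[1] or local[0]
        }
        // local[*] and cluster URLs: the thread or core count is not visible here.
        return false;
    }

    public static void main(String[] args) {
        String masterUrl = args.length > 0 ? args[0] : "local[1]";
        if (starvesReceiver(masterUrl)) {
            System.out.println(masterUrl + ": the receiver would take the only thread; use local[2] or more");
        } else {
            System.out.println(masterUrl + ": no problem detectable from the URL alone");
        }
    }
}
```

In the `doWork` method quoted above, such a check would sit before `new SparkConf().setMaster(masterUrl)`. A `false` result only means the URL itself does not reveal a problem.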