Re: Spark Streaming: custom actor receiver losing vast majority of data
Hi Shixiong,

Just figured it out. I was doing a .print() as the output operation, which seems to stop processing the batch once the first 10 elements have gone through. I changed it to a no-op foreachRDD and it works. Thanks for jumping in to help me.

From: "Shixiong(Ryan) Zhu" <shixi...@databricks.com>
Date: Thursday, January 14, 2016 at 4:41 PM
To: Lin Zhao <l...@exabeam.com>
Cc: user <user@spark.apache.org>
Subject: Re: Spark Streaming: custom actor receiver losing vast majority of data

> Could you post the code of MessageRetriever? And by the way, could you post a screenshot of the tasks for a batch and check the input size of these tasks? Considering there are so many events, there should be a lot of blocks as well as a lot of tasks.
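For anyone finding this thread later: DStream.print(num) is implemented with RDD.take(num + 1), so Spark only computes enough partitions to produce those first few records, and lazy upstream map/flatMap stages (and their logging) never run for the rest of the batch. A minimal sketch of the no-op replacement (the helper name is illustrative, not from the thread):

```scala
import org.apache.spark.streaming.dstream.DStream

// print() only computes enough partitions to show ~10 records per batch;
// a no-op foreachRDD forces full evaluation of every partition instead.
def forceFullEvaluation[T](stream: DStream[T]): Unit =
  stream.foreachRDD { rdd =>
    rdd.foreach(_ => ()) // touch every record; swap in a real sink later
  }
```

In production you would replace the no-op body with the actual sink (e.g. writing to HDFS or a database), since an output operation that discards its input only makes sense for testing throughput.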
Re: Spark Streaming: custom actor receiver losing vast majority of data
Could you post the code of MessageRetriever? And by the way, could you post a screenshot of the tasks for a batch and check the input size of these tasks? Considering there are so many events, there should be a lot of blocks as well as a lot of tasks.

On Thu, Jan 14, 2016 at 4:34 PM, Lin Zhao wrote:

> Hi Shixiong,
>
> I tried this but it still happens. If it helps, it's 1.6.0 and runs on YARN. Batch duration is 20 seconds.
>
> Some logs seemingly related to the block manager:
>
> 16/01/15 00:31:25 INFO receiver.BlockGenerator: Pushed block input-0-1452817873000
> 16/01/15 00:31:27 INFO storage.MemoryStore: Block input-0-1452817879000 stored as bytes in memory (estimated size 60.1 MB, free 1563.4 MB)
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 31
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 30
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 29
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 28
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 27
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 26
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 25
> 16/01/15 00:31:28 INFO receiver.BlockGenerator: Pushed block input-0-1452817879000
> 16/01/15 00:31:28 INFO context.TimeSpanHDFSReader: descr="Waiting to read [win] message file(s) for 2015-12-17T21:00:00.000." module=TIMESPAN_HDFS_READER
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 38
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 37
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 36
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 35
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 34
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 33
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 32
> 16/01/15 00:31:29 INFO context.MessageRetriever: descr="Processed 4,636,461 lines (80 s); 0 event/s (42,636 incoming msg/s) at 2015-12-17T21" module=MESSAGE_RETRIEVER
> 16/01/15 00:31:30 INFO storage.MemoryStore: Block input-0-1452817879200 stored as bytes in memory (estimated size 93.3 MB, free 924.9 MB)
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 45
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 44
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 43
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 42
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 41
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 40
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 39
>
> From: "Shixiong(Ryan) Zhu"
> Date: Thursday, January 14, 2016 at 4:13 PM
> To: Lin Zhao
> Cc: user
> Subject: Re: Spark Streaming: custom actor receiver losing vast majority of data
>
> MEMORY_AND_DISK_SER_2
Re: Spark Streaming: custom actor receiver losing vast majority of data
Hi Shixiong,

I tried this but it still happens. If it helps, it's 1.6.0 and runs on YARN. Batch duration is 20 seconds.

Some logs seemingly related to the block manager:

16/01/15 00:31:25 INFO receiver.BlockGenerator: Pushed block input-0-1452817873000
16/01/15 00:31:27 INFO storage.MemoryStore: Block input-0-1452817879000 stored as bytes in memory (estimated size 60.1 MB, free 1563.4 MB)
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 31
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 30
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 29
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 28
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 27
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 26
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 25
16/01/15 00:31:28 INFO receiver.BlockGenerator: Pushed block input-0-1452817879000
16/01/15 00:31:28 INFO context.TimeSpanHDFSReader: descr="Waiting to read [win] message file(s) for 2015-12-17T21:00:00.000." module=TIMESPAN_HDFS_READER
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 38
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 37
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 36
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 35
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 34
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 33
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 32
16/01/15 00:31:29 INFO context.MessageRetriever: descr="Processed 4,636,461 lines (80 s); 0 event/s (42,636 incoming msg/s) at 2015-12-17T21" module=MESSAGE_RETRIEVER
16/01/15 00:31:30 INFO storage.MemoryStore: Block input-0-1452817879200 stored as bytes in memory (estimated size 93.3 MB, free 924.9 MB)
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 45
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 44
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 43
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 42
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 41
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 40
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 39

From: "Shixiong(Ryan) Zhu" <shixi...@databricks.com>
Date: Thursday, January 14, 2016 at 4:13 PM
To: Lin Zhao <l...@exabeam.com>
Cc: user <user@spark.apache.org>
Subject: Re: Spark Streaming: custom actor receiver losing vast majority of data

MEMORY_AND_DISK_SER_2
Re: Spark Streaming: custom actor receiver losing vast majority of data
Could you change MEMORY_ONLY_SER to MEMORY_AND_DISK_SER_2 and see if this still happens? It may be that you don't have enough memory to cache the events.

On Thu, Jan 14, 2016 at 4:06 PM, Lin Zhao wrote:

> Hi,
>
> I'm testing Spark Streaming with an actor receiver. The actor keeps calling store() to save a pair to Spark.
>
> Once the job is launched, everything looks good on the UI. Millions of events get through every batch. However, I added logging to the first step and found that only 20 or 40 events in a batch actually get to the first mapper. Any idea what might be causing this?
>
> I also log in the custom receiver before the "store()" call, and it really is calling this function millions of times.
>
> The receiver definition looks like:
>
> val stream = ssc.actorStream[(String, Message)](
>   MessageRetriever.props("message-retriever",
>     mrSections.head, conf, flowControlDef, None, None),
>   "Martini",
>   StorageLevel.MEMORY_ONLY_SER)
>
> The job looks like:
>
> stream.map { pair =>
>   logger.info(s"before pipeline key=${pair._1}") // Only a handful gets logged although there are over 1 million in a batch
>   pair._2
> }.flatMap { m =>
>   // Event Builder
>   logger.info(s"event builder thread-id=${Thread.currentThread().getId} user=${m.fields.getOrElse('user, "NA")}")
>   ebHelper(m)
> }.map { e =>
>   // Event Normalizer
>   logger.info(s"normalizer thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
>   DefaultEventNormalizer.normalizeFields(e)
> }.map { e =>
>   logger.info(s"resolver thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
>   resolver(e)
> }.flatMap { e =>
>   // Event Discarder
>   logger.info(s"discarder thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
>   discarder(e)
> }.map { e =>
>   ep(e)
> }
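Applying the suggestion above to the receiver definition from the thread is a one-line change of the storage level argument. A sketch, reusing the names Lin posted (ssc, Message, MessageRetriever, etc. are from the original code, not standard Spark APIs):

```scala
import org.apache.spark.storage.StorageLevel

// With MEMORY_ONLY_SER, received blocks that don't fit in memory are dropped;
// MEMORY_AND_DISK_SER_2 spills them to disk and keeps a second replica instead.
val stream = ssc.actorStream[(String, Message)](
  MessageRetriever.props("message-retriever",
    mrSections.head, conf, flowControlDef, None, None),
  "Martini",
  StorageLevel.MEMORY_AND_DISK_SER_2)
```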
Spark Streaming: custom actor receiver losing vast majority of data
Hi,

I'm testing Spark Streaming with an actor receiver. The actor keeps calling store() to save a pair to Spark.

Once the job is launched, everything looks good on the UI. Millions of events get through every batch. However, I added logging to the first step and found that only 20 or 40 events in a batch actually get to the first mapper. Any idea what might be causing this?

I also log in the custom receiver before the "store()" call, and it really is calling this function millions of times.

The receiver definition looks like:

val stream = ssc.actorStream[(String, Message)](
  MessageRetriever.props("message-retriever",
    mrSections.head, conf, flowControlDef, None, None),
  "Martini",
  StorageLevel.MEMORY_ONLY_SER)

The job looks like:

stream.map { pair =>
  logger.info(s"before pipeline key=${pair._1}") // Only a handful gets logged although there are over 1 million in a batch
  pair._2
}.flatMap { m =>
  // Event Builder
  logger.info(s"event builder thread-id=${Thread.currentThread().getId} user=${m.fields.getOrElse('user, "NA")}")
  ebHelper(m)
}.map { e =>
  // Event Normalizer
  logger.info(s"normalizer thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
  DefaultEventNormalizer.normalizeFields(e)
}.map { e =>
  logger.info(s"resolver thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
  resolver(e)
}.flatMap { e =>
  // Event Discarder
  logger.info(s"discarder thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
  discarder(e)
}.map { e =>
  ep(e)
}
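For readers unfamiliar with actor receivers in Spark 1.x: a receiver like the one described typically mixes an Akka Actor with Spark's ActorHelper trait and calls store() per record; Spark's BlockGenerator then batches those records into the input-* blocks seen in the logs. A hypothetical skeleton (MessageRetriever's real code was never posted to this thread, and the Message type here is a stand-in for the application's own):

```scala
import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper

// Placeholder for the application's own message type (not shown in the thread).
case class Message(fields: Map[Symbol, String])

// Hypothetical sketch of an actor receiver: each incoming pair is handed to
// Spark via store(), which buffers it in the BlockGenerator until the block
// interval elapses and the block is pushed to the BlockManager.
class MessageRetrieverSketch extends Actor with ActorHelper {
  def receive: Receive = {
    case pair: (String, Message) @unchecked =>
      store(pair)
  }
}
```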