[Spark Streaming] java.lang.OutOfMemoryError: GC overhead limit exceeded
Hi guys,

My Spark Streaming application hits a java.lang.OutOfMemoryError: GC overhead limit exceeded in the Spark Streaming driver program. I have done the following to debug it:

1. Increased the driver memory from 1GB to 2GB; the error then came after 22 hrs instead of 10 hrs. So I think it is a memory leak.
2. A few hours after starting the application, I killed all the workers. The driver program kept running and kept filling up memory. I had suspected too many batches queuing up, but obviously that is not it; otherwise, after killing the workers (and therefore the receiver), the memory usage should have gone down.
3. Ran a heap dump and the Leak Suspects report in Eclipse Memory Analyzer, which found: "One instance of org.apache.spark.storage.BlockManager loaded by sun.misc.Launcher$AppClassLoader @ 0x6c002fb90 occupies 1,477,177,296 (72.70%) bytes. The memory is accumulated in one instance of java.util.LinkedHashMap loaded by system class loader." Keywords: sun.misc.Launcher$AppClassLoader @ 0x6c002fb90, java.util.LinkedHashMap, org.apache.spark.storage.BlockManager

What my application mainly does is:
1. calculate the sum/count in a batch
2. get the average in the batch
3. store the result in DB
4. calculate the sum/count in a window
5. get the average/min/max in the window
6. store the result in DB
7. compare the current batch value with the previous batch value using updateStateByKey

Any hint what causes this leak? Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
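For readers, a minimal, hypothetical Scala sketch of steps 1-6 of the pipeline described above (per-batch and per-window averages written out in an output operation). The input format, host/port, window sizes, and the println standing in for the DB insert are all placeholders, not the application's actual code:

=== code ===
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object BatchAndWindowAverages {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("BatchAndWindowAverages")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Hypothetical input: lines of "machineId,value"
    val readings = ssc.socketTextStream("localhost", 9999).map { line =>
      val Array(id, v) = line.split(",")   // assumes well-formed input for brevity
      (id, (v.toDouble, 1L))               // (sum, count) per record
    }

    // 1-2. sum/count and average per batch
    val batchAvg = readings
      .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
      .mapValues { case (sum, count) => sum / count }

    // 3. store the per-batch result (println stands in for the JDBC insert)
    batchAvg.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        iter.foreach { case (id, avg) => println(s"batch  $id -> $avg") }
      }
    }

    // 4-6. sum/count and average over a 60s window sliding every 10s
    val windowAvg = readings
      .reduceByKeyAndWindow(
        (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2),
        Seconds(60), Seconds(10))
      .mapValues { case (sum, count) => sum / count }
    windowAvg.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        iter.foreach { case (id, avg) => println(s"window $id -> $avg") }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
=== end code ===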
How do you debug with the logs?
Hi guys, I am curious how you deal with the logs. I find it difficult to debug with the logs: we run Spark Streaming on our YARN cluster in client mode, so we have two logs: the YARN log and the local log (for the client). Whenever I have a problem, the log is too big to read with gedit and grep (e.g. after running 10 hours, the local log is 1GB). Do you use any tools to analyze/monitor/read the logs, such as logstash? Thanks, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
[Spark Streaming] How can we use consecutive data points as the features?
Hi guys, We have a use case where we need to use consecutive data points to predict a status (yes, like using time series data to predict machine failure). Is there a straightforward way to do this in Spark Streaming? If all the consecutive data points are in one batch, it is not complicated, except that the order of data points is not guaranteed within the batch, so I have to use the timestamp in each data point to reach my goal. However, when the consecutive data points are spread across two or more batches, how can I do this? From my understanding, I need to use state management, but it is not easy with updateStateByKey, e.g. I need to add one data point and delete the oldest data point, and cannot do that in a batch fashion. Does anyone in the community have a similar use case, and how do you solve it? Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
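One possible approach (not from the thread, just a hypothetical sketch) is to keep a small, timestamp-ordered buffer of the most recent points per key as the updateStateByKey state. The Point type, window size, and events stream below are made up for illustration:

=== code ===
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

object ConsecutivePoints {
  // Each event carries its own timestamp so points can be re-ordered inside a batch.
  case class Point(timestamp: Long, value: Double)

  val windowSize = 5  // hypothetical: number of consecutive points the model needs

  // State per key = the most recent `windowSize` points, ordered by timestamp.
  def updateBuffer(newPoints: Seq[Point], state: Option[Vector[Point]]): Option[Vector[Point]] = {
    val merged = (state.getOrElse(Vector.empty) ++ newPoints)
      .sortBy(_.timestamp)     // restore order: ordering within a batch is not guaranteed
      .takeRight(windowSize)   // keep the newest points, drop the oldest
    if (merged.isEmpty) None else Some(merged)
  }

  // events: DStream[(machineId, Point)], built elsewhere from the input stream.
  // Requires ssc.checkpoint(...) to be set, since updateStateByKey is stateful.
  def attachBuffers(events: DStream[(String, Point)]): DStream[(String, Vector[Point])] =
    events.updateStateByKey(updateBuffer _)
}
=== end code ===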
Re: Is there a way to get previous/other keys' state in Spark Streaming?
Thank you, TD. This is important information for us. Will keep an eye on that. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Thu, Jul 17, 2014 at 6:54 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Yes, this is the limitation of the current implementation. But this will be improved a lot when we have IndexedRDD (https://github.com/apache/spark/pull/1297) in Spark, which allows faster single-value updates to a key-value RDD (within each partition, without processing the entire partition). Soon. TD

On Thu, Jul 17, 2014 at 5:57 PM, Yan Fang yanfang...@gmail.com wrote: Hi TD, Thank you. Yes, it behaves as you described. Sorry for missing this point. Then my only concern is on the performance side - since Spark Streaming operates on all the keys every time a new batch comes, I think it is fine when the state size is small. When the state size becomes big, say, a few GBs, if we still go through the whole key list, would the operation be a little inefficient then? Maybe I am missing some points in Spark Streaming that consider this situation. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Thu, Jul 17, 2014 at 1:47 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The updateFunction given in updateStateByKey should be called on ALL the keys that are in the state, even if there is no new data in the batch for some key. Is that not the behavior you see? What do you mean by show all the existing states? You have access to the latest state RDD by doing stateStream.foreachRDD(...). There you can do whatever operation on all the key-state pairs. TD

On Thu, Jul 17, 2014 at 11:58 AM, Yan Fang yanfang...@gmail.com wrote: Hi TD, Thank you for the quick reply and for backing my approach. :) 1) The example is this: 1. In the first 2-second interval, after updateStateByKey, I get a few keys and their states, say, (a - 1, b - 2, c - 3). 2. In the following 2-second interval, I only receive c and d and their values. But I want to update/display the state of a and b accordingly. * It seems I have no way to access a and b and get their states. * Also, do I have a way to show all the existing states? I guess the approach to solve this will be similar to what you mentioned for 2). But the difficulty is that, if I want to display all the existing states, I would need to bundle all the remaining keys into one key. Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Thu, Jul 17, 2014 at 11:36 AM, Tathagata Das tathagata.das1...@gmail.com wrote: For accessing the previous version, I would do it the same way. :) 1. Can you elaborate on what you mean by that with an example? What do you mean by accessing keys? 2. Yeah, that is hard to do without the ability to do point lookups into an RDD, which we don't support yet. You could try embedding the related key in the values of the keys that need it. That is, B will be present in the value of key A. Then put this transformed DStream through updateStateByKey. TD
Is there a way to get previous/other keys' state in Spark Streaming?
Hi guys, I am sure some of you have a similar use case and I would like to know how you deal with it. In our application, we want to check the previous state of some keys and compare it with their current state. AFAIK, Spark Streaming does not have key-value access. So currently what I am doing is storing the previous and current data as one data type in the state, calling updateStateByKey in every interval, and working on the state (which holds the previous and current data) of the generated new DStream. But it has limitations: 1. cannot access keys that do not appear in this time interval. 2. cannot update key A's state from key B's if only key B appears in this time interval. Am I doing something wrong? Any suggestions? Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
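A hypothetical sketch of the "store previous and current data as one data type in the state" approach described above; the key/value types and the per-batch aggregate (a simple sum) are placeholders:

=== code ===
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

object PreviousAndCurrentState {
  // State per key = (value from the previous batch, value from the current batch).
  // updateStateByKey invokes this for every key already in the state, even when the
  // key has no new data in the current batch.
  def shiftState(newValues: Seq[Double],
                 state: Option[(Option[Double], Double)]): Option[(Option[Double], Double)] = {
    if (newValues.isEmpty) state                 // key not seen this batch: keep its state as-is
    else {
      val previous = state.map(_._2)             // the old "current" becomes the new "previous"
      Some((previous, newValues.sum))            // hypothetical per-batch value: sum of new values
    }
  }

  // keyedValues: DStream[(key, value)] built elsewhere; requires ssc.checkpoint(...)
  def track(keyedValues: DStream[(String, Double)]): DStream[(String, (Option[Double], Double))] =
    keyedValues.updateStateByKey(shiftState _)
}
=== end code ===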
Re: Is there a way to get previous/other keys' state in Spark Streaming?
Hi TD, Thank you for the quick reply and for backing my approach. :) 1) The example is this: 1. In the first 2-second interval, after updateStateByKey, I get a few keys and their states, say, (a - 1, b - 2, c - 3). 2. In the following 2-second interval, I only receive c and d and their values. But I want to update/display the state of a and b accordingly. * It seems I have no way to access a and b and get their states. * Also, do I have a way to show all the existing states? I guess the approach to solve this will be similar to what you mentioned for 2). But the difficulty is that, if I want to display all the existing states, I would need to bundle all the remaining keys into one key. Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Thu, Jul 17, 2014 at 11:36 AM, Tathagata Das tathagata.das1...@gmail.com wrote: For accessing the previous version, I would do it the same way. :) 1. Can you elaborate on what you mean by that with an example? What do you mean by accessing keys? 2. Yeah, that is hard to do without the ability to do point lookups into an RDD, which we don't support yet. You could try embedding the related key in the values of the keys that need it. That is, B will be present in the value of key A. Then put this transformed DStream through updateStateByKey. TD
unserializable object in Spark Streaming context
Hi guys, I need some help with this problem. In our use case, we need to continuously insert values into the database. So our approach is to create the JDBC object in the main method and then do the insert operation in the DStream foreachRDD operation. Is this approach reasonable? Then the problem comes: since we are using com.mysql.jdbc.java, which is not serializable, we keep seeing the NotSerializableException. I think that is because Spark Streaming is trying to serialize and then checkpoint the whole class which contains the StreamingContext, not only the StreamingContext object, right? Or is there another reason that triggers the serialization? Any workaround for this (except not using com.mysql.jdbc.java)? Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
Re: unserializable object in Spark Streaming context
Hi Marcelo and TD, Thank you for the help. If I use TD's approach, it works and there is no exception. The only drawback is that it will create many connections to the DB, which I was trying to avoid. Here is a snapshot of my code (the important code was marked in red in the original message). What I was thinking is that, if I call the collect() method, Spark Streaming will bring the data to the driver and then the db object does not need to be sent to executors. My observation is that, though exceptions are thrown, the insert function still works. Any thought about that? I also pasted the log in case it helps: http://pastebin.com/T1bYvLWB

=== code ===
SparkConf sparkConf = new SparkConf().setAppName("balababala");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
final MySQLHelper db = new MySQLHelper(); // this class instantiates the jdbc driver

/* ... a few DStream transformations ... */

JavaPairDStream<String, MachineState> noiseState =
    machineIdNoise.updateStateByKey(getUpdateFunction());
JavaPairDStream<String, Tuple2<MachineState, Integer>> noiseStateTemperature =
    noiseState.join(machineIdTemperature);
noiseStateTemperature
    .foreachRDD(new Function<JavaPairRDD<String, Tuple2<MachineState, Integer>>, Void>() {
      @Override
      public Void call(JavaPairRDD<String, Tuple2<MachineState, Integer>> arg0) throws Exception {
        List<Tuple2<String, Tuple2<MachineState, Integer>>> list = arg0.collect();
        for (Tuple2<String, Tuple2<MachineState, Integer>> tuple : list) {
          String machineId = ...;    // elided in the original message
          String machineState = ...; // elided in the original message
          db.insertAverages(machineId, machineState);
        }
        return null;
      }
    });
=== end code ===

Thank you. If there is no other workaround, I may use TD's approach because it is the only option. Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Thu, Jul 17, 2014 at 1:54 PM, Tathagata Das tathagata.das1...@gmail.com wrote: And if Marcelo's guess is correct, then the right way to do this would be to lazily / dynamically create the jdbc connection server as a singleton in the workers/executors and use that. Something like this:

dstream.foreachRDD(rdd => {
  rdd.foreachPartition((iterator: Iterator[...]) => {
    val driver = JDBCDriver.getSingleton() // this will create the single jdbc server in the worker, if it does not exist
    // loop through iterator to get the records in the partition and use the driver to push them out to the DB
  })
})

This will avoid the JDBC server being serialized as part of the closure / DStream checkpoint. TD

On Thu, Jul 17, 2014 at 1:42 PM, Marcelo Vanzin van...@cloudera.com wrote: Could you share some code (or pseudo-code)? Sounds like you're instantiating the JDBC connection in the driver, and using it inside a closure that would be run in a remote executor. That means that the connection object would need to be serializable. If that sounds like what you're doing, it won't work.

On Thu, Jul 17, 2014 at 1:37 PM, Yan Fang yanfang...@gmail.com wrote: Hi guys, I need some help with this problem. In our use case, we need to continuously insert values into the database. So our approach is to create the JDBC object in the main method and then do the insert operation in the DStream foreachRDD operation. Is this approach reasonable? Then the problem comes: since we are using com.mysql.jdbc.java, which is not serializable, we keep seeing the NotSerializableException. I think that is because Spark Streaming is trying to serialize and then checkpoint the whole class which contains the StreamingContext, not only the StreamingContext object, right? Or is there another reason that triggers the serialization? Any workaround for this?
(except not using the com.mysql.jdbc.java) Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 -- Marcelo
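For reference, a minimal, hypothetical Scala sketch of the per-executor connection pattern TD outlines above. The resultStream name, the JDBC URL/credentials, and the table/columns are placeholders; a real application would also want connection pooling and error handling:

=== code ===
import java.sql.{Connection, DriverManager}
import org.apache.spark.streaming.dstream.DStream

// Lazily created once per executor JVM; nothing JDBC-related is captured in the closure,
// so there is no NotSerializableException and no per-record connection churn.
object ConnectionHolder {
  lazy val connection: Connection =
    DriverManager.getConnection("jdbc:mysql://dbhost/mydb", "user", "password") // hypothetical URL/credentials
}

object DbSink {
  def saveAverages(resultStream: DStream[(String, String)]): Unit = {
    resultStream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        val conn = ConnectionHolder.connection          // created on first use inside the executor
        val stmt = conn.prepareStatement(
          "INSERT INTO averages (machine_id, machine_state) VALUES (?, ?)") // hypothetical table
        records.foreach { case (machineId, machineState) =>
          stmt.setString(1, machineId)
          stmt.setString(2, machineState)
          stmt.executeUpdate()
        }
        stmt.close()
      }
    }
  }
}
=== end code ===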
Re: unserializable object in Spark Streaming context
Hi Sean, Thank you. I see your point. What I was thinking was to do the computation in a distributed fashion and do the storing from a single place. But you are right, having multiple DB connections is actually fine. Thanks for answering my questions. That helps me understand the system. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Thu, Jul 17, 2014 at 2:53 PM, Sean Owen so...@cloudera.com wrote:

On Thu, Jul 17, 2014 at 10:39 PM, Yan Fang yanfang...@gmail.com wrote:
> Thank you for the help. If I use TD's approach, it works and there is no exception. The only drawback is that it will create many connections to the DB, which I was trying to avoid.

Connection-like objects aren't data that can be serialized. What would it mean to share one connection with N workers? That they all connect back to the driver, and through one DB connection there? This defeats the purpose of distributed computing. You want multiple DB connections. You can limit the number of partitions if needed.

> Here is a snapshot of my code. Mark as red for the important code. What I was thinking is that, if I call the collect() method, Spark Streaming will bring the data to the driver and then the db object does not need to be sent

The Function you pass to foreachRDD() has a reference to db though. That's what is making it be serialized.

> to executors. My observation is that, though exceptions are thrown, the insert function still works. Any thought about that? Also pasted the log in case it helps: http://pastebin.com/T1bYvLWB

Any executors that run locally might skip the serialization and succeed (?) but I don't think the remote executors can be succeeding.
Re: Is there a way to get previous/other keys' state in Spark Streaming?
Hi TD, Thank you. Yes, it behaves as you described. Sorry for missing this point. Then my only concern is on the performance side - since Spark Streaming operates on all the keys every time a new batch comes, I think it is fine when the state size is small. When the state size becomes big, say, a few GBs, if we still go through the whole key list, would the operation be a little inefficient then? Maybe I am missing some points in Spark Streaming that consider this situation. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Thu, Jul 17, 2014 at 1:47 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The updateFunction given in updateStateByKey should be called on ALL the keys that are in the state, even if there is no new data in the batch for some key. Is that not the behavior you see? What do you mean by show all the existing states? You have access to the latest state RDD by doing stateStream.foreachRDD(...). There you can do whatever operation on all the key-state pairs. TD

On Thu, Jul 17, 2014 at 11:58 AM, Yan Fang yanfang...@gmail.com wrote: Hi TD, Thank you for the quick reply and for backing my approach. :) 1) The example is this: 1. In the first 2-second interval, after updateStateByKey, I get a few keys and their states, say, (a - 1, b - 2, c - 3). 2. In the following 2-second interval, I only receive c and d and their values. But I want to update/display the state of a and b accordingly. * It seems I have no way to access a and b and get their states. * Also, do I have a way to show all the existing states? I guess the approach to solve this will be similar to what you mentioned for 2). But the difficulty is that, if I want to display all the existing states, I would need to bundle all the remaining keys into one key. Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Thu, Jul 17, 2014 at 11:36 AM, Tathagata Das tathagata.das1...@gmail.com wrote: For accessing the previous version, I would do it the same way. :) 1. Can you elaborate on what you mean by that with an example? What do you mean by accessing keys? 2. Yeah, that is hard to do without the ability to do point lookups into an RDD, which we don't support yet. You could try embedding the related key in the values of the keys that need it. That is, B will be present in the value of key A. Then put this transformed DStream through updateStateByKey. TD
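A tiny, hypothetical sketch of the stateStream.foreachRDD(...) access TD mentions, for inspecting all key-state pairs after each batch (the stateStream name and the println stand in for whatever the real state stream and output are):

=== code ===
// stateStream: a DStream[(String, Int)] produced by updateStateByKey, assumed to exist
stateStream.foreachRDD { rdd =>
  // rdd is the complete state RDD for this batch interval, not just the keys that arrived in it
  rdd.collect().foreach { case (key, state) =>   // collect() is only reasonable while the state is small
    println(s"$key -> $state")
  }
}
=== end code ===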
Re: How are the executors used in Spark Streaming in terms of receiver and driver program?
Thank you, Tathagata. That explains it. Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Fri, Jul 11, 2014 at 7:21 PM, Tathagata Das tathagata.das1...@gmail.com wrote: A task slot is equivalent to a core. So one core can only run one task at a time. TD

On Fri, Jul 11, 2014 at 1:57 PM, Yan Fang yanfang...@gmail.com wrote: Hi Tathagata, Thank you. Is a task slot equivalent to a core? Or can one core actually run multiple tasks at the same time? Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Fri, Jul 11, 2014 at 1:45 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The same executor can be used for both receiving and processing, irrespective of the deployment mode (yarn, spark standalone, etc.). It boils down to the number of cores / task slots that the executor has. Each receiver is like a long-running task, so each of them occupies a slot. If there are free slots in the executor then other tasks can be run on them. So if you are finding that the other tasks are not being run, check how many cores/task slots the executor has and whether there are more task slots than the number of input dstreams / receivers you are launching. @Praveen your answers were pretty much spot on, thanks for chipping in!

On Fri, Jul 11, 2014 at 11:16 AM, Yan Fang yanfang...@gmail.com wrote: Hi Praveen, Thank you for the answer. That's interesting, because if I only bring up one executor for Spark Streaming, it seems only the receiver is working and no other tasks are happening, judging from the log and UI. Maybe it's just because the receiving task eats all the resources, not because one executor can only run one receiver? Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka psel...@qubole.com wrote: Here are my answers. But I am just getting started with Spark Streaming - so please correct me if I am wrong. 1) Yes. 2) Receivers will run on executors. It's actually a job that's submitted where the # of tasks equals the # of receivers. An executor can actually run more than one task at the same time. Hence you could have more receivers than executors, but it's not recommended, I think. 3) As said in 2, the executor where the receiver task is running can be used for map/reduce tasks. In yarn-cluster mode, the driver program is actually run as the application master (it lives in the first container that's launched) and this is not an executor - hence it's not used for other operations. 4) The driver runs in a separate container. I think the same executor can be used for the receiver and the processing task as well (I am not very sure about this part).

On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang yanfang...@gmail.com wrote: Hi all, I am working to improve the parallelism of my Spark Streaming application, but I have problems understanding how the executors are used and how the application is distributed. 1. In YARN, is one executor equal to one container? 2. I saw the statement that a streaming receiver runs on one worker machine ("note that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data"). Does the worker machine mean an executor or a physical machine? If I have more receivers than executors, will it still work? 3. Is the executor that holds the receiver also used for other operations, such as map and reduce, or is it fully occupied by the receiver? Similarly, if I run in yarn-cluster mode, is the executor running the driver program used for other operations too? 4.
So if I have a driver program (cluster mode) and a streaming receiver, do I have to have at least 2 executors because the program and the streaming receiver have to be on different executors? Thank you. Sorry for having so many questions, but I do want to understand how Spark Streaming distributes work in order to assign reasonable resources. Thank you again. Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
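To illustrate the core/receiver accounting TD describes above, here is a small, hypothetical Scala sketch (the socket source, host/port, and receiver count are placeholders, and ssc is assumed to be an existing StreamingContext). With two receivers, the application needs more than two cores in total so that processing tasks can also run:

=== code ===
// Each input DStream below creates one receiver, and each receiver permanently
// occupies one core (task slot) on some executor for the lifetime of the application.
val numReceivers = 2
val rawStreams = (1 to numReceivers).map(_ => ssc.socketTextStream("sourcehost", 9999))
val unified = ssc.union(rawStreams)

// Everything downstream (map/reduce/output operations) runs in the remaining task slots,
// so the application needs more cores in total than it has receivers.
unified.count().print()
=== end code ===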
Re: How are the executors used in Spark Streaming in terms of receiver and driver program?
Hi Praveen, Thank you for the answer. That's interesting, because if I only bring up one executor for Spark Streaming, it seems only the receiver is working and no other tasks are happening, judging from the log and UI. Maybe it's just because the receiving task eats all the resources, not because one executor can only run one receiver? Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka psel...@qubole.com wrote: Here are my answers. But I am just getting started with Spark Streaming - so please correct me if I am wrong. 1) Yes. 2) Receivers will run on executors. It's actually a job that's submitted where the # of tasks equals the # of receivers. An executor can actually run more than one task at the same time. Hence you could have more receivers than executors, but it's not recommended, I think. 3) As said in 2, the executor where the receiver task is running can be used for map/reduce tasks. In yarn-cluster mode, the driver program is actually run as the application master (it lives in the first container that's launched) and this is not an executor - hence it's not used for other operations. 4) The driver runs in a separate container. I think the same executor can be used for the receiver and the processing task as well (I am not very sure about this part).

On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang yanfang...@gmail.com wrote: Hi all, I am working to improve the parallelism of my Spark Streaming application, but I have problems understanding how the executors are used and how the application is distributed. 1. In YARN, is one executor equal to one container? 2. I saw the statement that a streaming receiver runs on one worker machine ("note that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data"). Does the worker machine mean an executor or a physical machine? If I have more receivers than executors, will it still work? 3. Is the executor that holds the receiver also used for other operations, such as map and reduce, or is it fully occupied by the receiver? Similarly, if I run in yarn-cluster mode, is the executor running the driver program used for other operations too? 4. So if I have a driver program (cluster mode) and a streaming receiver, do I have to have at least 2 executors because the program and the streaming receiver have to be on different executors? Thank you. Sorry for having so many questions, but I do want to understand how Spark Streaming distributes work in order to assign reasonable resources. Thank you again. Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
Re: How are the executors used in Spark Streaming in terms of receiver and driver program?
Hi Tathagata, Thank you. Is a task slot equivalent to a core? Or can one core actually run multiple tasks at the same time? Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Fri, Jul 11, 2014 at 1:45 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The same executor can be used for both receiving and processing, irrespective of the deployment mode (yarn, spark standalone, etc.). It boils down to the number of cores / task slots that the executor has. Each receiver is like a long-running task, so each of them occupies a slot. If there are free slots in the executor then other tasks can be run on them. So if you are finding that the other tasks are not being run, check how many cores/task slots the executor has and whether there are more task slots than the number of input dstreams / receivers you are launching. @Praveen your answers were pretty much spot on, thanks for chipping in!

On Fri, Jul 11, 2014 at 11:16 AM, Yan Fang yanfang...@gmail.com wrote: Hi Praveen, Thank you for the answer. That's interesting, because if I only bring up one executor for Spark Streaming, it seems only the receiver is working and no other tasks are happening, judging from the log and UI. Maybe it's just because the receiving task eats all the resources, not because one executor can only run one receiver? Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka psel...@qubole.com wrote: Here are my answers. But I am just getting started with Spark Streaming - so please correct me if I am wrong. 1) Yes. 2) Receivers will run on executors. It's actually a job that's submitted where the # of tasks equals the # of receivers. An executor can actually run more than one task at the same time. Hence you could have more receivers than executors, but it's not recommended, I think. 3) As said in 2, the executor where the receiver task is running can be used for map/reduce tasks. In yarn-cluster mode, the driver program is actually run as the application master (it lives in the first container that's launched) and this is not an executor - hence it's not used for other operations. 4) The driver runs in a separate container. I think the same executor can be used for the receiver and the processing task as well (I am not very sure about this part).

On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang yanfang...@gmail.com wrote: Hi all, I am working to improve the parallelism of my Spark Streaming application, but I have problems understanding how the executors are used and how the application is distributed. 1. In YARN, is one executor equal to one container? 2. I saw the statement that a streaming receiver runs on one worker machine ("note that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data"). Does the worker machine mean an executor or a physical machine? If I have more receivers than executors, will it still work? 3. Is the executor that holds the receiver also used for other operations, such as map and reduce, or is it fully occupied by the receiver? Similarly, if I run in yarn-cluster mode, is the executor running the driver program used for other operations too? 4. So if I have a driver program (cluster mode) and a streaming receiver, do I have to have at least 2 executors because the program and the streaming receiver have to be on different executors? Thank you. Sorry for having so many questions, but I do want to understand how Spark Streaming distributes work in order to assign reasonable resources. Thank you again. Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
How are the executors used in Spark Streaming in terms of receiver and driver program?
Hi all, I am working to improve the parallelism of my Spark Streaming application, but I have problems understanding how the executors are used and how the application is distributed. 1. In YARN, is one executor equal to one container? 2. I saw the statement that a streaming receiver runs on one worker machine ("note that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data"). Does the worker machine mean an executor or a physical machine? If I have more receivers than executors, will it still work? 3. Is the executor that holds the receiver also used for other operations, such as map and reduce, or is it fully occupied by the receiver? Similarly, if I run in yarn-cluster mode, is the executor running the driver program used for other operations too? 4. So if I have a driver program (cluster mode) and a streaming receiver, do I have to have at least 2 executors because the program and the streaming receiver have to be on different executors? Thank you. Sorry for having so many questions, but I do want to understand how Spark Streaming distributes work in order to assign reasonable resources. Thank you again. Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
Spark Streaming - two questions about the StreamingContext
I am using Spark Streaming and have the following two questions: 1. If more than one output operation is put in the same StreamingContext (basically, I mean, I put all the output operations in the same class), are they processed one by one in the order they appear in the class? Or are they actually processed in parallel? 2. If processing one batch takes longer than the interval time, does the next batch wait in the queue until the previous batch is fully processed? Is there any parallelism that may process the two at the same time? Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
Re: Spark Streaming - two questions about the StreamingContext
Great. Thank you! Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Wed, Jul 9, 2014 at 11:45 AM, Tathagata Das tathagata.das1...@gmail.com wrote: 1. Multiple output operations are processed in the order they are defined. That is because by default only one output operation is processed at a time. This *can* be parallelized using an undocumented config parameter, spark.streaming.concurrentJobs, which is by default set to 1. 2. Yes, the output operations (and the Spark jobs that are involved with them) get queued up. TD

On Wed, Jul 9, 2014 at 11:22 AM, Yan Fang yanfang...@gmail.com wrote: I am using Spark Streaming and have the following two questions: 1. If more than one output operation is put in the same StreamingContext (basically, I mean, I put all the output operations in the same class), are they processed one by one in the order they appear in the class? Or are they actually processed in parallel? 2. If processing one batch takes longer than the interval time, does the next batch wait in the queue until the previous batch is fully processed? Is there any parallelism that may process the two at the same time? Thank you. Cheers, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
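For reference, a minimal sketch of setting the (undocumented, use-with-care) spark.streaming.concurrentJobs parameter TD mentions; the application name and batch interval are placeholders:

=== code ===
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("MyStreamingApp")                    // hypothetical app name
  .set("spark.streaming.concurrentJobs", "2")      // default is 1: output operations run one at a time, in order
val ssc = new StreamingContext(conf, Seconds(2))   // output operations defined on ssc can now overlap
=== end code ===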
Spark Streaming - What does Spark Streaming checkpoint?
Hi guys, I am a little confused by the checkpointing in Spark Streaming. It checkpoints the intermediate data for the stateful operations, for sure. Does it also checkpoint the information of the StreamingContext? It seems we can recreate the StreamingContext from the checkpoint in a driver-node-failure scenario, but when I looked at the checkpoint directory, I did not find much of a clue. Any help? Thank you very much. Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
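For context: besides the intermediate RDD data for stateful operations, Spark Streaming's metadata checkpointing saves the configuration, the DStream graph, and incomplete batches, which is what lets the driver be rebuilt after a failure via StreamingContext.getOrCreate. A minimal, hypothetical sketch (the checkpoint path and app name are placeholders):

=== code ===
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedApp {
  val checkpointDir = "hdfs:///user/me/streaming-checkpoint"  // hypothetical path

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointedApp")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint(checkpointDir)
    // define all DStream transformations and output operations here,
    // so they are part of the graph that gets checkpointed
    ssc
  }

  def main(args: Array[String]): Unit = {
    // First run: calls createContext(). After a driver failure: rebuilds the
    // StreamingContext (configuration, DStream graph, pending batches) from the checkpoint.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
=== end code ===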
Issues in opening UI when running Spark Streaming in YARN
Hi guys, Not sure if you have similar issues; I did not find relevant tickets in JIRA. When I deploy Spark Streaming to YARN, I have the following two issues:

1. The UI port is random. It is not the default 4040. I have to look at the container's log to check the UI port. Is it supposed to be this way?

2. Most of the time, the UI does not work. The difference between the logs (I ran the same program) is:

14/07/03 11:38:50 INFO spark.HttpServer: Starting HTTP Server
14/07/03 11:38:50 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/03 11:38:50 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:12026
14/07/03 11:38:51 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 0
14/07/03 11:38:51 INFO executor.Executor: Running task ID 0...

versus:

14/07/02 16:55:32 INFO spark.HttpServer: Starting HTTP Server
14/07/02 16:55:32 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/02 16:55:32 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:14211
14/07/02 16:55:32 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
14/07/02 16:55:32 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/02 16:55:32 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:21867
14/07/02 16:55:32 INFO ui.SparkUI: Started SparkUI at http://myNodeName:21867
14/07/02 16:55:32 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler

When the second part (from the ui.JettyUtils line onward, highlighted in red in the original message) shows up, the UI sometimes works. Any ideas? Thank you. Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
Re: Issues in opening UI when running Spark Streaming in YARN
Hi Andrew, Thanks for the quick reply. It works with yarn-client mode. One question about yarn-cluster mode: I was actually checking the AM for the log, and since the Spark driver is running in the AM, the UI should also work, right? But that is not true in my case. Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Mon, Jul 7, 2014 at 11:42 AM, Andrew Or and...@databricks.com wrote: I will assume that you are running in yarn-cluster mode. Because the driver is launched in one of the containers, it doesn't make sense to expose port 4040 for the node that contains the container. (Imagine if multiple driver containers are launched on the same node. This will cause a port collision.) If you're launching Spark from a gateway node that is physically near your worker nodes, then you can just launch your application in yarn-client mode, in which case the SparkUI will always be started on port 4040 on the node that you ran spark-submit on. The reason why sometimes you see the red text is because it appears only on the driver containers, not the executor containers. This is because the SparkUI belongs to the SparkContext, which only exists on the driver. Andrew

2014-07-07 11:20 GMT-07:00 Yan Fang yanfang...@gmail.com: Hi guys, Not sure if you have similar issues; I did not find relevant tickets in JIRA. When I deploy Spark Streaming to YARN, I have the following two issues: 1. The UI port is random. It is not the default 4040. I have to look at the container's log to check the UI port. Is it supposed to be this way? 2. Most of the time, the UI does not work. The difference between the logs (I ran the same program) is:

14/07/03 11:38:50 INFO spark.HttpServer: Starting HTTP Server
14/07/03 11:38:50 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/03 11:38:50 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:12026
14/07/03 11:38:51 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 0
14/07/03 11:38:51 INFO executor.Executor: Running task ID 0...

versus:

14/07/02 16:55:32 INFO spark.HttpServer: Starting HTTP Server
14/07/02 16:55:32 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/02 16:55:32 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:14211
14/07/02 16:55:32 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
14/07/02 16:55:32 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/02 16:55:32 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:21867
14/07/02 16:55:32 INFO ui.SparkUI: Started SparkUI at http://myNodeName:21867
14/07/02 16:55:32 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler

When the second part (from the ui.JettyUtils line onward, highlighted in red in the original message) shows up, the UI sometimes works. Any ideas? Thank you. Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
Re: Issues in opening UI when running Spark Streaming in YARN
Thank you, Andrew. That makes sense to me now. I was confused by "In yarn-cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster" in http://spark.apache.org/docs/latest/running-on-yarn.html . After your explanation, it's clear now. Thank you. Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108

On Mon, Jul 7, 2014 at 1:07 PM, Andrew Or and...@databricks.com wrote: @Yan, the UI should still work. As long as you look into the container that launches the driver, you will find the SparkUI address and port. Note that in yarn-cluster mode the Spark driver doesn't actually run in the Application Manager; just like the executors, it runs in a container that is launched by the Resource Manager after the Application Master requests the container resources. In contrast, in yarn-client mode, your driver is not launched in a container, but in the client process that launched your application (i.e. spark-submit), so the stdout of this program directly contains the SparkUI messages. @Chester, I'm not sure what has gone wrong as there are many factors at play here. When you go to the Resource Manager UI, does the application URL link point you to the same SparkUI address as indicated in the logs? If so, this is the correct behavior. However, I believe the redirect error has little to do with Spark itself, but more to do with how you set up the cluster. I have actually run into this myself, but I haven't found a workaround. Let me know if you find anything.

2014-07-07 12:07 GMT-07:00 Chester Chen ches...@alpinenow.com: As Andrew explained, the port is random rather than 4040, as the Spark driver is started in the Application Master and the port is randomly selected. But I have a similar UI issue. I am running yarn-cluster mode against my local CDH5 cluster. The log states: 14/07/07 11:59:29 INFO ui.SparkUI: Started SparkUI at http://10.0.0.63:58750 - but when I click the Spark UI link (ApplicationMaster or http://10.0.0.63:58750), I get a 404 with the redirect URI http://localhost/proxy/application_1404443455764_0010/ . Looking at the Spark code, I notice that the proxy is really a variable that gets the proxy from the yarn-site.xml http address. But when I specify the value in yarn-site.xml, it still doesn't work for me. Oddly enough, it works for my co-worker on a Pivotal HD cluster, so I am still looking at what the difference is in terms of cluster setup or something else. Chester

On Mon, Jul 7, 2014 at 11:42 AM, Andrew Or and...@databricks.com wrote: I will assume that you are running in yarn-cluster mode. Because the driver is launched in one of the containers, it doesn't make sense to expose port 4040 for the node that contains the container. (Imagine if multiple driver containers are launched on the same node. This will cause a port collision.) If you're launching Spark from a gateway node that is physically near your worker nodes, then you can just launch your application in yarn-client mode, in which case the SparkUI will always be started on port 4040 on the node that you ran spark-submit on. The reason why sometimes you see the red text is because it appears only on the driver containers, not the executor containers. This is because the SparkUI belongs to the SparkContext, which only exists on the driver. Andrew

2014-07-07 11:20 GMT-07:00 Yan Fang yanfang...@gmail.com: Hi guys, Not sure if you have similar issues; I did not find relevant tickets in JIRA. When I deploy Spark Streaming to YARN, I have the following two issues: 1.
The UI port is random. It is not the default 4040. I have to look at the container's log to check the UI port. Is it supposed to be this way? 2. Most of the time, the UI does not work. The difference between the logs (I ran the same program) is:

14/07/03 11:38:50 INFO spark.HttpServer: Starting HTTP Server
14/07/03 11:38:50 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/03 11:38:50 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:12026
14/07/03 11:38:51 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 0
14/07/03 11:38:51 INFO executor.Executor: Running task ID 0...

versus:

14/07/02 16:55:32 INFO spark.HttpServer: Starting HTTP Server
14/07/02 16:55:32 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/02 16:55:32 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:14211
14/07/02 16:55:32 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
14/07/02 16:55:32 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/02 16:55:32 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:21867
14/07/02 16:55:32 INFO ui.SparkUI: Started SparkUI at http://myNodeName:21867
14/07/02 16:55:32 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler
Is the order of messages guaranteed in a DStream?
I know the order in which the batches of a DStream are processed is guaranteed. I am wondering whether the order of messages within one DStream is guaranteed. My gut feeling is yes, because an RDD is immutable, and some simple tests support this, but I would like to hear it from an authority to convince myself. Thank you. Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108