[Spark Streaming] java.lang.OutOfMemoryError: GC overhead limit exceeded

2014-09-08 Thread Yan Fang
Hi guys,

My Spark Streaming application hits this java.lang.OutOfMemoryError: GC
overhead limit exceeded error in the Spark Streaming driver program. I have
done the following to debug it:

1. Increased the driver memory from 1GB to 2GB; the error then came after 22
hrs instead of 10 hrs, so I think it is a memory leak problem.

2. A few hours after starting the application, I killed all the workers. The
driver program kept running and kept filling up memory. I had thought the
cause was too many batches in the queue, but obviously it is not: otherwise,
after killing the workers (and therefore the receiver), the memory usage
should have gone down.

3. Ran a heap dump and the Leak Suspects report of the Eclipse Memory
Analyzer, which found:

One instance of org.apache.spark.storage.BlockManager loaded by
sun.misc.Launcher$AppClassLoader @ 0x6c002fb90 occupies 1,477,177,296
(72.70%) bytes. The memory is accumulated in one instance of
java.util.LinkedHashMap loaded by the system class loader.

Keywords:
sun.misc.Launcher$AppClassLoader @ 0x6c002fb90
java.util.LinkedHashMap
org.apache.spark.storage.BlockManager



What my application mainly does is:

1. calculate the sum/count in a batch
2. get the average in the batch
3. store the result in DB

4. calculate the sum/count in a window
5. get the average/min/max in the window
6. store the result in DB

7. compare the current batch value with previous batch value using
updateStateByKey.


Any hint on what causes this leak? Thank you.

Cheers,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108
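
For reference, a minimal sketch of two Spark 1.x settings that bound how long
the driver retains streaming metadata, spark.cleaner.ttl and
spark.streaming.unpersist. The values are illustrative and this is not a
confirmed fix for the BlockManager growth above, just a knob worth checking:

== code ==
import org.apache.spark.SparkConf;

public class DriverConfSketch {
    public static void main(String[] args) {
        // Illustrative values only: periodically clean old metadata (RDD/block
        // info tracked by the driver) and unpersist generated RDDs once the
        // streaming job no longer needs them.
        SparkConf conf = new SparkConf()
            .setAppName("streaming-metadata-cleanup-sketch")
            .set("spark.cleaner.ttl", "3600")            // seconds; Spark 1.x metadata TTL
            .set("spark.streaming.unpersist", "true");   // drop old generated RDDs

        // Pass 'conf' to the JavaStreamingContext constructor as usual.
        System.out.println(conf.toDebugString());
    }
}
== end code ==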


How do you debug with the logs?

2014-09-03 Thread Yan Fang
Hi guys,

I am curious how you deal with the logs. I find it difficult to debug with
them: we run Spark Streaming in our YARN cluster in client mode, so there are
two logs, the YARN log and the local (client) log. Whenever I have a problem,
the logs are too big to read with gedit and grep (e.g. after running 10
hours, the local log is 1GB). Do you use any tools to analyze/monitor/read
the logs, such as Logstash?

Thanks,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108
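
One common way to keep such logs manageable is a rolling file appender. A
hedged sketch of a log4j.properties that Spark picks up from its conf
directory; the path, sizes, and appender name are illustrative:

== code ==
# Illustrative log4j 1.x configuration: roll the log at 100MB, keep 10 backups.
log4j.rootLogger=INFO, rolling
log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.File=/var/log/spark/streaming-app.log
log4j.appender.rolling.MaxFileSize=100MB
log4j.appender.rolling.MaxBackupIndex=10
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
== end code ==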


[Spark Streaming] How can we use consecutive data points as the features?

2014-08-15 Thread Yan Fang
Hi guys,

We have a use case where we need to use consecutive data points to predict a
status (yes, like using time-series data to predict machine failure). Is
there a straightforward way to do this in Spark Streaming?

If all the consecutive data points are in one batch, it is not complicated,
except that the order of data points within the batch is not guaranteed, so I
have to use the timestamp in each data point to reach my goal. However, when
the consecutive data points are spread across two or more batches, how can I
do this? From my understanding, I need to use state management, but it is not
easy to do with updateStateByKey; e.g., I need to add one data point and drop
the oldest one, yet cannot do that in a batch fashion.

Does anyone in the community have a similar use case, and how do you solve
it? Thank you.

Cheers,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108
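
A hedged Java sketch of the state-management approach described above: keep
the last N timestamped points per key with updateStateByKey, appending the
new points and dropping the oldest. The Point class, the buffer size, and the
keying by machine id are illustrative assumptions, and the Spark 1.x Java API
(Guava Optional) is assumed:

== code ==
import com.google.common.base.Optional;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.api.java.JavaPairDStream;

// Hypothetical data point carrying the event timestamp used for ordering.
class Point implements Serializable {
    final long timestamp;
    final double value;
    Point(long timestamp, double value) { this.timestamp = timestamp; this.value = value; }
}

class SlidingBufferSketch {
    static final int MAX_POINTS = 10;  // number of consecutive points the model needs

    // Update function: append this batch's points, sort by event time, keep the newest N.
    static final Function2<List<Point>, Optional<List<Point>>, Optional<List<Point>>> UPDATE =
        new Function2<List<Point>, Optional<List<Point>>, Optional<List<Point>>>() {
            @Override
            public Optional<List<Point>> call(List<Point> newPoints, Optional<List<Point>> state) {
                List<Point> buffer = new ArrayList<Point>(state.or(new ArrayList<Point>()));
                buffer.addAll(newPoints);
                Collections.sort(buffer, new Comparator<Point>() {
                    public int compare(Point a, Point b) {
                        return Long.compare(a.timestamp, b.timestamp);
                    }
                });
                int from = Math.max(0, buffer.size() - MAX_POINTS);
                return Optional.of(new ArrayList<Point>(buffer.subList(from, buffer.size())));
            }
        };

    // Usage: given a JavaPairDStream<String, Point> keyed by machine id.
    static JavaPairDStream<String, List<Point>> buffered(JavaPairDStream<String, Point> byMachine) {
        return byMachine.updateStateByKey(UPDATE);
    }
}
== end code ==

Note that updateStateByKey requires a checkpoint directory to be set on the
StreamingContext; each per-key buffer can then be fed to the prediction step
in a later map or foreachRDD.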


Re: Is there a way to get previous/other keys' state in Spark Streaming?

2014-07-18 Thread Yan Fang
Thank you, TD. This is important information for us. Will keep an eye on
that.

Cheers,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Thu, Jul 17, 2014 at 6:54 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Yes, this is a limitation of the current implementation. But this will
 be improved a lot when we have IndexedRDD
 (https://github.com/apache/spark/pull/1297) in Spark, which allows
 faster single-value updates to a key-value store (within each partition,
 without processing the entire partition).

 Soon.

 TD


 On Thu, Jul 17, 2014 at 5:57 PM, Yan Fang yanfang...@gmail.com wrote:

 Hi TD,

 Thank you. Yes, it behaves as you described. Sorry for missing this
 point.

 Then my only concern is on the performance side: since Spark Streaming
 operates on all the keys every time a new batch comes, I think it is fine
 when the state is small. When the state becomes big, say a few GBs, if we
 still go through the whole key list, wouldn't the operation be a little
 inefficient? Maybe I am missing something in Spark Streaming that handles
 this situation.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Thu, Jul 17, 2014 at 1:47 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 The updateFunction given in updateStateByKey should be called on ALL the
 keys that are in the state, even if there is no new data in the batch for some
 key. Is that not the behavior you see?

 What do you mean by show all the existing states? You have access to
 the latest state RDD by doing stateStream.foreachRDD(...). There you can do
 whatever operation on all the key-state pairs.

 TD




 On Thu, Jul 17, 2014 at 11:58 AM, Yan Fang yanfang...@gmail.com wrote:

 Hi TD,

 Thank you for the quick replying and backing my approach. :)

 1) The example is this:

 1. In the first 2 second interval, after updateStateByKey, I get a few
 keys and their states, say, (a -> 1, b -> 2, c -> 3)
 2. In the following 2 second interval, I only receive c and d and
 their value. But I want to update/display the state of a and b
 accordingly.
 * It seems I have no way to access the a and b and get their
 states.
 * also, do I have a way to show all the existing states?

 I guess the approach to solve this will be similar to what you
 mentioned for 2). But the difficulty is that, if I want to display all the
 existing states, need to bundle all the rest keys to one key.

 Thank you.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Thu, Jul 17, 2014 at 11:36 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 For accessing previous version, I would do it the same way. :)

 1. Can you elaborate on what you mean by that with an example? What do
 you mean by accessing keys?

 2. Yeah, that is hard to do without the ability to do point lookups into
 an RDD, which we don't support yet. You could try embedding the related key
 in the values of the keys that need it. That is, B will be present in the
 value of key A. Then put this transformed DStream through
 updateStateByKey.

 TD








Is there a way to get previous/other keys' state in Spark Streaming?

2014-07-17 Thread Yan Fang
Hi guys,

I am sure some of you have a similar use case, and I want to know how you
deal with it. In our application, we want to check the previous state of some
keys and compare it with their current state.

AFAIK, Spark Streaming does not have key-value access. So currently what I am
doing is storing the previous and current data as one data type in the state,
calling updateStateByKey in every interval, and working on the state (which
holds the previous and current data) of the generated new DStream. But this
has limitations:

1. I cannot access keys that do not appear in this time interval.
2. I cannot update key A's state from key B's state if only key B appears in
this time interval.

Am I doing something wrong? Any suggestions? Thank you.

Cheers,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108
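
As TD explains in the replies, the update function is invoked for every key
already in the state, even when a key has no new data in the batch, and the
full state is reachable through the state DStream itself. A hedged Java
sketch of both points (Spark 1.x-era API with Guava Optional; the integer
state and the names are illustrative):

== code ==
import com.google.common.base.Optional;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

class StateAccessSketch {
    // Called once per key per batch, including keys whose 'newValues' list is empty,
    // so the previous state of a and b is still visible when only c and d arrive.
    static final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> UPDATE =
        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
            @Override
            public Optional<Integer> call(List<Integer> newValues, Optional<Integer> state) {
                int sum = state.or(0);
                for (Integer v : newValues) {
                    sum += v;
                }
                return Optional.of(sum);  // returning Optional.absent() would drop the key
            }
        };

    static void wire(JavaPairDStream<String, Integer> keyed) {
        JavaPairDStream<String, Integer> stateStream = keyed.updateStateByKey(UPDATE);

        // The latest state RDD holds ALL existing key-state pairs, not just this batch's keys.
        stateStream.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, Integer> stateRDD) {
                for (Tuple2<String, Integer> kv : stateRDD.collect()) {
                    System.out.println(kv._1() + " -> " + kv._2());
                }
                return null;
            }
        });
    }
}
== end code ==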


Re: Is there a way to get previous/other keys' state in Spark Streaming?

2014-07-17 Thread Yan Fang
Hi TD,

Thank you for the quick reply and for backing my approach. :)

1) The example is this:

1. In the first 2 second interval, after updateStateByKey, I get a few keys
and their states, say, (a - 1, b - 2, c - 3)
2. In the following 2 second interval, I only receive c and d and their
value. But I want to update/display the state of a and b accordingly.
* It seems I have no way to access the a and b and get their states.
* also, do I have a way to show all the existing states?

I guess the approach to solve this will be similar to what you mentioned
for 2). But the difficulty is that, if I want to display all the existing
states, need to bundle all the rest keys to one key.

Thank you.

Cheers,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Thu, Jul 17, 2014 at 11:36 AM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 For accessing previous version, I would do it the same way. :)

 1. Can you elaborate on what you mean by that with an example? What do you
 mean by accessing keys?

 2. Yeah, that is hard to do without the ability to do point lookups into an
 RDD, which we don't support yet. You could try embedding the related key in
 the values of the keys that need it. That is, B will be present in the value
 of key A. Then put this transformed DStream through updateStateByKey.

 TD
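
TD's second suggestion, embedding the related key's data in the values of the
key that needs it, might look roughly like this hedged Java sketch. The
hard-coded "A"/"B" dependency and the tagging with the origin key are
illustrative assumptions:

== code ==
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

class EmbedRelatedKeySketch {
    // Re-key the stream so that records for key "B" are also delivered under key "A".
    // Key "A"'s update function then sees B's values next to its own in the same batch
    // and can update A's state from B's data.
    static JavaPairDStream<String, Tuple2<String, Integer>> withRelatedKey(
            JavaPairDStream<String, Integer> input) {
        return input.flatMapToPair(
            new PairFlatMapFunction<Tuple2<String, Integer>, String, Tuple2<String, Integer>>() {
                @Override
                public Iterable<Tuple2<String, Tuple2<String, Integer>>> call(
                        Tuple2<String, Integer> record) {
                    List<Tuple2<String, Tuple2<String, Integer>>> out =
                        new ArrayList<Tuple2<String, Tuple2<String, Integer>>>();
                    // Keep the record under its own key, tagged with its origin key.
                    out.add(new Tuple2<String, Tuple2<String, Integer>>(
                        record._1(), new Tuple2<String, Integer>(record._1(), record._2())));
                    // Illustrative dependency: key "A" also wants to see key "B"'s values.
                    if ("B".equals(record._1())) {
                        out.add(new Tuple2<String, Tuple2<String, Integer>>(
                            "A", new Tuple2<String, Integer>("B", record._2())));
                    }
                    return out;
                }
            });
    }
    // The transformed stream is then put through updateStateByKey as usual.
}
== end code ==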



unserializable object in Spark Streaming context

2014-07-17 Thread Yan Fang
Hi guys,

I need some help with this problem. In our use case, we need to continuously
insert values into the database, so our approach is to create the JDBC
object in the main method and then do the insert operations inside the
DStream's foreachRDD. Is this approach reasonable?

Then the problem comes: since we are using com.mysql.jdbc.java, which is
not serializable, we keep seeing NotSerializableException. I think that
is because Spark Streaming is trying to serialize and then checkpoint the
whole class which contains the StreamingContext, not only the
StreamingContext object, right? Or is there another reason that triggers the
serialization? Any workaround for this? (except not using
com.mysql.jdbc.java)

Thank you.

Cheers,
Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


Re: unserializable object in Spark Streaming context

2014-07-17 Thread Yan Fang
Hi Marcelo and TD,

Thank you for the help. If I use TD's approach, it works and there is no
exception. The only drawback is that it will create many connections to the
DB, which I was trying to avoid.

Here is a snapshot of my code (the important parts were marked in red in the
original message). What I was thinking is that, if I call the collect()
method, Spark Streaming will bring the data to the driver and then the db
object does not need to be sent to the executors. My observation is that,
though exceptions are thrown, the insert function still works. Any thoughts
about that? I also pasted the log in case it helps: http://pastebin.com/T1bYvLWB

== code ==

    SparkConf sparkConf = new SparkConf().setAppName("balababala");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
        new Duration(2000));

    final MySQLHelper db = new MySQLHelper();  // this class instantiates the jdbc driver

    /**
     * a few DStream transformations
     **/

    JavaPairDStream<String, MachineState> noiseState = machineIdNoise
        .updateStateByKey(getUpdateFunction());

    JavaPairDStream<String, Tuple2<MachineState, Integer>> noiseStateTemperature =
        noiseState.join(machineIdTemperature);

    noiseStateTemperature
        .foreachRDD(new Function<JavaPairRDD<String, Tuple2<MachineState, Integer>>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, Tuple2<MachineState, Integer>> arg0)
                    throws Exception {
                List<Tuple2<String, Tuple2<MachineState, Integer>>> list = arg0.collect();
                for (Tuple2<String, Tuple2<MachineState, Integer>> tuple : list) {
                    String machineId = tuple._1();                     // the key
                    String machineState = tuple._2()._1().toString();  // reconstructed; the
                                                                       // original message elided
                                                                       // these two assignments
                    db.insertAverages(machineId, machineState);
                }
                return null;
            }
        });

 end code ===

Thank you. If there is no other workaround, I may use TD's approach because
it is the only option.

Best,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Thu, Jul 17, 2014 at 1:54 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 And if Marcelo's guess is correct, then the right way to do this would be
 to lazily  / dynamically create the jdbc connection server as a singleton
 in the workers/executors and use that. Something like this.


  dstream.foreachRDD(rdd => {
    rdd.foreachPartition((iterator: Iterator[...]) => {
      val driver = JDBCDriver.getSingleton()   // this will create the single
      // jdbc connection server in the worker, if it does not exist
      // loop through the iterator to get the records in the partition and
      // use the driver to push them out to the DB
    })
  })

 This will avoid the JDBC server being serialized as part of the closure /
 DStream checkpoint.

 TD


 On Thu, Jul 17, 2014 at 1:42 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 Could you share some code (or pseudo-code)?

 Sounds like you're instantiating the JDBC connection in the driver,
 and using it inside a closure that would be run in a remote executor.
 That means that the connection object would need to be serializable.
 If that sounds like what you're doing, it won't work.


 On Thu, Jul 17, 2014 at 1:37 PM, Yan Fang yanfang...@gmail.com wrote:
  Hi guys,
 
  need some help in this problem. In our use case, we need to continuously
  insert values into the database. So our approach is to create the jdbc
  object in the main method and then do the inserting operation in the
 DStream
  foreachRDD operation. Is this approach reasonable?
 
  Then the problem comes: since we are using com.mysql.jdbc.java, which is
  unserializable, we keep seeing the notSerializableException. I think
 that is
  because Spark Streaming is trying to serialize and then checkpoint the
 whole
  class which contains the StreamingContext, not only the StreamingContext
  object, right? Or other reason to trigger the serialize operation? Any
  workaround for this? (except not using the com.mysql.jdbc.java)
 
  Thank you.
 
  Cheers,
  Fang, Yan
  yanfang...@gmail.com
  +1 (206) 849-4108



 --
 Marcelo
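
A hedged Java rendering of the lazy-singleton pattern TD sketches above, to
match the Java code earlier in this thread. The ConnectionHolder class, the
JDBC URL, and the table are hypothetical, and the Java foreachPartition API
of this era of Spark is assumed:

== code ==
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import scala.Tuple2;

// One JDBC connection per executor JVM, created lazily on first use, so nothing
// unserializable is captured in the closure shipped from the driver.
class ConnectionHolder {
    private static Connection connection;

    static synchronized Connection get() throws SQLException {
        if (connection == null || connection.isClosed()) {
            connection = DriverManager.getConnection("jdbc:mysql://dbhost/mydb", "user", "pass");
        }
        return connection;
    }
}

class ForeachPartitionSketch {
    static void save(JavaDStream<Tuple2<String, String>> averages) {
        averages.foreachRDD(new Function<JavaRDD<Tuple2<String, String>>, Void>() {
            @Override
            public Void call(JavaRDD<Tuple2<String, String>> rdd) {
                rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, String>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, String>> records) throws Exception {
                        Connection conn = ConnectionHolder.get();  // created on the executor
                        PreparedStatement stmt =
                            conn.prepareStatement("INSERT INTO averages VALUES (?, ?)");
                        while (records.hasNext()) {
                            Tuple2<String, String> r = records.next();
                            stmt.setString(1, r._1());
                            stmt.setString(2, r._2());
                            stmt.executeUpdate();
                        }
                        stmt.close();
                    }
                });
                return null;
            }
        });
    }
}
== end code ==

This keeps the computation distributed while opening at most one connection
per executor, instead of collecting everything to the driver.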





Re: unserializable object in Spark Streaming context

2014-07-17 Thread Yan Fang
Hi Sean,

Thank you. I see your point. What I was thinking was to do the computation in
a distributed fashion and do the storing from a single place. But you are
right, having multiple DB connections is actually fine.

Thanks for answering my questions. That helps me understand the system.

Cheers,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Thu, Jul 17, 2014 at 2:53 PM, Sean Owen so...@cloudera.com wrote:

 On Thu, Jul 17, 2014 at 10:39 PM, Yan Fang yanfang...@gmail.com wrote:
  Thank you for the help. If I use TD's approach, it works and there is no
  exception. Only drawback is that it will create many connections to the
 DB,
  which I was trying to avoid.

 Connection-like objects aren't data that can be serialized. What would
 it mean to share one connection with N workers? that they all connect
 back to the driver, and through one DB connection there? this defeats
 the purpose of distributed computing. You want multiple DB
 connections. You can limit the number of partitions if needed.


  Here is a snapshot of my code. Mark as red for the important code. What I
  was thinking is that, if I call the collect() method, Spark Streaming
 will
  bring the data to the driver and then the db object does not need to be
 sent

 The Function you pass to foreachRDD() has a reference to db though.
 That's what is making it be serialized.

  to executors. My observation is that, though exceptions are thrown, the
  insert function still works. Any thought about that? Also paste the log
 in
  case it helps .http://pastebin.com/T1bYvLWB

 Any executors that run locally might skip the serialization and
 succeed (?) but I don't think the remote executors can be succeeding.



Re: Is there a way to get previous/other keys' state in Spark Streaming?

2014-07-17 Thread Yan Fang
Hi TD,

Thank you. Yes, it behaves as you described. Sorry for missing this point.

Then my only concern is on the performance side: since Spark Streaming
operates on all the keys every time a new batch comes, I think it is fine
when the state is small. When the state becomes big, say a few GBs, if we
still go through the whole key list, wouldn't the operation be a little
inefficient? Maybe I am missing something in Spark Streaming that handles
this situation.

Cheers,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Thu, Jul 17, 2014 at 1:47 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 The updateFunction given in updateStateByKey should be called on ALL the
 keys that are in the state, even if there is no new data in the batch for some
 key. Is that not the behavior you see?

 What do you mean by show all the existing states? You have access to the
 latest state RDD by doing stateStream.foreachRDD(...). There you can do
 whatever operation on all the key-state pairs.

 TD




 On Thu, Jul 17, 2014 at 11:58 AM, Yan Fang yanfang...@gmail.com wrote:

 Hi TD,

 Thank you for the quick replying and backing my approach. :)

 1) The example is this:

 1. In the first 2 second interval, after updateStateByKey, I get a few
  keys and their states, say, (a -> 1, b -> 2, c -> 3)
 2. In the following 2 second interval, I only receive c and d and
 their value. But I want to update/display the state of a and b
 accordingly.
 * It seems I have no way to access the a and b and get their
 states.
 * also, do I have a way to show all the existing states?

 I guess the approach to solve this will be similar to what you mentioned
 for 2). But the difficulty is that, if I want to display all the existing
 states, need to bundle all the rest keys to one key.

 Thank you.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Thu, Jul 17, 2014 at 11:36 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 For accessing previous version, I would do it the same way. :)

 1. Can you elaborate on what you mean by that with an example? What do
 you mean by accessing keys?

  2. Yeah, that is hard to do without the ability to do point lookups into an
  RDD, which we don't support yet. You could try embedding the related key in
  the values of the keys that need it. That is, B will be present in the value
  of key A. Then put this transformed DStream through updateStateByKey.

 TD






Re: How are the executors used in Spark Streaming in terms of receiver and driver program?

2014-07-12 Thread Yan Fang
Thank you, Tathagata. That explains.

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Fri, Jul 11, 2014 at 7:21 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Task slot is equivalent to core number. So one core can only run one task
 at a time.

 TD


 On Fri, Jul 11, 2014 at 1:57 PM, Yan Fang yanfang...@gmail.com wrote:

 Hi Tathagata,

 Thank you. Is task slot equivalent to the core number? Or actually one
 core can run multiple tasks at the same time?

 Best,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Fri, Jul 11, 2014 at 1:45 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 The same executor can be used for both receiving and processing,
 irrespective of the deployment mode (yarn, spark standalone, etc.) It boils
 down to the number of cores / task slots that executor has. Each receiver
 is like a long running task, so each of them occupy a slot. If there are
 free slots in the executor then other tasks can be run on them.

 So if you are finding that the other tasks are being run, check how many
 cores/task slots the executor has and whether there are more task slots
 than the number of input dstream / receivers you are launching.

 @Praveen  your answers were pretty much spot on, thanks for chipping in!




 On Fri, Jul 11, 2014 at 11:16 AM, Yan Fang yanfang...@gmail.com wrote:

 Hi Praveen,

 Thank you for the answer. That's interesting because if I only bring up
 one executor for the Spark Streaming, it seems only the receiver is
 working, no other tasks are happening, by checking the log and UI. Maybe
 it's just because the receiving task eats all the resource?, not because
 one executor can only run one receiver?

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka psel...@qubole.com
 wrote:

 Here are my answers. But am just getting started with Spark Streaming
 - so please correct me if am wrong.
 1) Yes
 2) Receivers will run on executors. Its actually a job thats submitted
 where # of tasks equals # of receivers. An executor can actually run more
 than one task at the same time. Hence you could have more number of
 receivers than executors but its not recommended I think.
 3) As said in 2, the executor where receiver task is running can be
 used for map/reduce tasks. In yarn-cluster mode, the driver program is
 actually run as application master (lives in the first container thats
 launched) and this is not an executor - hence its not used for other
 operations.
 4) the driver runs in a separate container. I think the same executor
 can be used for receiver and the processing task also (this part am not
 very sure)


  On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang yanfang...@gmail.com
 wrote:

 Hi all,

 I am working to improve the parallelism of the Spark Streaming
 application. But I have problem in understanding how the executors are 
 used
 and the application is distributed.

 1. In YARN, is one executor equal one container?

 2. I saw the statement that a streaming receiver runs on one work
  machine (note that each input DStream creates a single receiver
  (running on a worker machine) that receives a single stream of data).
  Does the work machine mean the executor or the physical machine? If
 I have more receivers than the executors, will it still work?

 3. Is the executor that holds receiver also used for other
 operations, such as map, reduce, or fully occupied by the receiver?
 Similarly, if I run in yarn-cluster mode, is the executor running driver
 program used by other operations too?

 4. So if I have a driver program (cluster mode) and streaming
 receiver, do I have to have at least 2 executors because the program and
 streaming receiver have to be on different executors?

 Thank you. Sorry for having so many questions but I do want to
  understand how Spark Streaming distributes work in order to assign
  reasonable resources. Thank you again.

 Best,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108









Re: How are the executors used in Spark Streaming in terms of receiver and driver program?

2014-07-11 Thread Yan Fang
Hi Praveen,

Thank you for the answer. That's interesting, because if I only bring up one
executor for Spark Streaming, it seems only the receiver is working and no
other tasks are happening, judging from the log and UI. Maybe it's just
because the receiving task eats all the resources, not because one executor
can only run one receiver?

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka psel...@qubole.com wrote:

 Here are my answers. But am just getting started with Spark Streaming - so
 please correct me if am wrong.
 1) Yes
 2) Receivers will run on executors. Its actually a job thats submitted
 where # of tasks equals # of receivers. An executor can actually run more
 than one task at the same time. Hence you could have more number of
 receivers than executors but its not recommended I think.
 3) As said in 2, the executor where receiver task is running can be used
 for map/reduce tasks. In yarn-cluster mode, the driver program is actually
 run as application master (lives in the first container thats launched) and
 this is not an executor - hence its not used for other operations.
 4) the driver runs in a separate container. I think the same executor can
 be used for receiver and the processing task also (this part am not very
 sure)


  On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang yanfang...@gmail.com wrote:

 Hi all,

 I am working to improve the parallelism of the Spark Streaming
 application. But I have problem in understanding how the executors are used
 and the application is distributed.

 1. In YARN, is one executor equal one container?

  2. I saw the statement that a streaming receiver runs on one work machine
  (note that each input DStream creates a single receiver (running on
  a worker machine) that receives a single stream of data). Does the
 work machine mean the executor or physical machine? If I have more
 receivers than the executors, will it still work?

 3. Is the executor that holds receiver also used for other operations,
 such as map, reduce, or fully occupied by the receiver? Similarly, if I run
 in yarn-cluster mode, is the executor running driver program used by other
 operations too?

 4. So if I have a driver program (cluster mode) and streaming receiver,
 do I have to have at least 2 executors because the program and streaming
 receiver have to be on different executors?

 Thank you. Sorry for having so many questions but I do want to understand
  how Spark Streaming distributes work in order to assign reasonable
  resources. Thank you again.

 Best,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108





Re: How are the executors used in Spark Streaming in terms of receiver and driver program?

2014-07-11 Thread Yan Fang
Hi Tathagata,

Thank you. Is task slot equivalent to the core number? Or actually one core
can run multiple tasks at the same time?

Best,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Fri, Jul 11, 2014 at 1:45 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 The same executor can be used for both receiving and processing,
 irrespective of the deployment mode (yarn, spark standalone, etc.) It boils
 down to the number of cores / task slots that executor has. Each receiver
 is like a long running task, so each of them occupy a slot. If there are
 free slots in the executor then other tasks can be run on them.

 So if you are finding that the other tasks are being run, check how many
 cores/task slots the executor has and whether there are more task slots
 than the number of input dstream / receivers you are launching.

 @Praveen  your answers were pretty much spot on, thanks for chipping in!




 On Fri, Jul 11, 2014 at 11:16 AM, Yan Fang yanfang...@gmail.com wrote:

 Hi Praveen,

 Thank you for the answer. That's interesting because if I only bring up
 one executor for the Spark Streaming, it seems only the receiver is
 working, no other tasks are happening, by checking the log and UI. Maybe
 it's just because the receiving task eats all the resource?, not because
 one executor can only run one receiver?

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka psel...@qubole.com
 wrote:

 Here are my answers. But am just getting started with Spark Streaming -
 so please correct me if am wrong.
 1) Yes
 2) Receivers will run on executors. Its actually a job thats submitted
 where # of tasks equals # of receivers. An executor can actually run more
 than one task at the same time. Hence you could have more number of
 receivers than executors but its not recommended I think.
 3) As said in 2, the executor where receiver task is running can be used
 for map/reduce tasks. In yarn-cluster mode, the driver program is actually
 run as application master (lives in the first container thats launched) and
 this is not an executor - hence its not used for other operations.
 4) the driver runs in a separate container. I think the same executor
 can be used for receiver and the processing task also (this part am not
 very sure)


  On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang yanfang...@gmail.com
 wrote:

 Hi all,

 I am working to improve the parallelism of the Spark Streaming
 application. But I have problem in understanding how the executors are used
 and the application is distributed.

 1. In YARN, is one executor equal one container?

 2. I saw the statement that a streaming receiver runs on one work
 machine (*n**ote that each input DStream creates a single receiver
 (running on a worker machine) that receives a single stream of data*).
 Does the work machine mean the executor or physical machine? If I have
 more receivers than the executors, will it still work?

 3. Is the executor that holds receiver also used for other operations,
 such as map, reduce, or fully occupied by the receiver? Similarly, if I run
 in yarn-cluster mode, is the executor running driver program used by other
 operations too?

 4. So if I have a driver program (cluster mode) and streaming receiver,
 do I have to have at least 2 executors because the program and streaming
 receiver have to be on different executors?

 Thank you. Sorry for having so many questions but I do want to
  understand how Spark Streaming distributes work in order to assign
  reasonable resources. Thank you again.

 Best,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108







How are the executors used in Spark Streaming in terms of receiver and driver program?

2014-07-10 Thread Yan Fang
Hi all,

I am working to improve the parallelism of the Spark Streaming application,
but I have trouble understanding how the executors are used and how the
application is distributed.

1. In YARN, is one executor equal one container?

2. I saw the statement that a streaming receiver runs on one work machine
(note that each input DStream creates a single receiver (running on a
worker machine) that receives a single stream of data). Does the work
machine mean the executor or the physical machine? If I have more receivers
than executors, will it still work?

3. Is the executor that holds the receiver also used for other operations,
such as map and reduce, or is it fully occupied by the receiver? Similarly,
if I run in yarn-cluster mode, is the executor running the driver program
used by other operations too?

4. So if I have a driver program (cluster mode) and a streaming receiver, do
I have to have at least 2 executors, because the program and the streaming
receiver have to be on different executors?

Thank you. Sorry for having so many questions, but I do want to understand
how Spark Streaming distributes work in order to assign reasonable
resources. Thank you again.

Best,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108
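
On the parallelism point raised here, one common pattern is to create several
input DStreams (one receiver each) and union them, provided the executors
offer more total cores than there are receivers. A hedged Java sketch; the
hosts, ports, and receiver count are illustrative:

== code ==
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

class MultiReceiverSketch {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("multi-receiver-sketch");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));

        // Each socketTextStream call creates its own receiver, and a receiver occupies
        // one task slot (core) for its lifetime, so the executors must provide more
        // total cores than receivers or no processing tasks will ever be scheduled.
        int numReceivers = 3;
        List<JavaDStream<String>> streams = new ArrayList<JavaDStream<String>>();
        for (int i = 0; i < numReceivers; i++) {
            streams.add(jssc.socketTextStream("source-host", 9990 + i));
        }

        // Union the per-receiver streams into one DStream for downstream processing.
        JavaDStream<String> unioned = streams.get(0);
        for (int i = 1; i < streams.size(); i++) {
            unioned = unioned.union(streams.get(i));
        }

        unioned.count().print();  // placeholder downstream operation

        jssc.start();
        jssc.awaitTermination();
    }
}
== end code ==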


Spark Streaming - two questions about the streamingcontext

2014-07-09 Thread Yan Fang
I am using the Spark Streaming and have the following two questions:

1. If more than one output operation is put in the same StreamingContext
(basically, I mean, I put all the output operations in the same class), are
they processed one by one in the order they appear in the class, or are they
actually processed in parallel?

2. If one DStream takes longer than the interval time, does a new DStream
wait in the queue until the previous DStream is fully processed? Is there
any parallelism that may process the two DStreams at the same time?

Thank you.

Cheers,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


Re: Spark Streaming - two questions about the streamingcontext

2014-07-09 Thread Yan Fang
Great. Thank you!

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Wed, Jul 9, 2014 at 11:45 AM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 1. Multiple output operations are processed in the order they are defined.
 That is because by default each one output operation is processed at a
 time. This *can* be parallelized using an undocumented config parameter
 spark.streaming.concurrentJobs which is by default set to 1.

 2. Yes, the output operations (and the spark jobs that are involved with
 them) gets queued up.

 TD


 On Wed, Jul 9, 2014 at 11:22 AM, Yan Fang yanfang...@gmail.com wrote:

 I am using the Spark Streaming and have the following two questions:

 1. If more than one output operations are put in the same
 StreamingContext (basically, I mean, I put all the output operations in the
 same class), are they processed one by one as the order they appear in the
 class? Or they are actually processes parallely?

 2. If one DStream takes longer than the interval time, does a new DStream
 wait in the queue until the previous DStream is fully processed? Is there
 any parallelism that may process the two DStream at the same time?

 Thank you.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108
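
For reference, the undocumented parameter TD mentions is set on the SparkConf
before the streaming context is created. A hedged sketch; the value of 2 and
the socket sources are illustrative, and because the property is unofficial
its behavior may differ between versions:

== code ==
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

class ConcurrentJobsSketch {
    public static void main(String[] args) throws Exception {
        // Allow up to two output operations (jobs) to run at the same time instead
        // of the default one-at-a-time behavior TD describes. Unofficial knob.
        SparkConf conf = new SparkConf()
            .setAppName("concurrent-jobs-sketch")
            .set("spark.streaming.concurrentJobs", "2");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));

        // Two output operations; with the setting above they may run concurrently.
        jssc.socketTextStream("source-host", 9998).print();
        jssc.socketTextStream("source-host", 9999).count().print();

        jssc.start();
        jssc.awaitTermination();
    }
}
== end code ==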





Spark Streaming - What does Spark Streaming checkpoint?

2014-07-09 Thread Yan Fang
Hi guys,

I am a little confused by the checkpointing in Spark Streaming. It
checkpoints the intermediate data for the stateful operations, for sure.
Does it also checkpoint the information of the StreamingContext? It seems we
can recreate the SC from the checkpoint in a driver-node failure scenario,
but when I looked at the checkpoint directory, I did not find much of a clue.
Any help? Thank you very much.

Best,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108
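
On the recovery point: the checkpoint stores the StreamingContext/DStream-graph
metadata as well as the data for stateful operations, and the context can be
rebuilt from it after a driver failure. A hedged Java sketch of the Spark
1.x-era getOrCreate pattern; the checkpoint path, host, and pipeline are
illustrative:

== code ==
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

class CheckpointRecoverySketch {
    static final String CHECKPOINT_DIR = "hdfs:///checkpoints/my-streaming-app";

    public static void main(String[] args) throws Exception {
        // The factory is used only when no checkpoint exists yet; after a driver
        // restart, the context and its DStream graph are reconstructed from
        // CHECKPOINT_DIR instead of being built again from scratch.
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {
                SparkConf conf = new SparkConf().setAppName("checkpoint-recovery-sketch");
                JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));
                jssc.checkpoint(CHECKPOINT_DIR);
                jssc.socketTextStream("source-host", 9999).print();  // illustrative pipeline
                return jssc;
            }
        };

        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, factory);
        jssc.start();
        jssc.awaitTermination();
    }
}
== end code ==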


Issues in opening UI when running Spark Streaming in YARN

2014-07-07 Thread Yan Fang
Hi guys,

Not sure if you have similar issues; I did not find relevant tickets in
JIRA. When I deploy Spark Streaming to YARN, I have the following two
issues:

1. The UI port is random; it is not the default 4040. I have to look at the
container's log to check the UI port. Is it supposed to be this way?

2. Most of the time, the UI does not work. The difference between the logs
is (I ran the same program):






*14/07/03 11:38:50 INFO spark.HttpServer: Starting HTTP Server
14/07/03 11:38:50 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/03 11:38:50 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:12026
14/07/03 11:38:51 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 0
14/07/03 11:38:51 INFO executor.Executor: Running task ID 0...*

14/07/02 16:55:32 INFO spark.HttpServer: Starting HTTP Server
14/07/02 16:55:32 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/02 16:55:32 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:14211

*14/07/02 16:55:32 INFO ui.JettyUtils: Adding filter:
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
14/07/02 16:55:32 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/02 16:55:32 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:21867
14/07/02 16:55:32 INFO ui.SparkUI: Started SparkUI at http://myNodeName:21867
14/07/02 16:55:32 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler*

When the red part (the asterisked lines above) appears, the UI sometimes works. Any ideas? Thank you.

Best,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


Re: Issues in opening UI when running Spark Streaming in YARN

2014-07-07 Thread Yan Fang
Hi Andrew,

Thanks for the quick reply. It works with the yarn-client mode.

One question about yarn-cluster mode: I was actually checking the AM for
the log; since the Spark driver is running in the AM, the UI should also
work, right? But that is not true in my case.

Best,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Mon, Jul 7, 2014 at 11:42 AM, Andrew Or and...@databricks.com wrote:

 I will assume that you are running in yarn-cluster mode. Because the
 driver is launched in one of the containers, it doesn't make sense to
 expose port 4040 for the node that contains the container. (Imagine if
 multiple driver containers are launched on the same node. This will cause a
 port collision). If you're launching Spark from a gateway node that is
 physically near your worker nodes, then you can just launch your
 application in yarn-client mode, in which case the SparkUI will always be
 started on port 4040 on the node that you ran spark-submit on. The reason
 why sometimes you see the red text is because it appears only on the driver
 containers, not the executor containers. This is because SparkUI belongs to
 the SparkContext, which only exists on the driver.

 Andrew


 2014-07-07 11:20 GMT-07:00 Yan Fang yanfang...@gmail.com:

 Hi guys,

 Not sure if you  have similar issues. Did not find relevant tickets in
 JIRA. When I deploy the Spark Streaming to YARN, I have following two
 issues:

 1. The UI port is random. It is not default 4040. I have to look at the
 container's log to check the UI port. Is this suppose to be this way?

 2. Most of the time, the UI does not work. The difference between logs
 are (I ran the same program):






  *14/07/03 11:38:50 INFO spark.HttpServer: Starting HTTP Server
  14/07/03 11:38:50 INFO server.Server: jetty-8.y.z-SNAPSHOT
  14/07/03 11:38:50 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:12026
  14/07/03 11:38:51 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 0
  14/07/03 11:38:51 INFO executor.Executor: Running task ID 0...*

  14/07/02 16:55:32 INFO spark.HttpServer: Starting HTTP Server
  14/07/02 16:55:32 INFO server.Server: jetty-8.y.z-SNAPSHOT
  14/07/02 16:55:32 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:14211

  *14/07/02 16:55:32 INFO ui.JettyUtils: Adding filter:
  org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
  14/07/02 16:55:32 INFO server.Server: jetty-8.y.z-SNAPSHOT
  14/07/02 16:55:32 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:21867
  14/07/02 16:55:32 INFO ui.SparkUI: Started SparkUI at http://myNodeName:21867
  14/07/02 16:55:32 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler*

 When the red part comes, the UI works sometime. Any ideas? Thank you.

 Best,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108





Re: Issues in opening UI when running Spark Streaming in YARN

2014-07-07 Thread Yan Fang
Thank you, Andrew. That makes sense to me now. I was confused by "In
yarn-cluster mode, the Spark driver runs inside an application master
process which is managed by YARN on the cluster" in
http://spark.apache.org/docs/latest/running-on-yarn.html. After your
explanation, it's clear now. Thank you.

Best,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Mon, Jul 7, 2014 at 1:07 PM, Andrew Or and...@databricks.com wrote:

 @Yan, the UI should still work. As long as you look into the container
 that launches the driver, you will find the SparkUI address and port. Note
 that in yarn-cluster mode the Spark driver doesn't actually run in the
 Application Manager; just like the executors, it runs in a container that
 is launched by the Resource Manager after the Application Master requests
 the container resources. In contrast, in yarn-client mode, your driver is
 not launched in a container, but in the client process that launched your
 application (i.e. spark-submit), so the stdout of this program directly
 contains the SparkUI messages.

 @Chester, I'm not sure what has gone wrong as there are many factors at
 play here. When you go the Resource Manager UI, does the application URL
 link point you to the same SparkUI address as indicated in the logs? If so,
 this is the correct behavior. However, I believe the redirect error has
 little to do with Spark itself, but more to do with how you set up the
 cluster. I have actually run into this myself, but I haven't found a
 workaround. Let me know if you find anything.




 2014-07-07 12:07 GMT-07:00 Chester Chen ches...@alpinenow.com:

  As Andrew explained, the port is random rather than 4040, as the Spark
  driver is started in the Application Master and the port is randomly selected.


 But I have the similar UI issue. I am running Yarn Cluster mode against
 my local CDH5 cluster.

 The log states
 14/07/07 11:59:29 INFO ui.SparkUI: Started SparkUI at
 http://10.0.0.63:58750


 


  but when I click the Spark UI link (ApplicationMaster or
  http://10.0.0.63:58750), I get a 404 with the redirect URI
  http://localhost/proxy/application_1404443455764_0010/

  Looking at the Spark code, I notice that the proxy is really a variable to
  get the proxy from the yarn-site.xml http address. But when I specified the
  value in yarn-site.xml, it still doesn't work for me.



  Oddly enough, it works for my co-worker on a Pivotal HD cluster, so I am
  still looking at what's different in terms of cluster setup or something
  else.


 Chester





 On Mon, Jul 7, 2014 at 11:42 AM, Andrew Or and...@databricks.com wrote:

 I will assume that you are running in yarn-cluster mode. Because the
 driver is launched in one of the containers, it doesn't make sense to
 expose port 4040 for the node that contains the container. (Imagine if
 multiple driver containers are launched on the same node. This will cause a
 port collision). If you're launching Spark from a gateway node that is
 physically near your worker nodes, then you can just launch your
 application in yarn-client mode, in which case the SparkUI will always be
 started on port 4040 on the node that you ran spark-submit on. The reason
 why sometimes you see the red text is because it appears only on the driver
 containers, not the executor containers. This is because SparkUI belongs to
 the SparkContext, which only exists on the driver.

 Andrew


 2014-07-07 11:20 GMT-07:00 Yan Fang yanfang...@gmail.com:

 Hi guys,

 Not sure if you  have similar issues. Did not find relevant tickets in
 JIRA. When I deploy the Spark Streaming to YARN, I have following two
 issues:

 1. The UI port is random. It is not default 4040. I have to look at the
 container's log to check the UI port. Is this suppose to be this way?

 2. Most of the time, the UI does not work. The difference between logs
 are (I ran the same program):






  *14/07/03 11:38:50 INFO spark.HttpServer: Starting HTTP Server
  14/07/03 11:38:50 INFO server.Server: jetty-8.y.z-SNAPSHOT
  14/07/03 11:38:50 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:12026
  14/07/03 11:38:51 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 0
  14/07/03 11:38:51 INFO executor.Executor: Running task ID 0...*

  14/07/02 16:55:32 INFO spark.HttpServer: Starting HTTP Server
  14/07/02 16:55:32 INFO server.Server: jetty-8.y.z-SNAPSHOT
  14/07/02 16:55:32 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:14211

  *14/07/02 16:55:32 INFO ui.JettyUtils: Adding filter:
  org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
  14/07/02 16:55:32 INFO server.Server: jetty-8.y.z-SNAPSHOT
  14/07/02 16:55:32 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:21867
  14/07/02 16:55:32 INFO ui.SparkUI: Started SparkUI at http://myNodeName:21867
  14/07/02 16:55:32 INFO cluster.YarnClusterScheduler: Created

Is the order of messages guaranteed in a DStream?

2014-07-07 Thread Yan Fang
I know that the order of processing DStreams is guaranteed. I am wondering
whether the order of messages within one DStream is guaranteed. My gut
feeling is yes, because RDDs are immutable, and some simple tests support
this, but I want to hear it from an authority to convince myself. Thank you.

Best,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108