Re: Question about Spark Streaming Receiver Failure

2015-03-16 Thread Jun Yang
Dibyendu,

Thanks for the reply.

I am reading your project homepage now.

One quick question I have is:

If the receivers fail for some reason (for example, they are killed forcibly
by someone else), is there any mechanism for the receiver to fail over
automatically?

On Mon, Mar 16, 2015 at 3:25 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Which version of Spark are you running?

 You can try this Low Level Consumer:
 http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

 This is designed to recover from various failures and has a very good fault
 recovery mechanism built in. It is being used by many users, and at present
 we at Pearson have been running this receiver in production for almost 3
 months without any issue.

 You can give this a try.

 Regards,
 Dibyendu

 On Mon, Mar 16, 2015 at 12:47 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 You need to figure out why the receivers failed in the first place. Look
 in your worker logs and see what really happened. When you run a streaming
 job continuously for a longer period there will usually be a lot of logs (you
 can enable log rotation etc.), and if you are doing groupBy, join, and similar
 operations, there will also be a lot of shuffle data. So you need to check the
 worker logs and see what happened (whether the disk filled up, etc.).
 We have streaming pipelines running for weeks without any issues.

 Thanks
 Best Regards

 On Mon, Mar 16, 2015 at 12:40 PM, Jun Yang yangjun...@gmail.com wrote:

 Guys,

 We have a project that builds on Spark Streaming.

 We use Kafka as the input stream, and we create 5 receivers.

 When this application had run for around 90 hours, all 5 receivers
 failed for unknown reasons.

 In my understanding, it is not guaranteed that Spark Streaming receivers
 will recover from failures automatically.

 So I want to figure out a way to do fault recovery to deal with
 receiver failure.

 There is a JIRA comment that mentions using StreamingListener to monitor
 receiver status:

 https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836

 However, I haven't found any public documentation on how to do this.

 Has anyone met the same issue and dealt with it?

 Our environment:
Spark 1.3.0
Dual Master Configuration
Kafka 0.8.2

 Thanks

 --
 yangjun...@gmail.com
 http://hi.baidu.com/yjpro






-- 
yangjun...@gmail.com
http://hi.baidu.com/yjpro


Question about Spark Streaming Receiver Failure

2015-03-16 Thread Jun Yang
Guys,

We have a project that builds on Spark Streaming.

We use Kafka as the input stream, and we create 5 receivers.

When this application had run for around 90 hours, all 5 receivers failed
for unknown reasons.

In my understanding, it is not guaranteed that Spark Streaming receivers
will recover from failures automatically.

So I want to figure out a way to do fault recovery to deal with
receiver failure.

There is a JIRA comment that mentions using StreamingListener to monitor
receiver status:

https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836

However, I haven't found any public documentation on how to do this; a rough
sketch of what I have in mind is below.
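
A minimal sketch of what I imagine such a listener would look like, assuming
the StreamingListener API in org.apache.spark.streaming.scheduler (the class
name and the println reporting are only illustrative):

import org.apache.spark.streaming.scheduler._

// Hypothetical listener that reports receiver lifecycle events so the driver
// (or an external monitor) can react when a receiver dies.
class ReceiverMonitor extends StreamingListener {

  override def onReceiverStarted(started: StreamingListenerReceiverStarted) {
    println("Receiver " + started.receiverInfo.streamId +
      " started on " + started.receiverInfo.location)
  }

  override def onReceiverError(error: StreamingListenerReceiverError) {
    println("Receiver " + error.receiverInfo.streamId +
      " reported error: " + error.receiverInfo.lastErrorMessage)
  }

  override def onReceiverStopped(stopped: StreamingListenerReceiverStopped) {
    println("Receiver " + stopped.receiverInfo.streamId + " stopped")
    // react here: raise an alert, restart the streaming job, etc.
  }
}

// registration, assuming ssc is the StreamingContext:
// ssc.addStreamingListener(new ReceiverMonitor)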

Has anyone met the same issue and dealt with it?

Our environment:
   Spark 1.3.0
   Dual Master Configuration
   Kafka 0.8.2

Thanks

-- 
yangjun...@gmail.com
http://hi.baidu.com/yjpro


Re: Question about Spark Streaming Receiver Failure

2015-03-16 Thread Jun Yang
Akhil,

I have checked the logs, and there isn't any clue as to why the 5 receivers
failed.

That's why I assume that receiver failures will be a common issue, and that
we need to figure out a way to detect this kind of failure and fail over.

Thanks

On Mon, Mar 16, 2015 at 3:17 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 You need to figure out why the receivers failed in the first place. Look
 in your worker logs and see what really happened. When you run a streaming
 job continuously for a longer period there will usually be a lot of logs (you
 can enable log rotation etc.), and if you are doing groupBy, join, and similar
 operations, there will also be a lot of shuffle data. So you need to check the
 worker logs and see what happened (whether the disk filled up, etc.).
 We have streaming pipelines running for weeks without any issues.

 Thanks
 Best Regards

 On Mon, Mar 16, 2015 at 12:40 PM, Jun Yang yangjun...@gmail.com wrote:

 Guys,

 We have a project that builds on Spark Streaming.

 We use Kafka as the input stream, and we create 5 receivers.

 When this application had run for around 90 hours, all 5 receivers failed
 for unknown reasons.

 In my understanding, it is not guaranteed that Spark Streaming receivers
 will recover from failures automatically.

 So I want to figure out a way to do fault recovery to deal with
 receiver failure.

 There is a JIRA comment that mentions using StreamingListener to monitor
 receiver status:

 https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836

 However, I haven't found any public documentation on how to do this.

 Has anyone met the same issue and dealt with it?

 Our environment:
Spark 1.3.0
Dual Master Configuration
Kafka 0.8.2

 Thanks

 --
 yangjun...@gmail.com
 http://hi.baidu.com/yjpro





-- 
yangjun...@gmail.com
http://hi.baidu.com/yjpro


Re: Question about Spark Streaming Receiver Failure

2015-03-16 Thread Jun Yang
I have checked Dibyendu's code, and it looks like his implementation has an
auto-restart mechanism:


src/main/java/consumer/kafka/client/KafkaReceiver.java:

private void start() {

  // Start the thread that receives data over a connection
  KafkaConfig kafkaConfig = new KafkaConfig(_props);
  ZkState zkState = new ZkState(kafkaConfig);
  _kConsumer = new KafkaConsumer(kafkaConfig, zkState, this);
  _kConsumer.open(_partitionId);

  // If the consumer thread dies with an uncaught exception, ask Spark to
  // restart this receiver after 5 seconds
  Thread.UncaughtExceptionHandler eh = new Thread.UncaughtExceptionHandler() {
    public void uncaughtException(Thread th, Throwable ex) {
      restart("Restarting Receiver for Partition " + _partitionId, ex, 5000);
    }
  };

  _consumerThread = new Thread(_kConsumer);
  _consumerThread.setDaemon(true);
  _consumerThread.setUncaughtExceptionHandler(eh);
  _consumerThread.start();
}

I also checked Spark's native Kafka receiver implementation, and it does not
look like it has any auto-restart support.
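
For comparison, here is a bare-bones Scala sketch of the same pattern written
directly against Spark's Receiver API; the class name and the consume() body
are placeholders, and restart() simply asks Spark to stop and re-launch the
receiver after a delay:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class SelfRestartingReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    val worker = new Thread("receiver-worker") {
      override def run(): Unit = {
        try {
          consume()   // placeholder for the actual consume-and-store loop
        } catch {
          case e: Exception =>
            // same idea as the uncaught-exception handler above
            restart("Restarting receiver after failure", e)
        }
      }
    }
    worker.setDaemon(true)
    worker.start()
  }

  override def onStop(): Unit = {
    // close connections and stop the worker thread here
  }

  private def consume(): Unit = {
    // read records from the source and call store(...)
  }
}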

Any comments from Dibyendu?

On Mon, Mar 16, 2015 at 3:39 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 From what I have seen, once I kill my receiver on one machine, it will
 automatically spawn another receiver on another machine or on the same machine.

 Thanks
 Best Regards

 On Mon, Mar 16, 2015 at 1:08 PM, Jun Yang yangjun...@gmail.com wrote:

 Dibyendu,

 Thanks for the reply.

 I am reading your project homepage now.

 One quick question I have is:

 If the receivers fail for some reason (for example, they are killed forcibly
 by someone else), is there any mechanism for the receiver to fail over
 automatically?

 On Mon, Mar 16, 2015 at 3:25 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Which version of Spark are you running?

 You can try this Low Level Consumer:
 http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

 This is designed to recover from various failures and has a very good
 fault recovery mechanism built in. It is being used by many users, and at
 present we at Pearson have been running this receiver in production for
 almost 3 months without any issue.

 You can give this a try.

 Regards,
 Dibyendu

 On Mon, Mar 16, 2015 at 12:47 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 You need to figure out why the receivers failed in the first place.
 Look in your worker logs and see what really happened. When you run a
 streaming job continuously for a longer period there will usually be a lot
 of logs (you can enable log rotation etc.), and if you are doing groupBy,
 join, and similar operations, there will also be a lot of shuffle data. So
 you need to check the worker logs and see what happened (whether the disk
 filled up, etc.). We have streaming pipelines running for weeks without
 any issues.

 Thanks
 Best Regards

 On Mon, Mar 16, 2015 at 12:40 PM, Jun Yang yangjun...@gmail.com
 wrote:

 Guys,

 We have a project that builds on Spark Streaming.

 We use Kafka as the input stream, and we create 5 receivers.

 When this application had run for around 90 hours, all 5 receivers
 failed for unknown reasons.

 In my understanding, it is not guaranteed that Spark Streaming
 receivers will recover from failures automatically.

 So I want to figure out a way to do fault recovery to deal
 with receiver failure.

 There is a JIRA comment that mentions using StreamingListener to monitor
 receiver status:

 https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836

 However, I haven't found any public documentation on how to do this.

 Has anyone met the same issue and dealt with it?

 Our environment:
Spark 1.3.0
Dual Master Configuration
Kafka 0.8.2

 Thanks

 --
 yangjun...@gmail.com
 http://hi.baidu.com/yjpro






 --
 yangjun...@gmail.com
 http://hi.baidu.com/yjpro





-- 
yangjun...@gmail.com
http://hi.baidu.com/yjpro


Is It Feasible for Spark 1.1 Broadcast to Fully Utilize the Ethernet Card Throughput?

2015-01-09 Thread Jun Yang
 Guys,

I have a question regarding the Spark 1.1 broadcast implementation.

In our pipeline, we have a large multi-class LR model, which is about 1 GiB
in size. To exploit Spark's parallelism, the natural approach is to broadcast
this model file to the worker nodes.

However, it looks like broadcast performance is not very good.

While broadcasting the model file, I monitored the network card throughput of
the worker nodes, and their receive/write throughput is only around 30-40
MiB/s (our server boxes are equipped with ethernet cards rated at roughly
100 MiB/s).

Is this a real limitation of the Spark 1.1 broadcast implementation, or is
there some configuration or trick that can help Spark broadcast perform
better?
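
In case it is just a matter of configuration, here is a sketch of the
broadcast-related settings I plan to experiment with on Spark 1.1 (the values
below are examples only; whether they help will depend on the cluster):

import org.apache.spark.{SparkConf, SparkContext}

// Broadcast-related settings worth experimenting with on Spark 1.1.
// The values below are examples, not recommendations.
val conf = new SparkConf()
  .setAppName("broadcast-tuning-test")
  // TorrentBroadcast exchanges blocks between executors BitTorrent-style,
  // so the driver's NIC is not the single bottleneck.
  .set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
  // block size in KB; larger blocks mean fewer round trips for a ~1 GiB object
  .set("spark.broadcast.blockSize", "8192")
  // compression trades CPU for network traffic
  .set("spark.broadcast.compress", "true")

val sc = new SparkContext(conf)
// val modelBc = sc.broadcast(model)   // model: the ~1 GiB LR model object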

Thanks



-- 
yangjun...@gmail.com
http://hi.baidu.com/yjpro


Re: k-means clustering

2014-11-20 Thread Jun Yang
Guys,

As to the pre-processing questions, you could just migrate your logic to
Spark before running k-means.

I have only used Scala on Spark and haven't used the Python bindings, but I
think the basic steps must be the same.
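
A rough sketch in Scala (since that is what I have used) of doing the
pre-processing with plain RDD transformations plus MLlib's TF-IDF, and then
feeding the vectors to k-means; the stop-word list, the cleaning regex and
the CSV handling below are simplified stand-ins for your own logic:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.feature.{HashingTF, IDF}

// Pre-processing as RDD transformations, then TF-IDF and k-means from MLlib.
// The stop-word list, cleaning regex and column index are placeholders.
def cluster(sc: SparkContext, path: String): Unit = {
  val stopWords = Set("the", "a", "an", "and", "of")   // use a real list here

  val docs = sc.textFile(path)
    .map(_.split(",")(3))                              // column 3 held the text
    .map(_.toLowerCase.replaceAll("[^a-z\\s]", " "))   // drop punctuation and digits
    .map(_.split("\\s+").toSeq.filter(w => w.nonEmpty && !stopWords.contains(w)))

  val tf = new HashingTF().transform(docs)
  tf.cache()
  val tfidf = new IDF().fit(tf).transform(tf)

  val model = KMeans.train(tfidf, 6, 20)               // k = 6, 20 iterations
  model.clusterCenters.take(3).foreach(println)
}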

BTW, if your data set is big, with huge sparse high-dimensional feature
vectors, k-means may not work as well as you expect. I think this is still an
area where Spark MLlib can be optimized.

On Wed, Nov 19, 2014 at 2:21 PM, amin mohebbi aminn_...@yahoo.com.invalid
wrote:

 Hi there,

 I would like to do text clustering using k-means and Spark on a massive
 dataset. As you know, before running k-means I have to apply pre-processing
 steps such as TF-IDF and NLTK on my big dataset. The following is my code
 in Python:

 if __name__ == '__main__':
     # Cluster a bunch of text documents.
     import re, sys, csv, string, nltk
     from collections import defaultdict
     from nltk.stem.porter import PorterStemmer

     k = 6
     vocab = {}
     xs = []
     ns = []
     cat = []
     filename = '2013-01.csv'
     with open(filename, newline='') as f:
         try:
             newsreader = csv.reader(f)
             for row in newsreader:
                 ns.append(row[3])
                 cat.append(row[4])
         except csv.Error as e:
             sys.exit('file %s, line %d: %s' % (filename, newsreader.line_num, e))

     # regex to remove special characters
     remove_spl_char_regex = re.compile('[%s]' % re.escape(string.punctuation))
     remove_num = re.compile('[\d]+')
     # nltk.download()
     stop_words = nltk.corpus.stopwords.words('english')
     for a in ns:
         x = defaultdict(float)
         a1 = a.strip().lower()
         a2 = remove_spl_char_regex.sub(' ', a1)  # remove special characters
         a3 = remove_num.sub('', a2)              # remove numbers
         words = a3.split()                       # remove stop words below
         filter_stop_words = [w for w in words if w not in stop_words]
         stemed = [PorterStemmer().stem_word(w) for w in filter_stop_words]
         ws = sorted(stemed)
         # ws = re.findall(r'\w+', a1)
         for w in ws:
             vocab.setdefault(w, len(vocab))
             x[vocab[w]] += 1
         xs.append(x.items())

 Can anyone explain to me how I can do this pre-processing step before
 running k-means using Spark?


 Best Regards

 ...

 Amin Mohebbi

 PhD candidate in Software Engineering
  at university of Malaysia

 Tel : +60 18 2040 017



 E-Mail : tp025...@ex.apiit.edu.my

   amin_...@me.com




-- 
yangjun...@gmail.com
http://hi.baidu.com/yjpro


Questions Regarding to MPI Program Migration to Spark

2014-11-16 Thread Jun Yang
Guys,

Recently we have been migrating our backend pipeline to Spark.

In our pipeline, we have an MPI-based HAC (hierarchical agglomerative
clustering) implementation, and to ensure result consistency across the
migration, we also want to port this MPI code to Spark.

However, during the migration I found some possible limitations in Spark.

In the original MPI implementation, the logic looks like the following:

Node 0 (master node):

  Get the complete document data, store it in g_doc_data
  Get the document sub-set for which this node needs to calculate the
  distance metrics, store it in l_dist_metric_data
  while ( exit condition is not met ) {
      Find the locally closest node pair, denoted l_closest_pair
      Get the globally closest node pair from the other nodes via MPI's
      MPI_Allreduce, denoted g_closest_pair
      Merge the globally closest node pair and update the document data
      g_doc_data
      Re-calculate the distance metrics for those node pairs that are
      impacted by the above merge operation, and update l_dist_metric_data
  }

Node 1/2/.../P (slave nodes):

  Get the complete document data, store it in g_doc_data
  Get the document sub-set for which this node needs to calculate the
  distance metrics, store it in l_dist_metric_data
  while ( exit condition is not met ) {
      Find the locally closest node pair, denoted l_closest_pair
      Get the globally closest node pair from the other nodes via MPI's
      MPI_Allreduce, denoted g_closest_pair
      Merge the globally closest node pair and update the document data
      g_doc_data
      Re-calculate the distance metrics for those node pairs that are
      impacted by the above merge operation, and update l_dist_metric_data
  }

The essential difficulty in migrating the above logic to Spark is this: in
the original implementation, between iterations the computation nodes need
to hold local state (g_doc_data and l_dist_metric_data). In Spark, there
does not seem to be an effective way to keep intermediate local state
between iterations; usually we use either a broadcast variable or the
closure to pass state into the operations of each iteration.

Of course, after each iteration we could summarize the changes from all the
worker nodes via a reduce and then broadcast this summary back to them. But
that involves a significant data transfer when the data size is large (e.g.
100 thousand documents with 500-dimensional feature vectors), and the
performance penalty is not negligible.
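
To make the pattern concrete, here is a minimal Scala sketch of that
reduce-then-broadcast loop; NodePair, applyMerges and the fixed step count are
placeholders for illustration, not an actual HAC implementation:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

case class NodePair(i: Int, j: Int, dist: Double)

def iterate(sc: SparkContext, partitions: RDD[Seq[NodePair]], steps: Int): Seq[NodePair] = {
  // placeholder: re-apply the accumulated merges to one partition's local pairs
  val applyMerges = (pairs: Seq[NodePair], merges: Seq[NodePair]) => pairs

  var globalMerges = Seq.empty[NodePair]        // state the driver re-broadcasts each step

  for (_ <- 0 until steps) {
    val mergesBc = sc.broadcast(globalMerges)   // ship the accumulated merges to every worker

    // each partition applies the previous merges and reports its locally closest pair;
    // the reduce plays the role of MPI_Allreduce in picking the globally closest pair
    val globalClosest = partitions
      .map(pairs => applyMerges(pairs, mergesBc.value).minBy(_.dist))
      .reduce((a, b) => if (a.dist < b.dist) a else b)

    globalMerges = globalMerges :+ globalClosest
    mergesBc.unpersist()
  }
  globalMerges
}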

So my questions are:
1. Is the difficulty I mentioned above a limitation imposed by Spark's
computation paradigm?
2. Is there any practical way to implement bottom-up agglomerative
hierarchical clustering in Spark?

BTW, I know there is some top-down divisive hierarchical clustering work in
the upcoming 1.2 release; I will also give it a try.

Thanks.
-- 
yangjun...@gmail.com
http://hi.baidu.com/yjpro