How could one specify a Docker image for each job to be used by executors?

2016-12-05 Thread Enno Shioji
Hi,

Suppose I have a job that uses some native libraries. I can launch
executors using a Docker container and everything is fine.

Now suppose I have some other job that uses some other native libraries
(and let's assume they just can't co-exist in the same docker image), but I
want to execute those on the same cluster.

Is there a way to have executors run in a specified environment (e.g.
docker container), on per-job basis? E.g. such that some executors for one
job are launched with some docker image and other executors belonging to
other jobs are using a different docker image?


/Enno
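
One possible route, sketched below on the assumption that the cluster runs on Mesos, where the executor image can be chosen per application via spark.mesos.executor.docker.image (the image and app names here are illustrative, not a confirmed recipe):

  import org.apache.spark.{SparkConf, SparkContext}

  // Job A: executors run inside the image that bundles job A's native libraries.
  val confA = new SparkConf()
    .setAppName("job-with-native-libs-a")
    .set("spark.mesos.executor.docker.image", "myrepo/spark-native-a:latest")
  val scA = new SparkContext(confA)

  // A second job submitted separately can point at a different image, e.g.
  //   spark-submit --conf spark.mesos.executor.docker.image=myrepo/spark-native-b:latest ...
  // so each job's executors get their own container.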


Re: Twitter live Streaming

2015-08-04 Thread Enno Shioji
If you want to do it through streaming API you have to pay Gnip; it's not free. 
You can go through non-streaming Twitter API and convert it to stream yourself 
though.
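
A rough sketch of that conversion, assuming twitter4j is on the classpath with credentials in twitter4j.properties: a custom receiver polls the REST search API and pushes the results into Spark Streaming (the query text and polling interval are illustrative):

  import scala.collection.JavaConverters._
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.receiver.Receiver
  import twitter4j.{Query, Status, TwitterFactory}

  class SearchPollingReceiver(queryText: String)
      extends Receiver[Status](StorageLevel.MEMORY_AND_DISK) {

    override def onStart(): Unit = {
      new Thread("twitter-search-poller") {
        override def run(): Unit = {
          val twitter = new TwitterFactory().getInstance()
          var sinceId = 0L
          while (!isStopped()) {
            val query = new Query(queryText)
            if (sinceId > 0) query.setSinceId(sinceId)
            val result = twitter.search(query)                 // REST search endpoint, not streaming
            result.getTweets.asScala.foreach(status => store(status)) // hand each status to Spark
            sinceId = math.max(sinceId, result.getMaxId)
            Thread.sleep(15000)                                // stay well inside the REST rate limits
          }
        }
      }.start()
    }

    override def onStop(): Unit = {}
  }

  // Usage: val tweets = ssc.receiverStream(new SearchPollingReceiver("#spark"))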



 On 4 Aug 2015, at 09:29, Sadaf sa...@platalytics.com wrote:
 
 Hi
 Is there any way to get all the old tweets posted since the account was created,
 using Spark Streaming and Twitter's API? Currently my connector only shows
 tweets posted after the program starts. I've done this task using Spark
 Streaming and a custom receiver built on the Twitter user API.
 
 Thanks in anticipation.
 
 
 




Re: Twitter streaming with apache spark stream only a small amount of tweets

2015-07-29 Thread Enno Shioji
If you start parallel Twitter streams, you will be in breach of their TOS.
They allow a small number of parallel streams in practice, but if you do it
on a massive scale they'll ban you (I'm speaking from experience ;) ).

If you really need that level of data, you need to talk to a company called
Gnip - AFAIK they are the sole reseller now. It's not cheap though.



On Wed, Jul 29, 2015 at 7:02 PM, Zoran Jeremic zoran.jere...@gmail.com
wrote:

 Actually, I posted that question :)
 I already implemented the solution that Akhil suggested there, and that
 solution uses the sample tweets API, which returns only 1% of the tweets.
 It would not work in my use case. For the hashtags I'm interested
 in, I need to catch every single tweet, not only some of them.
 So for me, only the Twitter filtering API would work, but as I already wrote,
 there is another problem. Twitter limits the filter to a maximum of 400 hashtags.
 That means I need several parallel Twitter streams in order to follow more hashtags.
 That was the problem I could not solve with Spark Twitter streaming. I
 could not start parallel streams. The other problem is that I need to add
 and remove hashtags from the running streams, that is, I need to clean up
 the stream and initialize the filter again. I managed to implement this with
 twitter4j directly, but not with spark-twitter streaming.

 Zoran



 On Wed, Jul 29, 2015 at 10:40 AM, Peyman Mohajerian mohaj...@gmail.com
 wrote:

 'How to restart Twitter spark stream'
 It may not be exactly what you are looking for, but I thought it did
 touch on some aspect of your question.

 On Wed, Jul 29, 2015 at 10:26 AM, Zoran Jeremic zoran.jere...@gmail.com
 wrote:

 Can you send me the subject of that email? I can't find any email
 suggesting a solution to that problem. There is an email *Twitter4j
 streaming question*, but it doesn't have any sample code. It just
 confirms what I explained earlier: without filtering, Twitter will limit
 you to 1% of tweets, and if you use the filter API, Twitter limits you to 400
 hashtags you can follow.

 Thanks,
 Zoran

 On Wed, Jul 29, 2015 at 8:40 AM, Peyman Mohajerian mohaj...@gmail.com
 wrote:

 This question was answered with sample code a couple of days ago,
 please look back.

 On Sat, Jul 25, 2015 at 11:43 PM, Zoran Jeremic 
 zoran.jere...@gmail.com wrote:

 Hi,

 I discovered what the problem is here. The Twitter public stream is
 limited to 1% of overall tweets (https://goo.gl/kDwnyS), so that's
 why I can't access all the tweets posted with a specific hashtag using the
 approach that I posted in my previous email, so I guess this approach would
 not work for me. The other problem is that filtering has a limit of 400
 hashtags (https://goo.gl/BywrAk), so in order to follow more than 400
 hashtags I need more parallel streams.

 This brings me back to my previous question (https://goo.gl/bVDkHx).
 In my application I need to follow more than 400 hashtags, and I need to
 collect each tweet having one of these hashtags. Another complication is
 that users could add new hashtags or remove old hashtags, so I have to
 update the stream in real time.
 My earlier approach without Apache Spark was to create a twitter4j user
 stream with an initial filter, and each time a new hashtag had to be added,
 stop the stream, add the new hashtag and run it again. When a stream had 400
 hashtags, I initialized a new stream with new credentials. This was really
 complex, and I was hoping that Apache Spark would make it simpler. However,
 I've been trying for days to find a solution, and have had no success.

 If I have to use the same approach I used with twitter4j, I have to
 solve 2 problems:
 - how to run multiple twitter streams in the same spark context
 - how to add new hashtags to the existing filter

 I hope that somebody will have some more elegant solution and idea,
 and tell me that I missed something obvious.

 Thanks,
 Zoran
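
A rough sketch of the first of the two problems above (running multiple Twitter streams in the same Spark context), assuming spark-streaming-twitter and one Authorization per group of at most 400 hashtags; the function below is illustrative only, not a tested solution:

  import org.apache.spark.streaming.StreamingContext
  import org.apache.spark.streaming.dstream.DStream
  import org.apache.spark.streaming.twitter.TwitterUtils
  import twitter4j.Status
  import twitter4j.auth.Authorization

  def unionedTwitterStreams(ssc: StreamingContext,
                            hashtags: Seq[String],
                            auths: Seq[Authorization]): DStream[Status] = {
    val groups = hashtags.grouped(400).toSeq          // <= 400 track terms per connection
    val streams = groups.zip(auths).map { case (tags, auth) =>
      TwitterUtils.createStream(ssc, Some(auth), tags)
    }
    ssc.union(streams)                                // one logical stream for downstream code
  }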

 On Sat, Jul 25, 2015 at 8:44 PM, Zoran Jeremic 
 zoran.jere...@gmail.com wrote:

 Hi,

 I've implemented Twitter streaming as in the code given at the bottom
 of this email. It finds some tweets based on the hashtags I'm following.
 However, it seems that a large number of tweets are missing. I've tried to
 post some tweets with the hashtags that I'm following in the application, and
 none of them was received by the application. I also checked some hashtags
 (e.g. #android) on Twitter using Live and I could see that almost every second
 something was posted with that hashtag, while my application received only
 3-4 posts per minute.

 I didn't have this problem in the earlier non-Spark version of the
 application, which used twitter4j to access the user stream API. I guess this
 is some trending stream, but I couldn't find anything that explains which
 Twitter API is used by Spark Twitter streaming and how to create a stream
 that will access everything posted on Twitter.

 I hope somebody could explain what the problem is and how to solve
 it.

 Thanks,
 Zoran


  def initializeStreaming(){
val config 

Re: Twitter4J streaming question

2015-07-23 Thread Enno Shioji
You are probably listening to the sample stream, and THEN filtering. This
means you listen to 1% of the Twitter stream and then look for the
tweet by Bloomberg, so there is a very good chance you don't see the
particular tweet.

In order to get all Bloomberg related tweets, you must connect to twitter
using the filter API and not the sample API:
https://dev.twitter.com/streaming/reference/post/statuses/filter
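
With the spark-streaming-twitter connector this just means passing track terms to createStream, which then uses the filter endpoint rather than the 1% sample (a sketch; the track term is illustrative and ssc is an existing StreamingContext):

  import org.apache.spark.streaming.twitter.TwitterUtils

  // A non-empty filter list makes the underlying twitter4j stream use
  // statuses/filter rather than the statuses/sample endpoint.
  val filters = Seq("bloomberg")                       // illustrative track term
  val tweets = TwitterUtils.createStream(ssc, None, filters)
  tweets.map(_.getText).print()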

On Thu, Jul 23, 2015 at 8:23 PM, pjmccarthy pmccar...@eatonvance.com
wrote:

 Hopefully this is an easy one.  I am trying to filter a twitter dstream by
 user ScreenName - my code is as follows
 val stream = TwitterUtils.createStream(ssc, None)
 .filter(_.getUser.getScreenName.contains(markets))

 however nothing gets returned and I can see that Bloomberg has tweeted. If
 I remove the filter I get tweets.
 If I change the code to look for English or French tweets, that works.

 Is there a better way to do it ?

 Can anyone assist ?







Re: Twitter4J streaming question

2015-07-23 Thread Enno Shioji
You need to pay a lot of money to get the full stream, so unless you are
doing that, it's the sample stream!

On Thu, Jul 23, 2015 at 9:26 PM, Patrick McCarthy pmccar...@eatonvance.com
wrote:

  How can I tell if it's the sample stream or full stream ?
 Thanks

 Sent from my iPhone

 On Jul 23, 2015, at 4:17 PM, Enno Shioji eshi...@gmail.com wrote:

   You are probably listening to the sample stream, and THEN filtering.
 This means you listen to 1% of the Twitter stream and then look for the
 tweet by Bloomberg, so there is a very good chance you don't see the
 particular tweet.

  In order to get all Bloomberg related tweets, you must connect to
 twitter using the filter API and not the sample API:
 https://dev.twitter.com/streaming/reference/post/statuses/filter

 On Thu, Jul 23, 2015 at 8:23 PM, pjmccarthy pmccar...@eatonvance.com
 wrote:

 Hopefully this is an easy one.  I am trying to filter a twitter dstream by
 user ScreenName - my code is as follows
 val stream = TwitterUtils.createStream(ssc, None)
 .filter(_.getUser.getScreenName.contains(markets))

 however nothing gets returned and I can see that Bloomberg has tweeted. If
 I remove the filter I get tweets.
 If I change the code to look for English or French tweets, that works.

 Is there a better way to do it ?

 Can anyone assist ?








Re: RE: Spark or Storm

2015-06-19 Thread Enno Shioji
Tbh I find the doc around this a bit confusing. If it says "end-to-end
exactly-once semantics (if your updates to downstream systems are
idempotent or transactional)", I think most people will interpret it as:
as long as you use storage which has atomicity (like MySQL/Postgres
etc.), a successful output operation for a given batch (let's say "+ 5") is
going to be issued exactly once against the storage.

However, as I understand it that's not what this statement means. What it
is saying is, it will always issue "+5" and never, say, "+6", because it
makes sure a message is processed exactly once internally. However, it
*may* issue "+5" more than once for a given batch, and it is up to the
developer to deal with this by either making the output operation
idempotent (e.g. "set 5"), or transactional (e.g. keep track of batch IDs
and skip already applied batches etc.).

I wonder if it makes more sense to drop "or transactional" from the
statement, because if you think about it, ultimately what you are asked to
do is to make the writes idempotent even with the transactional approach,
and "transactional" is a bit loaded and would be prone to lead to
misunderstandings (even though in fairness, if you read the fault tolerance
chapter it explicitly explains it).
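
As an illustration of the transactional route, a sketch only (the table names, SQL and JDBC URL are assumptions): the batch time plus partition index forms the unique id, and the id is committed in the same transaction as the data, so a re-issued batch gets skipped:

  import java.sql.DriverManager
  import org.apache.spark.TaskContext
  import org.apache.spark.streaming.dstream.DStream

  def saveCountsExactlyOnce(counts: DStream[(String, Long)], jdbcUrl: String): Unit = {
    counts.foreachRDD { (rdd, batchTime) =>
      rdd.foreachPartition { partition =>
        // batch time + partition index = unique id for this slice of output
        val batchId = s"${batchTime.milliseconds}-${TaskContext.get.partitionId()}"
        val conn = DriverManager.getConnection(jdbcUrl)
        try {
          conn.setAutoCommit(false)
          val check = conn.prepareStatement("SELECT 1 FROM processed_batches WHERE id = ?")
          check.setString(1, batchId)
          if (!check.executeQuery().next()) {        // not applied yet
            val upd = conn.prepareStatement("UPDATE counts SET total = total + ? WHERE k = ?")
            partition.foreach { case (key, delta) =>
              upd.setLong(1, delta); upd.setString(2, key); upd.executeUpdate()
            }
            val mark = conn.prepareStatement("INSERT INTO processed_batches (id) VALUES (?)")
            mark.setString(1, batchId)
            mark.executeUpdate()
            conn.commit()                            // data and batch id land together
          }
        } finally conn.close()
      }
    }
  }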



On Fri, Jun 19, 2015 at 2:56 AM, prajod.vettiyat...@wipro.com wrote:

  More details on the Direct API of Spark 1.3 is at the databricks blog:
 https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html



 Note the use of checkpoints to persist the Kafka offsets in Spark
 Streaming itself, and not in zookeeper.



 Also this statement:”.. This allows one to build a Spark Streaming +
 Kafka pipelines with end-to-end exactly-once semantics (if your updates to
 downstream systems are idempotent or transactional).”





 *From:* Cody Koeninger [mailto:c...@koeninger.org]
 *Sent:* 18 June 2015 19:38
 *To:* bit1...@163.com
 *Cc:* Prajod S Vettiyattil (WT01 - BAS); jrpi...@gmail.com;
 eshi...@gmail.com; wrbri...@gmail.com; asoni.le...@gmail.com; ayan guha;
 user; sateesh.kav...@gmail.com; sparkenthusi...@yahoo.in;
 sabarish.sasidha...@manthan.com
 *Subject:* Re: RE: Spark or Storm



 That general description is accurate, but not really a specific issue of
 the direct stream. It applies to anything consuming from Kafka (or, as
 Matei already said, any streaming system really). You can't have exactly-once
 semantics unless you know something more about how you're storing
 results.



 For some unique id, topicpartition and offset is usually the obvious
 choice, which is why it's important that the direct stream gives you access
 to the offsets.



 See https://github.com/koeninger/kafka-exactly-once for more info
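
A sketch of that access with the direct stream (Spark 1.3+), where stream is assumed to be the DStream returned by KafkaUtils.createDirectStream:

  import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

  stream.foreachRDD { rdd =>
    // Each batch's RDD carries the exact Kafka offset ranges it covers.
    val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    ranges.foreach { r =>
      // topic + partition + offsets can be folded into the unique id for the batch
      println(s"${r.topic} / ${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
    }
  }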







 On Thu, Jun 18, 2015 at 6:47 AM, bit1...@163.com bit1...@163.com wrote:

  I am wondering how direct stream api ensures end-to-end exactly once
 semantics



 I think there are two things involved:

 1. From the Spark Streaming end, the driver will replay the offset range
 when it goes down and is restarted, which means that the new tasks will process
 some already processed data.

 2. From the user end, since tasks may process already processed data, the user
 end should detect that some data has already been processed, e.g.,
 using some unique ID.



 Not sure if I have understood correctly.




  --

 bit1...@163.com



 *From:* prajod.vettiyat...@wipro.com

 *Date:* 2015-06-18 16:56

 *To:* jrpi...@gmail.com; eshi...@gmail.com

 *CC:* wrbri...@gmail.com; asoni.le...@gmail.com; guha.a...@gmail.com;
 user@spark.apache.org; sateesh.kav...@gmail.com; sparkenthusi...@yahoo.in;
 sabarish.sasidha...@manthan.com

 *Subject:* RE: Spark or Storm

 not being able to read from Kafka using multiple nodes



  Kafka is plenty capable of doing this..



 I faced the same issue before Spark 1.3 was released.



 The issue was not with Kafka, but with Spark Streaming’s Kafka connector.
 Before the Spark 1.3.0 release, one Spark worker would get all the streamed
 messages. We had to re-partition to distribute the processing.



 From the Spark 1.3.0 release, the Spark Direct API for Kafka supports parallel
 reads from Kafka streamed to Spark workers. See “Approach 2: Direct
 Approach” on this page:
 http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html. Note
 that it also mentions zero data loss and exactly-once semantics for Kafka
 integration.





 Prajod



 *From:* Jordan Pilat [mailto:jrpi...@gmail.com]
 *Sent:* 18 June 2015 03:57
 *To:* Enno Shioji
 *Cc:* Will Briggs; asoni.le...@gmail.com; ayan guha; user; Sateesh
 Kavuri; Spark Enthusiast; Sabarish Sasidharan
 *Subject:* Re: Spark or Storm



 not being able to read from Kafka using multiple nodes

 Kafka is plenty capable of doing this,  by clustering together multiple
 consumer instances into a consumer group.
 If your topic is sufficiently partitioned, the consumer group can consume
 the topic in a parallelized fashion.
 If it isn't, you still have

Re: RE: Spark or Storm

2015-06-19 Thread Enno Shioji
Fair enough; on second thought, just saying that it should be idempotent is
indeed more confusing.

I guess the crux of the confusion comes from the fact that people tend to
assume the work you described (store batch id and skip etc.) is handled by
the framework, perhaps partly because Storm Trident does handle it (you
just need to let Storm know if the output operation has succeeded or not,
and it handles the batch-id storing & skipping business). Whenever I
explain to people that one needs to do this additional work you described to
get end-to-end exactly-once semantics, it usually takes a while to convince
them. In my limited experience, they tend to interpret "transactional" in
that sentence to mean that you just have to write to a transactional
storage like an ACID RDB. Pointing them to "Semantics of output operations" is
usually sufficient though.

Maybe others like @Ashish can weigh in on this; did you interpret it in this
way?

What if we change the statement into:
"end-to-end exactly-once semantics (if your updates to downstream systems
are idempotent or transactional). To learn how to make your updates
idempotent or transactional, see the 'Semantics of output operations'
section in this chapter:
https://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics"


That way, it's clear that it's not sufficient to merely write to a
transactional storage like an ACID store.







On Fri, Jun 19, 2015 at 9:08 AM, Tathagata Das t...@databricks.com wrote:

 If the current documentation is confusing, we can definitely improve the
 documentation. However, I do not understand why the term
 "transactional" is confusing. If your output operation has to "add 5", then the
 user has to implement the following mechanism:

 1. If the unique id of the batch of data is already present in the store,
 then skip the update
 2. Otherwise atomically do both, the update operation as well as store the
 unique id of the batch. This is pretty much the definition of a
 transaction. The user has to be aware of the transactional semantics of the
 data store while implementing this functionality.

 You CAN argue that this effectively makes the whole updating sort-of
 idempotent, as even if you try doing it multiple times, it will update only
 once. But that is not what is generally considered idempotent. Writing a
 fixed count, not an increment, is usually what is called idempotent. And so
 just mentioning that the output operation must be idempotent is, in my
 opinion, more confusing.

 To take a page out of the Storm / Trident guide, even they call this exact
 conditional updating of Trident State a "transactional" operation. See
 "transactional spout" in the Trident State guide -
 https://storm.apache.org/documentation/Trident-state

 In the end, I am totally open to suggestions and PRs on how to make the
 programming guide easier to understand. :)

 TD

 On Thu, Jun 18, 2015 at 11:47 PM, Enno Shioji eshi...@gmail.com wrote:

 Tbh I find the doc around this a bit confusing. If it says end-to-end
 exactly-once semantics (if your updates to downstream systems are
 idempotent or transactional), I think most people will interpret it
 that as long as you use a storage which has atomicity (like MySQL/Postgres
 etc.), a successful output operation for a given batch (let's say + 5) is
 going to be issued exactly-once against the storage.

 However, as I understand it that's not what this statement means. What it
 is saying is, it will always issue +5 and never, say +6, because it
 makes sure a message is processed exactly-once internally. However, it
 *may* issue +5 more than once for a given batch, and it is up to the
 developer to deal with this by either making the output operation
 idempotent (e.g. set 5), or transactional (e.g. keep track of batch IDs
 and skip already applied batches etc.).

 I wonder if it makes more sense to drop or transactional from the
 statement, because if you think about it, ultimately what you are asked to
 do is to make the writes idempotent even with the transactional approach,
 and "transactional" is a bit loaded and would be prone to lead to
 misunderstandings (even though in fairness, if you read the fault tolerance
 chapter it explicitly explains it).



 On Fri, Jun 19, 2015 at 2:56 AM, prajod.vettiyat...@wipro.com wrote:

  More details on the Direct API of Spark 1.3 is at the databricks blog:
 https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html



 Note the use of checkpoints to persist the Kafka offsets in Spark
 Streaming itself, and not in zookeeper.



 Also this statement:”.. This allows one to build a Spark Streaming +
 Kafka pipelines with end-to-end exactly-once semantics (if your updates to
 downstream systems are idempotent or transactional).”





 *From:* Cody Koeninger [mailto:c...@koeninger.org]
 *Sent:* 18 June 2015 19:38
 *To:* bit1...@163.com
 *Cc:* Prajod S Vettiyattil (WT01 - BAS); jrpi...@gmail.com;
 eshi...@gmail.com; wrbri

Re: Spark or Storm

2015-06-17 Thread Enno Shioji
Hi Matei,


Ah, can't get more accurate than from the horse's mouth... If you don't
mind helping me understand it correctly..

From what I understand, Storm Trident does the following (when used with
Kafka):
1) Sit on Kafka Spout and create batches
2) Assign global sequential ID to the batches
3) Make sure that all results of processed batches are written once to
TridentState, *in order* (for example, by skipping batches that were
already applied once, ultimately by using Zookeeper)

TridentState is an interface that you have to implement, and the underlying
storage has to be transactional for this to work. The necessary skipping
etc. is handled by Storm.

In case of Spark Streaming, I understand that
1) There is no global ordering; e.g. an output operation for batch
consisting of offset [4,5,6] can be invoked before the operation for offset
[1,2,3]
2) If you wanted to achieve something similar to what TridentState does,
you'll have to do it yourself (for example using Zookeeper)

Is this a correct understanding?




On Wed, Jun 17, 2015 at 7:14 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 This documentation is only for writes to an external system, but all the
 counting you do within your streaming app (e.g. if you use
 reduceByKeyAndWindow to keep track of a running count) is exactly-once.
 When you write to a storage system, no matter which streaming framework you
 use, you'll have to make sure the writes are idempotent, because the
 storage system can't know whether you meant to write the same data again or
 not. But the place where Spark Streaming helps over Storm, etc is for
 tracking state within your computation. Without that facility, you'd not
 only have to make sure that writes are idempotent, but you'd have to make
 sure that updates to your own internal state (e.g. reduceByKeyAndWindow)
 are exactly-once too.

 Matei


 On Jun 17, 2015, at 8:26 AM, Enno Shioji eshi...@gmail.com wrote:

The thing is, even with that improvement, you still have to make updates
idempotent or transactional yourself. If you read
http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics

(that refers to the latest version), it says:

 Semantics of output operations

 Output operations (like foreachRDD) have *at-least once* semantics, that
 is, the transformed data may get written to an external entity more than
 once in the event of a worker failure. While this is acceptable for saving
 to file systems using the saveAs***Files operations (as the file will
 simply get overwritten with the same data), additional effort may be
 necessary to achieve exactly-once semantics. There are two approaches.

 - *Idempotent updates*: Multiple attempts always write the same data.
   For example, saveAs***Files always writes the same data to the
   generated files.

 - *Transactional updates*: All updates are made transactionally so that
   updates are made exactly once atomically. One way to do this would be the
   following.
   - Use the batch time (available in foreachRDD) and the partition index
     of the transformed RDD to create an identifier. This identifier uniquely
     identifies a blob data in the streaming application.
   - Update external system with this blob transactionally (that is,
     exactly once, atomically) using the identifier. That is, if the identifier
     is not already committed, commit the partition data and the identifier
     atomically. Else if this was already committed, skip the update.


 So either you make the update idempotent, or you have to make it
 transactional yourself, and the suggested mechanism is very similar to what
 Storm does.
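
For contrast, a small sketch of the idempotent flavour (assuming a JDBC store; the upsert is MySQL-style and the schema is made up): the write carries the absolute value, so replaying it changes nothing:

  import java.sql.DriverManager

  // Writes absolute totals keyed by k; re-running the same write leaves the row unchanged.
  def writeTotalsIdempotently(rows: Iterator[(String, Long)], jdbcUrl: String): Unit = {
    val conn = DriverManager.getConnection(jdbcUrl)
    try {
      val upsert = conn.prepareStatement(
        "INSERT INTO totals (k, total) VALUES (?, ?) " +
        "ON DUPLICATE KEY UPDATE total = VALUES(total)")   // a fixed value, not "+ delta"
      rows.foreach { case (key, total) =>
        upsert.setString(1, key)
        upsert.setLong(2, total)
        upsert.executeUpdate()
      }
    } finally conn.close()
  }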




 On Wed, Jun 17, 2015 at 3:51 PM, Ashish Soni asoni.le...@gmail.com
 wrote:

 @Enno
 As per the latest version and documentation, Spark Streaming does offer
 exactly-once semantics using the improved Kafka integration, but I have not
 tested it yet.

 Any feedback will be helpful if anyone has tried the same.

 http://koeninger.github.io/kafka-exactly-once/#7


 https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html



 On Wed, Jun 17, 2015 at 10:33 AM, Enno Shioji eshi...@gmail.com wrote:

 AFAIK KCL is *supposed* to provide fault tolerance and load balancing
 (plus additionally, elastic scaling unlike Storm), Kinesis providing the
 coordination. My understanding is that it's like a naked Storm worker
 process that can consequently only do map.

 I haven't really used it tho, so can't really comment how it compares to
 Spark/Storm. Maybe somebody else will be able to comment.



 On Wed, Jun 17, 2015 at 3:13 PM, ayan guha guha.a...@gmail.com wrote:

 Thanks for this. It's a KCL-based Kinesis application. But because it's just
 a Java application, we are thinking of using Spark on EMR or Storm for fault
 tolerance and load balancing. Is that a correct approach?
 On 17 Jun 2015 23:07, Enno Shioji eshi...@gmail.com wrote:

 Hi Ayan,

 Admittedly I

Re: Spark or Storm

2015-06-17 Thread Enno Shioji
We've evaluated Spark Streaming vs. Storm and ended up sticking with Storm.

Some of the important drawbacks are:
Spark has no back pressure (receiver rate limit can alleviate this to a
certain point, but it's far from ideal)
There is also no exactly-once semantics. (updateStateByKey can achieve these
semantics, but it is not practical if you have any significant amount of state,
because it does so by dumping the entire state at every checkpoint.)

There are also some minor drawbacks that I'm sure will be fixed quickly,
like no task timeout, not being able to read from Kafka using multiple
nodes, data loss hazard with Kafka.

It's also not possible to attain very low latency in Spark, if that's what
you need.

The plus for Spark is the concise and IMO more intuitive syntax, especially
if you compare it with Storm's Java API.

I admit I might be a bit biased towards Storm tho as I'm more familiar with
it.

Also, you can do some processing with Kinesis. If all you need to do is a
straightforward transformation and you are reading from Kinesis to begin
with, it might be an easier option to just do the transformation in Kinesis.





On Wed, Jun 17, 2015 at 7:15 AM, Sabarish Sasidharan 
sabarish.sasidha...@manthan.com wrote:

 Whatever you write in bolts would be the logic you want to apply on your
 events. In Spark, that logic would be coded in map() or similar such
 transformations and/or actions. Spark doesn't enforce a structure for
 capturing your processing logic like Storm does.

 Regards
 Sab
 Probably overloading the question a bit.

 In Storm, Bolts have the functionality of getting triggered on events. Is
 that kind of functionality possible with Spark streaming? During each phase
 of the data processing, the transformed data is stored to the database and
 this transformed data should then be sent to a new pipeline for further
 processing

 How can this be achieved using Spark?



 On Wed, Jun 17, 2015 at 10:10 AM, Spark Enthusiast 
 sparkenthusi...@yahoo.in wrote:

 I have a use-case where a stream of Incoming events have to be aggregated
 and joined to create Complex events. The aggregation will have to happen at
 an interval of 1 minute (or less).

 The pipeline is:
 Upstream services --(send events)--> KAFKA --(enrich event)--> Event Stream
 Processor --> Complex Event Processor --> Elastic Search.

 From what I understand, Storm will make a very good ESP and Spark
 Streaming will make a good CEP.

 But, we are also evaluating Storm with Trident.

 How does Spark Streaming compare with Storm with Trident?

 Sridhar Chellappa







   On Wednesday, 17 June 2015 10:02 AM, ayan guha guha.a...@gmail.com
 wrote:


 I have a similar scenario where we need to bring data from Kinesis to
 HBase. Data velocity is 20k per 10 mins. Little manipulation of data will
 be required, but that's regardless of the tool, so we will be writing that
 piece as a Java POJO.
 All env is on aws. Hbase is on a long running EMR and kinesis on a
 separate cluster.
 TIA.
 Best
 Ayan
 On 17 Jun 2015 12:13, Will Briggs wrbri...@gmail.com wrote:

 The programming models for the two frameworks are conceptually rather
 different; I haven't worked with Storm for quite some time, but based on my
 old experience with it, I would equate Spark Streaming more with Storm's
 Trident API, rather than with the raw Bolt API. Even then, there are
 significant differences, but it's a bit closer.

 If you can share your use case, we might be able to provide better
 guidance.

 Regards,
 Will

 On June 16, 2015, at 9:46 PM, asoni.le...@gmail.com wrote:

 Hi All,

 I am evaluating Spark vs. Storm (Spark Streaming) and I am not able to
 see what the equivalent of a Storm Bolt is inside Spark.

 Any help on this will be appreciated.

 Thanks ,
 Ashish







Re: Spark or Storm

2015-06-17 Thread Enno Shioji
I guess both. In terms of syntax, I was comparing it with Trident.

If you are joining, Spark Streaming actually does offer windowed join out
of the box. We couldn't use this though as our event stream can grow
out-of-sync, so we had to implement something on top of Storm. If your
event streams don't become out of sync, you may find the built-in join in
Spark Streaming useful. Storm also has a join keyword but its semantics are
different.


 Also, what do you mean by No Back Pressure ?

So when a topology is overloaded, Storm is designed so that it will stop
reading from the source. Spark, on the other hand, will keep reading from
the source and spilling it internally. This may be fine, in fairness, but it
does mean you have to worry about the persistent store usage in the
processing cluster, whereas with Storm you don't have to worry because the
messages just remain in the data store.

Spark came up with the idea of rate limiting, but I don't feel this is as
nice as back pressure because it's very difficult to tune it such that you
don't cap the cluster's processing power while still preventing the
persistent storage from being used up.
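
For reference, the receiver rate limit mentioned above is just a configuration knob (the value here is illustrative and has to be tuned per cluster):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("rate-limited-streaming-app")
    // Caps each receiver at this many records per second; a crude substitute for back pressure.
    .set("spark.streaming.receiver.maxRate", "10000")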


On Wed, Jun 17, 2015 at 9:33 AM, Spark Enthusiast sparkenthusi...@yahoo.in
wrote:

 When you say Storm, did you mean Storm with Trident or Storm?

 My use case does not have simple transformation. There are complex events
 that need to be generated by joining the incoming event stream.

 Also, what do you mean by No Back Pressure?





   On Wednesday, 17 June 2015 11:57 AM, Enno Shioji eshi...@gmail.com
 wrote:


 We've evaluated Spark Streaming vs. Storm and ended up sticking with Storm.

 Some of the important draw backs are:
 Spark has no back pressure (receiver rate limit can alleviate this to a
 certain point, but it's far from ideal)
 There is also no exactly-once semantics. (updateStateByKey can achieve
 this semantics, but is not practical if you have any significant amount of
 state because it does so by dumping the entire state on every checkpointing)

 There are also some minor drawbacks that I'm sure will be fixed quickly,
 like no task timeout, not being able to read from Kafka using multiple
 nodes, data loss hazard with Kafka.

 It's also not possible to attain very low latency in Spark, if that's what
 you need.

 The pos for Spark is the concise and IMO more intuitive syntax, especially
 if you compare it with Storm's Java API.

 I admit I might be a bit biased towards Storm tho as I'm more familiar
 with it.

 Also, you can do some processing with Kinesis. If all you need to do is
 straight forward transformation and you are reading from Kinesis to begin
 with, it might be an easier option to just do the transformation in Kinesis.





 On Wed, Jun 17, 2015 at 7:15 AM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 Whatever you write in bolts would be the logic you want to apply on your
 events. In Spark, that logic would be coded in map() or similar such
 transformations and/or actions. Spark doesn't enforce a structure for
 capturing your processing logic like Storm does.
 Regards
 Sab
 Probably overloading the question a bit.

 In Storm, Bolts have the functionality of getting triggered on events. Is
 that kind of functionality possible with Spark streaming? During each phase
 of the data processing, the transformed data is stored to the database and
 this transformed data should then be sent to a new pipeline for further
 processing

 How can this be achieved using Spark?



 On Wed, Jun 17, 2015 at 10:10 AM, Spark Enthusiast 
 sparkenthusi...@yahoo.in wrote:

 I have a use-case where a stream of Incoming events have to be aggregated
 and joined to create Complex events. The aggregation will have to happen at
 an interval of 1 minute (or less).

 The pipeline is:
 Upstream services --(send events)--> KAFKA --(enrich event)--> Event Stream
 Processor --> Complex Event Processor --> Elastic Search.

 From what I understand, Storm will make a very good ESP and Spark
 Streaming will make a good CEP.

 But, we are also evaluating Storm with Trident.

 How does Spark Streaming compare with Storm with Trident?

 Sridhar Chellappa







   On Wednesday, 17 June 2015 10:02 AM, ayan guha guha.a...@gmail.com
 wrote:


 I have a similar scenario where we need to bring data from kinesis to
 hbase. Data volecity is 20k per 10 mins. Little manipulation of data will
 be required but that's regardless of the tool so we will be writing that
 piece in Java pojo.
 All env is on aws. Hbase is on a long running EMR and kinesis on a
 separate cluster.
 TIA.
 Best
 Ayan
 On 17 Jun 2015 12:13, Will Briggs wrbri...@gmail.com wrote:

 The programming models for the two frameworks are conceptually rather
 different; I haven't worked with Storm for quite some time, but based on my
 old experience with it, I would equate Spark Streaming more with Storm's
 Trident API

Re: Spark or Storm

2015-06-17 Thread Enno Shioji
In that case I assume you need exactly once semantics. There's no
out-of-the-box way to do that in Spark. There is updateStateByKey, but it's
not practical with your use case as the state is too large (it'll try to
dump the entire intermediate state on every checkpoint, which would be
prohibitively expensive).

So either you have to implement something yourself, or you can use Storm
Trident (or transactional low-level API).
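
For completeness, a rough sketch of the updateStateByKey route under discussion, applied to a per-key running total (the field names are made up; checkpointing must be enabled, and as noted the entire state gets written out on every checkpoint):

  import org.apache.spark.streaming.dstream.DStream

  case class Usage(phone: String, amount: Double)

  // Keeps a running total per phone number across batches; requires ssc.checkpoint(...).
  def runningBill(events: DStream[Usage]): DStream[(String, Double)] = {
    events
      .map(e => (e.phone, e.amount))
      .updateStateByKey[Double] { (newAmounts: Seq[Double], total: Option[Double]) =>
        Some(total.getOrElse(0.0) + newAmounts.sum)
      }
  }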

On Wed, Jun 17, 2015 at 1:26 PM, Ashish Soni asoni.le...@gmail.com wrote:

 My use case is below.

 We are going to receive a lot of events as a stream (basically a Kafka stream)
 and then we need to process and compute on them.

 Consider you have a phone contract with AT&T, and every call / SMS / data
 usage you make is an event; we then need to calculate your bill on a real-time
 basis, so when you log in to your account you can see all those variables:
 how much you used, how much is left and what your bill is to date.
 Also, there are different rules which need to be considered when you
 calculate the total bill; one simple rule would be that 0-500 min is free but
 above that it is $1 a min.

 How do I maintain a shared state (total amount, total minutes, total data
 etc.) so that I know how much has accumulated at any given point, as events
 for the same phone can go to any node / executor?

 Can someone please tell me how I can achieve this in Spark, as in Storm I
 can have a bolt which can do this?

 Thanks,



 On Wed, Jun 17, 2015 at 4:52 AM, Enno Shioji eshi...@gmail.com wrote:

 I guess both. In terms of syntax, I was comparing it with Trident.

 If you are joining, Spark Streaming actually does offer windowed join out
 of the box. We couldn't use this though as our event stream can grow
 out-of-sync, so we had to implement something on top of Storm. If your
 event streams don't become out of sync, you may find the built-in join in
 Spark Streaming useful. Storm also has a join keyword but its semantics are
 different.


  Also, what do you mean by No Back Pressure ?

 So when a topology is overloaded, Storm is designed so that it will stop
 reading from the source. Spark on the other hand, will keep reading from
 the source and spilling it internally. This maybe fine, in fairness, but it
 does mean you have to worry about the persistent store usage in the
 processing cluster, whereas with Storm you don't have to worry because the
 messages just remain in the data store.

 Spark came up with the idea of rate limiting, but I don't feel this is as
 nice as back pressure because it's very difficult to tune it such that you
 don't cap the cluster's processing power but yet so that it will prevent
 the persistent storage to get used up.


 On Wed, Jun 17, 2015 at 9:33 AM, Spark Enthusiast 
 sparkenthusi...@yahoo.in wrote:

 When you say Storm, did you mean Storm with Trident or Storm?

 My use case does not have simple transformation. There are complex
 events that need to be generated by joining the incoming event stream.

 Also, what do you mean by No Back PRessure ?





   On Wednesday, 17 June 2015 11:57 AM, Enno Shioji eshi...@gmail.com
 wrote:


 We've evaluated Spark Streaming vs. Storm and ended up sticking with
 Storm.

 Some of the important draw backs are:
 Spark has no back pressure (receiver rate limit can alleviate this to a
 certain point, but it's far from ideal)
 There is also no exactly-once semantics. (updateStateByKey can achieve
 this semantics, but is not practical if you have any significant amount of
 state because it does so by dumping the entire state on every checkpointing)

 There are also some minor drawbacks that I'm sure will be fixed quickly,
 like no task timeout, not being able to read from Kafka using multiple
 nodes, data loss hazard with Kafka.

 It's also not possible to attain very low latency in Spark, if that's
 what you need.

 The pos for Spark is the concise and IMO more intuitive syntax,
 especially if you compare it with Storm's Java API.

 I admit I might be a bit biased towards Storm tho as I'm more familiar
 with it.

 Also, you can do some processing with Kinesis. If all you need to do is
 straight forward transformation and you are reading from Kinesis to begin
 with, it might be an easier option to just do the transformation in Kinesis.





 On Wed, Jun 17, 2015 at 7:15 AM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 Whatever you write in bolts would be the logic you want to apply on your
 events. In Spark, that logic would be coded in map() or similar such
 transformations and/or actions. Spark doesn't enforce a structure for
 capturing your processing logic like Storm does.
 Regards
 Sab
 Probably overloading the question a bit.

 In Storm, Bolts have the functionality of getting triggered on events.
 Is that kind of functionality possible with Spark streaming? During each
 phase of the data processing, the transformed data is stored to the
 database and this transformed data should then be sent to a new pipeline
 for further

Re: Spark or Storm

2015-06-17 Thread Enno Shioji
Hi Ayan,

Admittedly I haven't done much with Kinesis, but if I'm not mistaken you
should be able to use their processor interface for that. In this
example, it's incrementing a counter:
https://github.com/awslabs/amazon-kinesis-data-visualization-sample/blob/master/src/main/java/com/amazonaws/services/kinesis/samples/datavis/kcl/CountingRecordProcessor.java

Instead of incrementing a counter, you could do your transformation and
send it to HBase.






On Wed, Jun 17, 2015 at 1:40 PM, ayan guha guha.a...@gmail.com wrote:

 Great discussion!!

 One question about a comment: "Also, you can do some processing with Kinesis.
 If all you need to do is straight forward transformation and you are
 reading from Kinesis to begin with, it might be an easier option to just do
 the transformation in Kinesis"

 - Do you mean a KCL application? Or some kind of processing within Kinesis?

 Can you kindly share a link? I would definitely pursue this route as our
 transformations are really simple.

 Best

 On Wed, Jun 17, 2015 at 10:26 PM, Ashish Soni asoni.le...@gmail.com
 wrote:

 My Use case is below

 We are going to receive lot of event as stream ( basically Kafka Stream )
 and then we need to process and compute

 Consider you have a phone contract with ATT and every call / sms / data
 useage you do is an event and then it needs  to calculate your bill on real
 time basis so when you login to your account you can see all those variable
 as how much you used and how much is left and what is your bill till date
 ,Also there are different rules which need to be considered when you
 calculate the total bill one simple rule will be 0-500 min it is free but
 above it is $1 a min.

 How do i maintain a shared state  ( total amount , total min , total data
 etc ) so that i know how much i accumulated at any given point as events
 for same phone can go to any node / executor.

 Can some one please tell me how can i achieve this is spark as in storm i
 can have a bolt which can do this ?

 Thanks,



 On Wed, Jun 17, 2015 at 4:52 AM, Enno Shioji eshi...@gmail.com wrote:

 I guess both. In terms of syntax, I was comparing it with Trident.

 If you are joining, Spark Streaming actually does offer windowed join
 out of the box. We couldn't use this though as our event stream can grow
 out-of-sync, so we had to implement something on top of Storm. If your
 event streams don't become out of sync, you may find the built-in join in
 Spark Streaming useful. Storm also has a join keyword but its semantics are
 different.


  Also, what do you mean by No Back Pressure ?

 So when a topology is overloaded, Storm is designed so that it will stop
 reading from the source. Spark on the other hand, will keep reading from
 the source and spilling it internally. This maybe fine, in fairness, but it
 does mean you have to worry about the persistent store usage in the
 processing cluster, whereas with Storm you don't have to worry because the
 messages just remain in the data store.

 Spark came up with the idea of rate limiting, but I don't feel this is
 as nice as back pressure because it's very difficult to tune it such that
 you don't cap the cluster's processing power but yet so that it will
 prevent the persistent storage to get used up.


 On Wed, Jun 17, 2015 at 9:33 AM, Spark Enthusiast 
 sparkenthusi...@yahoo.in wrote:

 When you say Storm, did you mean Storm with Trident or Storm?

 My use case does not have simple transformation. There are complex
 events that need to be generated by joining the incoming event stream.

 Also, what do you mean by No Back PRessure ?





   On Wednesday, 17 June 2015 11:57 AM, Enno Shioji eshi...@gmail.com
 wrote:


 We've evaluated Spark Streaming vs. Storm and ended up sticking with
 Storm.

 Some of the important draw backs are:
 Spark has no back pressure (receiver rate limit can alleviate this to a
 certain point, but it's far from ideal)
 There is also no exactly-once semantics. (updateStateByKey can achieve
 this semantics, but is not practical if you have any significant amount of
 state because it does so by dumping the entire state on every 
 checkpointing)

 There are also some minor drawbacks that I'm sure will be fixed
 quickly, like no task timeout, not being able to read from Kafka using
 multiple nodes, data loss hazard with Kafka.

 It's also not possible to attain very low latency in Spark, if that's
 what you need.

 The pos for Spark is the concise and IMO more intuitive syntax,
 especially if you compare it with Storm's Java API.

 I admit I might be a bit biased towards Storm tho as I'm more familiar
 with it.

 Also, you can do some processing with Kinesis. If all you need to do is
 straight forward transformation and you are reading from Kinesis to begin
 with, it might be an easier option to just do the transformation in 
 Kinesis.





 On Wed, Jun 17, 2015 at 7:15 AM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 Whatever you write in bolts would be the logic you

Re: Spark or Storm

2015-06-17 Thread Enno Shioji
PS just to elaborate on my first sentence: the reason Spark (not streaming)
can offer exactly-once semantics is because its update operation is
idempotent. This is easy to do in a batch context because the input is
finite, but it's harder in a streaming context.

On Wed, Jun 17, 2015 at 2:00 PM, Enno Shioji eshi...@gmail.com wrote:

 So Spark (not streaming) does offer exactly once. Spark Streaming however,
 can only do exactly once semantics *if the update operation is idempotent*.
 updateStateByKey's update operation is idempotent, because it completely
 replaces the previous state.

 So as long as you use Spark streaming, you must somehow make the update
 operation idempotent. Replacing the entire state is the easiest way to do
 it, but it's obviously expensive.

 The alternative is to do something similar to what Storm does. At that
 point, you'll have to ask though if just using Storm is easier than that.





 On Wed, Jun 17, 2015 at 1:50 PM, Ashish Soni asoni.le...@gmail.com
 wrote:

 As per my best understanding, Spark Streaming offers exactly-once
 processing; is this achieved only through updateStateByKey, or is there
 another way to do the same?

 Ashish

 On Wed, Jun 17, 2015 at 8:48 AM, Enno Shioji eshi...@gmail.com wrote:

 In that case I assume you need exactly once semantics. There's no
 out-of-the-box way to do that in Spark. There is updateStateByKey, but it's
 not practical with your use case as the state is too large (it'll try to
 dump the entire intermediate state on every checkpoint, which would be
 prohibitively expensive).

 So either you have to implement something yourself, or you can use Storm
 Trident (or transactional low-level API).

 On Wed, Jun 17, 2015 at 1:26 PM, Ashish Soni asoni.le...@gmail.com
 wrote:

 My Use case is below

 We are going to receive lot of event as stream ( basically Kafka Stream
 ) and then we need to process and compute

 Consider you have a phone contract with ATT and every call / sms / data
 useage you do is an event and then it needs  to calculate your bill on real
 time basis so when you login to your account you can see all those variable
 as how much you used and how much is left and what is your bill till date
 ,Also there are different rules which need to be considered when you
 calculate the total bill one simple rule will be 0-500 min it is free but
 above it is $1 a min.

 How do i maintain a shared state  ( total amount , total min , total
 data etc ) so that i know how much i accumulated at any given point as
 events for same phone can go to any node / executor.

 Can some one please tell me how can i achieve this is spark as in storm
 i can have a bolt which can do this ?

 Thanks,



 On Wed, Jun 17, 2015 at 4:52 AM, Enno Shioji eshi...@gmail.com wrote:

 I guess both. In terms of syntax, I was comparing it with Trident.

 If you are joining, Spark Streaming actually does offer windowed join
 out of the box. We couldn't use this though as our event stream can grow
 out-of-sync, so we had to implement something on top of Storm. If your
 event streams don't become out of sync, you may find the built-in join in
 Spark Streaming useful. Storm also has a join keyword but its semantics 
 are
 different.


  Also, what do you mean by No Back Pressure ?

 So when a topology is overloaded, Storm is designed so that it will
 stop reading from the source. Spark on the other hand, will keep reading
 from the source and spilling it internally. This maybe fine, in fairness,
 but it does mean you have to worry about the persistent store usage in the
 processing cluster, whereas with Storm you don't have to worry because the
 messages just remain in the data store.

 Spark came up with the idea of rate limiting, but I don't feel this is
 as nice as back pressure because it's very difficult to tune it such that
 you don't cap the cluster's processing power but yet so that it will
 prevent the persistent storage to get used up.


 On Wed, Jun 17, 2015 at 9:33 AM, Spark Enthusiast 
 sparkenthusi...@yahoo.in wrote:

 When you say Storm, did you mean Storm with Trident or Storm?

 My use case does not have simple transformation. There are complex
 events that need to be generated by joining the incoming event stream.

 Also, what do you mean by No Back PRessure ?





   On Wednesday, 17 June 2015 11:57 AM, Enno Shioji eshi...@gmail.com
 wrote:


 We've evaluated Spark Streaming vs. Storm and ended up sticking with
 Storm.

 Some of the important draw backs are:
 Spark has no back pressure (receiver rate limit can alleviate this to
 a certain point, but it's far from ideal)
 There is also no exactly-once semantics. (updateStateByKey can
 achieve this semantics, but is not practical if you have any significant
 amount of state because it does so by dumping the entire state on every
 checkpointing)

 There are also some minor drawbacks that I'm sure will be fixed
 quickly, like no task timeout, not being able to read from Kafka using
 multiple nodes, data loss

Re: Spark or Storm

2015-06-17 Thread Enno Shioji
Processing stuff in batches is not the same thing as being transactional. If
you look at Storm, it will e.g. skip tuples that were already applied to a
state to avoid counting stuff twice etc. Spark doesn't come with such a
facility, so you could end up counting twice etc.



On Wed, Jun 17, 2015 at 2:09 PM, Ashish Soni asoni.le...@gmail.com wrote:

 A stream can also be processed in micro-batches, which is the main reason
 behind Spark Streaming, so what is the difference?

 Ashish

 On Wed, Jun 17, 2015 at 9:04 AM, Enno Shioji eshi...@gmail.com wrote:

 PS just to elaborate on my first sentence, the reason Spark (not
 streaming) can offer exactly once semantics is because its update operation
 is idempotent. This is easy to do in a batch context because the input is
 finite, but it's harder in streaming context.

 On Wed, Jun 17, 2015 at 2:00 PM, Enno Shioji eshi...@gmail.com wrote:

 So Spark (not streaming) does offer exactly once. Spark Streaming
 however, can only do exactly once semantics *if the update operation is
 idempotent*. updateStateByKey's update operation is idempotent, because
 it completely replaces the previous state.

 So as long as you use Spark streaming, you must somehow make the update
 operation idempotent. Replacing the entire state is the easiest way to do
 it, but it's obviously expensive.

 The alternative is to do something similar to what Storm does. At that
 point, you'll have to ask though if just using Storm is easier than that.





 On Wed, Jun 17, 2015 at 1:50 PM, Ashish Soni asoni.le...@gmail.com
 wrote:

 As per my Best Understanding Spark Streaming offer Exactly once
 processing , is this achieve only through updateStateByKey or there is
 another way to do the same.

 Ashish

 On Wed, Jun 17, 2015 at 8:48 AM, Enno Shioji eshi...@gmail.com wrote:

 In that case I assume you need exactly once semantics. There's no
 out-of-the-box way to do that in Spark. There is updateStateByKey, but 
 it's
 not practical with your use case as the state is too large (it'll try to
 dump the entire intermediate state on every checkpoint, which would be
 prohibitively expensive).

 So either you have to implement something yourself, or you can use
 Storm Trident (or transactional low-level API).

 On Wed, Jun 17, 2015 at 1:26 PM, Ashish Soni asoni.le...@gmail.com
 wrote:

 My Use case is below

 We are going to receive lot of event as stream ( basically Kafka
 Stream ) and then we need to process and compute

 Consider you have a phone contract with ATT and every call / sms /
 data useage you do is an event and then it needs  to calculate your bill 
 on
 real time basis so when you login to your account you can see all those
 variable as how much you used and how much is left and what is your bill
 till date ,Also there are different rules which need to be considered 
 when
 you calculate the total bill one simple rule will be 0-500 min it is free
 but above it is $1 a min.

 How do i maintain a shared state  ( total amount , total min , total
 data etc ) so that i know how much i accumulated at any given point as
 events for same phone can go to any node / executor.

 Can some one please tell me how can i achieve this is spark as in
 storm i can have a bolt which can do this ?

 Thanks,



 On Wed, Jun 17, 2015 at 4:52 AM, Enno Shioji eshi...@gmail.com
 wrote:

 I guess both. In terms of syntax, I was comparing it with Trident.

 If you are joining, Spark Streaming actually does offer windowed
 join out of the box. We couldn't use this though as our event stream can
 grow out-of-sync, so we had to implement something on top of Storm. If
 your event streams don't become out of sync, you may find the built-in 
 join
 in Spark Streaming useful. Storm also has a join keyword but its 
 semantics
 are different.


  Also, what do you mean by No Back Pressure ?

 So when a topology is overloaded, Storm is designed so that it will
 stop reading from the source. Spark on the other hand, will keep reading
 from the source and spilling it internally. This maybe fine, in 
 fairness,
 but it does mean you have to worry about the persistent store usage in 
 the
 processing cluster, whereas with Storm you don't have to worry because 
 the
 messages just remain in the data store.

 Spark came up with the idea of rate limiting, but I don't feel this
 is as nice as back pressure because it's very difficult to tune it such
 that you don't cap the cluster's processing power but yet so that it 
 will
 prevent the persistent storage to get used up.


 On Wed, Jun 17, 2015 at 9:33 AM, Spark Enthusiast 
 sparkenthusi...@yahoo.in wrote:

 When you say Storm, did you mean Storm with Trident or Storm?

 My use case does not have simple transformation. There are complex
 events that need to be generated by joining the incoming event stream.

 Also, what do you mean by No Back PRessure ?





   On Wednesday, 17 June 2015 11:57 AM, Enno Shioji 
 eshi...@gmail.com wrote:


 We've evaluated Spark Streaming vs

Re: Spark or Storm

2015-06-17 Thread Enno Shioji
So Spark (not streaming) does offer exactly once. Spark Streaming however,
can only do exactly once semantics *if the update operation is idempotent*.
updateStateByKey's update operation is idempotent, because it completely
replaces the previous state.

So as long as you use Spark streaming, you must somehow make the update
operation idempotent. Replacing the entire state is the easiest way to do
it, but it's obviously expensive.

The alternative is to do something similar to what Storm does. At that
point, you'll have to ask though if just using Storm is easier than that.





On Wed, Jun 17, 2015 at 1:50 PM, Ashish Soni asoni.le...@gmail.com wrote:

 As per my Best Understanding Spark Streaming offer Exactly once processing
 , is this achieve only through updateStateByKey or there is another way to
 do the same.

 Ashish

 On Wed, Jun 17, 2015 at 8:48 AM, Enno Shioji eshi...@gmail.com wrote:

 In that case I assume you need exactly once semantics. There's no
 out-of-the-box way to do that in Spark. There is updateStateByKey, but it's
 not practical with your use case as the state is too large (it'll try to
 dump the entire intermediate state on every checkpoint, which would be
 prohibitively expensive).

 So either you have to implement something yourself, or you can use Storm
 Trident (or transactional low-level API).

 On Wed, Jun 17, 2015 at 1:26 PM, Ashish Soni asoni.le...@gmail.com
 wrote:

 My Use case is below

 We are going to receive lot of event as stream ( basically Kafka Stream
 ) and then we need to process and compute

 Consider you have a phone contract with ATT and every call / sms / data
 useage you do is an event and then it needs  to calculate your bill on real
 time basis so when you login to your account you can see all those variable
 as how much you used and how much is left and what is your bill till date
 ,Also there are different rules which need to be considered when you
 calculate the total bill one simple rule will be 0-500 min it is free but
 above it is $1 a min.

 How do i maintain a shared state  ( total amount , total min , total
 data etc ) so that i know how much i accumulated at any given point as
 events for same phone can go to any node / executor.

 Can some one please tell me how can i achieve this is spark as in storm
 i can have a bolt which can do this ?

 Thanks,



 On Wed, Jun 17, 2015 at 4:52 AM, Enno Shioji eshi...@gmail.com wrote:

 I guess both. In terms of syntax, I was comparing it with Trident.

 If you are joining, Spark Streaming actually does offer windowed join
 out of the box. We couldn't use this though as our event stream can grow
 out-of-sync, so we had to implement something on top of Storm. If your
 event streams don't become out of sync, you may find the built-in join in
 Spark Streaming useful. Storm also has a join keyword but its semantics are
 different.


  Also, what do you mean by No Back Pressure ?

 So when a topology is overloaded, Storm is designed so that it will
 stop reading from the source. Spark on the other hand, will keep reading
 from the source and spilling it internally. This maybe fine, in fairness,
 but it does mean you have to worry about the persistent store usage in the
 processing cluster, whereas with Storm you don't have to worry because the
 messages just remain in the data store.

 Spark came up with the idea of rate limiting, but I don't feel this is
 as nice as back pressure because it's very difficult to tune it such that
 you don't cap the cluster's processing power but yet so that it will
 prevent the persistent storage to get used up.


 On Wed, Jun 17, 2015 at 9:33 AM, Spark Enthusiast 
 sparkenthusi...@yahoo.in wrote:

 When you say Storm, did you mean Storm with Trident or Storm?

 My use case does not have simple transformation. There are complex
 events that need to be generated by joining the incoming event stream.

 Also, what do you mean by No Back PRessure ?





   On Wednesday, 17 June 2015 11:57 AM, Enno Shioji eshi...@gmail.com
 wrote:


 We've evaluated Spark Streaming vs. Storm and ended up sticking with
 Storm.

 Some of the important draw backs are:
 Spark has no back pressure (receiver rate limit can alleviate this to
 a certain point, but it's far from ideal)
 There is also no exactly-once semantics. (updateStateByKey can
 achieve this semantics, but is not practical if you have any significant
 amount of state because it does so by dumping the entire state on every
 checkpointing)

 There are also some minor drawbacks that I'm sure will be fixed
 quickly, like no task timeout, not being able to read from Kafka using
 multiple nodes, data loss hazard with Kafka.

 It's also not possible to attain very low latency in Spark, if that's
 what you need.

 The pos for Spark is the concise and IMO more intuitive syntax,
 especially if you compare it with Storm's Java API.

 I admit I might be a bit biased towards Storm tho as I'm more familiar
 with it.

 Also, you can do some processing

Re: Spark or Storm

2015-06-17 Thread Enno Shioji
AFAIK KCL is *supposed* to provide fault tolerance and load balancing (plus
additionally, elastic scaling unlike Storm), Kinesis providing the
coordination. My understanding is that it's like a naked Storm worker
process that can consequently only do map.

I haven't really used it tho, so can't really comment how it compares to
Spark/Storm. Maybe somebody else will be able to comment.



On Wed, Jun 17, 2015 at 3:13 PM, ayan guha guha.a...@gmail.com wrote:

 Thanks for this. It's kcl based kinesis application. But because its just
 a Java application we are thinking to use spark on EMR or storm for fault
 tolerance and load balancing. Is it a correct approach?
 On 17 Jun 2015 23:07, Enno Shioji eshi...@gmail.com wrote:

 Hi Ayan,

 Admittedly I haven't done much with Kinesis, but if I'm not mistaken you
 should be able to use their processor interface for that. In this
 example, it's incrementing a counter:
 https://github.com/awslabs/amazon-kinesis-data-visualization-sample/blob/master/src/main/java/com/amazonaws/services/kinesis/samples/datavis/kcl/CountingRecordProcessor.java

 Instead of incrementing a counter, you could do your transformation and
 send it to HBase.






 On Wed, Jun 17, 2015 at 1:40 PM, ayan guha guha.a...@gmail.com wrote:

 Great discussion!!

 One qs about some comment: Also, you can do some processing with
 Kinesis. If all you need to do is straight forward transformation and you
 are reading from Kinesis to begin with, it might be an easier option to
 just do the transformation in Kinesis

 - Do you mean KCL application? Or some kind of processing withinKineis?

 Can you kindly share a link? I would definitely pursue this route as our
 transformations are really simple.

 Best

 On Wed, Jun 17, 2015 at 10:26 PM, Ashish Soni asoni.le...@gmail.com
 wrote:

 My Use case is below

 We are going to receive lot of event as stream ( basically Kafka Stream
 ) and then we need to process and compute

 Consider you have a phone contract with ATT and every call / sms / data
 useage you do is an event and then it needs  to calculate your bill on real
 time basis so when you login to your account you can see all those variable
 as how much you used and how much is left and what is your bill till date
 ,Also there are different rules which need to be considered when you
 calculate the total bill one simple rule will be 0-500 min it is free but
 above it is $1 a min.

 How do i maintain a shared state  ( total amount , total min , total
 data etc ) so that i know how much i accumulated at any given point as
 events for same phone can go to any node / executor.

 Can some one please tell me how can i achieve this is spark as in storm
 i can have a bolt which can do this ?

 Thanks,



 On Wed, Jun 17, 2015 at 4:52 AM, Enno Shioji eshi...@gmail.com wrote:

 I guess both. In terms of syntax, I was comparing it with Trident.

 If you are joining, Spark Streaming actually does offer windowed join
 out of the box. We couldn't use this though as our event stream can grow
 out-of-sync, so we had to implement something on top of Storm. If your
 event streams don't become out of sync, you may find the built-in join in
 Spark Streaming useful. Storm also has a join keyword but its semantics 
 are
 different.


  Also, what do you mean by No Back Pressure ?

 So when a topology is overloaded, Storm is designed so that it will
 stop reading from the source. Spark on the other hand, will keep reading
 from the source and spilling it internally. This maybe fine, in fairness,
 but it does mean you have to worry about the persistent store usage in the
 processing cluster, whereas with Storm you don't have to worry because the
 messages just remain in the data store.

 Spark came up with the idea of rate limiting, but I don't feel this is
 as nice as back pressure because it's very difficult to tune it such that
 you don't cap the cluster's processing power but yet so that it will
 prevent the persistent storage to get used up.


 On Wed, Jun 17, 2015 at 9:33 AM, Spark Enthusiast 
 sparkenthusi...@yahoo.in wrote:

 When you say Storm, did you mean Storm with Trident or Storm?

 My use case does not have simple transformation. There are complex
 events that need to be generated by joining the incoming event stream.

 Also, what do you mean by No Back PRessure ?





   On Wednesday, 17 June 2015 11:57 AM, Enno Shioji eshi...@gmail.com
 wrote:


 We've evaluated Spark Streaming vs. Storm and ended up sticking with
 Storm.

 Some of the important draw backs are:
 Spark has no back pressure (receiver rate limit can alleviate this to
 a certain point, but it's far from ideal)
 There is also no exactly-once semantics. (updateStateByKey can
 achieve this semantics, but is not practical if you have any significant
 amount of state because it does so by dumping the entire state on every
 checkpointing)

 There are also some minor drawbacks that I'm sure will be fixed
 quickly, like no task timeout, not being able

Re: Spark or Storm

2015-06-17 Thread Enno Shioji
The thing is, even with that improvement, you still have to make updates
idempotent or transactional yourself. If you read
http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics
(which refers to the latest version), it says:

Semantics of output operations

Output operations (like foreachRDD) have *at-least once* semantics, that
is, the transformed data may get written to an external entity more than
once in the event of a worker failure. While this is acceptable for saving
to file systems using the saveAs***Files operations (as the file will
simply get overwritten with the same data), additional effort may be
necessary to achieve exactly-once semantics. There are two approaches.

   -

   *Idempotent updates*: Multiple attempts always write the same data. For
   example, saveAs***Files always writes the same data to the generated
   files.
   -

   *Transactional updates*: All updates are made transactionally so that
   updates are made exactly once atomically. One way to do this would be the
   following.
   - Use the batch time (available in foreachRDD) and the partition index
  of the transformed RDD to create an identifier. This identifier uniquely
  identifies a blob data in the streaming application.
  - Update external system with this blob transactionally (that is,
  exactly once, atomically) using the identifier. That is, if the
identifier
  is not already committed, commit the partition data and the identifier
  atomically. Else if this was already committed, skip the update.


So either you make the update idempotent, or you have to make it
transactional yourself, and the suggested mechanism is very similar to what
Storm does.
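
For what it's worth, a rough sketch of that transactional pattern
(commitIfAbsent is a hypothetical stand-in for whatever atomic "write unless
this id is already committed" operation your store offers):

import org.apache.spark.streaming.dstream.DStream

// Stand-in for a conditional write to the external store, e.g. a
// conditional put / an INSERT that is a no-op if the id already exists.
def commitIfAbsent(id: String, blob: Seq[String]): Unit = {}

def save(stream: DStream[String]): Unit =
  stream.foreachRDD { (rdd, batchTime) =>
    rdd.mapPartitionsWithIndex { (partitionIndex, records) =>
      // batch time + partition index uniquely identify this blob of data
      val id = s"${batchTime.milliseconds}-$partitionIndex"
      commitIfAbsent(id, records.toVector)
      Iterator.single(id)
    }.count() // force the lazy transformation to actually run
  }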




On Wed, Jun 17, 2015 at 3:51 PM, Ashish Soni asoni.le...@gmail.com wrote:

 @Enno
 As per the latest version and documentation Spark Streaming does offer
 exactly once semantics using improved kafka integration , Not i have not
 tested yet.

 Any feedback will be helpful if anyone is tried the same.

 http://koeninger.github.io/kafka-exactly-once/#7


 https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html



 On Wed, Jun 17, 2015 at 10:33 AM, Enno Shioji eshi...@gmail.com wrote:

 AFAIK KCL is *supposed* to provide fault tolerance and load balancing
 (plus additionally, elastic scaling unlike Storm), Kinesis providing the
 coordination. My understanding is that it's like a naked Storm worker
 process that can consequently only do map.

 I haven't really used it tho, so can't really comment how it compares to
 Spark/Storm. Maybe somebody else will be able to comment.



 On Wed, Jun 17, 2015 at 3:13 PM, ayan guha guha.a...@gmail.com wrote:

 Thanks for this. It's kcl based kinesis application. But because its
 just a Java application we are thinking to use spark on EMR or storm for
 fault tolerance and load balancing. Is it a correct approach?
 On 17 Jun 2015 23:07, Enno Shioji eshi...@gmail.com wrote:

 Hi Ayan,

 Admittedly I haven't done much with Kinesis, but if I'm not mistaken
 you should be able to use their processor interface for that. In this
 example, it's incrementing a counter:
 https://github.com/awslabs/amazon-kinesis-data-visualization-sample/blob/master/src/main/java/com/amazonaws/services/kinesis/samples/datavis/kcl/CountingRecordProcessor.java

 Instead of incrementing a counter, you could do your transformation and
 send it to HBase.






 On Wed, Jun 17, 2015 at 1:40 PM, ayan guha guha.a...@gmail.com wrote:

 Great discussion!!

 One qs about some comment: Also, you can do some processing with
 Kinesis. If all you need to do is straight forward transformation and you
 are reading from Kinesis to begin with, it might be an easier option to
 just do the transformation in Kinesis

 - Do you mean KCL application? Or some kind of processing
 withinKineis?

 Can you kindly share a link? I would definitely pursue this route as
 our transformations are really simple.

 Best

 On Wed, Jun 17, 2015 at 10:26 PM, Ashish Soni asoni.le...@gmail.com
 wrote:

 My Use case is below

 We are going to receive lot of event as stream ( basically Kafka
 Stream ) and then we need to process and compute

 Consider you have a phone contract with ATT and every call / sms /
 data useage you do is an event and then it needs  to calculate your bill 
 on
 real time basis so when you login to your account you can see all those
 variable as how much you used and how much is left and what is your bill
 till date ,Also there are different rules which need to be considered 
 when
 you calculate the total bill one simple rule will be 0-500 min it is free
 but above it is $1 a min.

 How do i maintain a shared state  ( total amount , total min , total
 data etc ) so that i know how much i accumulated at any given point as
 events for same phone can go to any node / executor.

 Can some one please tell me how can i achieve this is spark as in
 storm i can have a bolt which can do

Is a higher-res or vector version of Spark logo available?

2015-04-23 Thread Enno Shioji
My employer (adform.com) would like to use the Spark logo in a recruitment
event (to indicate that we are using Spark in our company). I looked in the
Spark repo (https://github.com/apache/spark/tree/master/docs/img) but
couldn't find a vector format.

Is a higher-res or vector format version available anywhere?

Enno


Re: SparkStreaming Low Performance

2015-02-14 Thread Enno Shioji
I see. I'd really benchmark how the parsing performs outside Spark (in a
tight loop or something). If *that* is slow, you know it's the parsing. If
not, it's not the parsing.

Another thing you want to look at is CPU usage. If the actual parsing
really is the bottleneck, you should see very high CPU utilization. If not,
it's not the parsing per se but rather the ability to feed the messages to
the parsing library.
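
Something along these lines is what I have in mind (a standalone sketch; it
assumes one JSON document per line in a file passed as the first argument,
and the ScalaObjectMapper package location varies with the
jackson-module-scala version):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import scala.io.Source

object ParseBench {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper() with ScalaObjectMapper
    mapper.registerModule(DefaultScalaModule)
    // the same documents you feed the stream, one per line
    val docs = Source.fromFile(args(0)).getLines().toVector
    val start = System.nanoTime()
    docs.foreach(mapper.readValue[Map[String, Any]](_))
    val elapsedMs = (System.nanoTime() - start) / 1e6
    println(s"parsed ${docs.size} docs in $elapsedMs ms")
  }
}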



On Sat, Feb 14, 2015 at 2:30 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Ah my bad, it works without serializable exception. But not much
 performance difference is there though.

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Thanks for the suggestion, but doing that gives me this exception:

 http://pastebin.com/ni80NqKn

 Over this piece of code:

object Holder extends Serializable {
   @transient lazy val mapper = new ObjectMapper() with
 ScalaObjectMapper
   mapper.registerModule(DefaultScalaModule)
 }

 val jsonStream = myDStream.map(x= {
Holder.mapper.readValue[Map[String,Any]](x)
 })

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji eshi...@gmail.com wrote:

 (adding back user)

 Fair enough. Regarding serialization exception, the hack I use is to
 have a object with a transient lazy field, like so:


 object Holder extends Serializable {
   @transient lazy val mapper = new ObjectMapper()
 }

 This way, the ObjectMapper will be instantiated at the destination and
 you can share the instance.




 On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Thanks for the reply Enno, in my case rate from the stream is not the
 bottleneck as i'm able to consume all those records at a time (have tested
 it). And regarding the ObjectMapper, if i take it outside of my map
 operation then it throws Serializable Exceptions (Caused by:
 java.io.NotSerializableException:
 com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com wrote:

 If I were you I'd first parse some test jsons in isolation (outside
 Spark) to determine if the bottleneck is really the parsing. There are
 plenty other places that could be affecting your performance, like the 
 rate
 you are able to read from your stream source etc.

 Apart from that, I notice that you are instantiating the ObjectMapper
 every time. This is quite expensive and jackson recommends you to share 
 the
 instance. However, if you tried other parsers / mapPartitions without
 success, this probably won't fix your problem either.





 On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 I'm getting a low performance while parsing json data. My cluster
 setup is 1.2.0 version of spark with 10 Nodes each having 15Gb of memory
 and 4 cores.

 I tried both scala.util.parsing.json.JSON and and fasterxml's Jackson
 parser.

 This is what i basically do:

 *//Approach 1:*
 val jsonStream = myDStream.map(x= {
   val mapper = new ObjectMapper() with ScalaObjectMapper
   mapper.registerModule(DefaultScalaModule)
   mapper.readValue[Map[String,Any]](x)
 })

 jsonStream.count().print()


 *//Approach 2:*
 val jsonStream2 =
 myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String,
 Any]])

 jsonStream2.count().print()



 It takes around 15-20 Seconds to process/parse 35k json documents
 (contains nested documents and arrays) which i put in the stream.

 Is there any better approach/parser to process it faster? i also
 tried it with mapPartitions but it did not make any difference.




 Thanks
 Best Regards









Re: SparkStreaming Low Performance

2015-02-14 Thread Enno Shioji
Huh, that would come to 6.5ms per JSON document. That does feel like a lot,
but if your JSON documents are big enough, I guess you could get that sort of
processing time.

Jackson is more or less the most efficient JSON parser out there, so unless
the Scala API is somehow affecting it, I don't see any better way. If you
only need to read parts of the JSON, you could look into exploiting
Jackson's stream parsing API: http://wiki.fasterxml.com/JacksonStreamingApi
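
Roughly like this, say, if you only needed one field out of each document
(the field name "userId" is just an assumption for the sketch, and a real
version would need to handle nesting properly):

import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

def extractUserId(json: String): Option[String] = {
  val parser = new JsonFactory().createParser(json)
  try {
    var result: Option[String] = None
    // walk the token stream and stop as soon as the field is found
    while (result.isEmpty && parser.nextToken() != null) {
      if (parser.getCurrentToken == JsonToken.FIELD_NAME &&
          parser.getCurrentName == "userId") {
        parser.nextToken() // advance to the field's value
        result = Some(parser.getText)
      }
    }
    result
  } finally {
    parser.close()
  }
}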

I guess the good news is you can throw machines at it. You could also look
into other serialization frameworks.




On Sat, Feb 14, 2015 at 2:49 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Thanks again!
 Its with the parser only, just tried the parser
 https://gist.github.com/akhld/3948a5d91d218eaf809d without Spark. And
 it took me 52 Sec to process 8k json records. Not sure if there's an
 efficient way to do this in Spark, i know if i use sparkSQL with schemaRDD
 and all it will be much faster, but i need that in SparkStreaming.

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 8:04 PM, Enno Shioji eshi...@gmail.com wrote:

 I see. I'd really benchmark how the parsing performs outside Spark (in a
 tight loop or something). If *that* is slow, you know it's the parsing. If
 not, it's not the parsing.

 Another thing you want to look at is CPU usage. If the actual parsing
 really is the bottleneck, you should see very high CPU utilization. If not,
 it's not the parsing per se but rather the ability to feed the messages to
 the parsing library.



 On Sat, Feb 14, 2015 at 2:30 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Ah my bad, it works without serializable exception. But not much
 performance difference is there though.

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Thanks for the suggestion, but doing that gives me this exception:

 http://pastebin.com/ni80NqKn

 Over this piece of code:

object Holder extends Serializable {
   @transient lazy val mapper = new ObjectMapper() with
 ScalaObjectMapper
   mapper.registerModule(DefaultScalaModule)
 }

 val jsonStream = myDStream.map(x= {
Holder.mapper.readValue[Map[String,Any]](x)
 })

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji eshi...@gmail.com wrote:

 (adding back user)

 Fair enough. Regarding serialization exception, the hack I use is to
 have a object with a transient lazy field, like so:


 object Holder extends Serializable {
   @transient lazy val mapper = new ObjectMapper()
 }

 This way, the ObjectMapper will be instantiated at the destination and
 you can share the instance.




 On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 Thanks for the reply Enno, in my case rate from the stream is not the
 bottleneck as i'm able to consume all those records at a time (have 
 tested
 it). And regarding the ObjectMapper, if i take it outside of my map
 operation then it throws Serializable Exceptions (Caused by:
 java.io.NotSerializableException:
 com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com
 wrote:

 If I were you I'd first parse some test jsons in isolation (outside
 Spark) to determine if the bottleneck is really the parsing. There are
 plenty other places that could be affecting your performance, like the 
 rate
 you are able to read from your stream source etc.

 Apart from that, I notice that you are instantiating the
 ObjectMapper every time. This is quite expensive and jackson recommends 
 you
 to share the instance. However, if you tried other parsers / 
 mapPartitions
 without success, this probably won't fix your problem either.





 On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 I'm getting a low performance while parsing json data. My cluster
 setup is 1.2.0 version of spark with 10 Nodes each having 15Gb of 
 memory
 and 4 cores.

 I tried both scala.util.parsing.json.JSON and and fasterxml's
 Jackson parser.

 This is what i basically do:

 *//Approach 1:*
 val jsonStream = myDStream.map(x= {
   val mapper = new ObjectMapper() with ScalaObjectMapper
   mapper.registerModule(DefaultScalaModule)
   mapper.readValue[Map[String,Any]](x)
 })

 jsonStream.count().print()


 *//Approach 2:*
 val jsonStream2 =
 myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String,
 Any]])

 jsonStream2.count().print()



 It takes around 15-20 Seconds to process/parse 35k json documents
 (contains nested documents and arrays) which i put in the stream.

 Is there any better approach/parser to process it faster? i also
 tried it with mapPartitions but it did not make any difference.




 Thanks
 Best Regards











Re: SparkStreaming Low Performance

2015-02-14 Thread Enno Shioji
(adding back user)

Fair enough. Regarding serialization exception, the hack I use is to have a
object with a transient lazy field, like so:


object Holder extends Serializable {
  @transient lazy val mapper = new ObjectMapper()
}

This way, the ObjectMapper will be instantiated at the destination and you
can share the instance.




On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Thanks for the reply Enno, in my case rate from the stream is not the
 bottleneck as i'm able to consume all those records at a time (have tested
 it). And regarding the ObjectMapper, if i take it outside of my map
 operation then it throws Serializable Exceptions (Caused by:
 java.io.NotSerializableException:
 com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com wrote:

 If I were you I'd first parse some test jsons in isolation (outside
 Spark) to determine if the bottleneck is really the parsing. There are
 plenty other places that could be affecting your performance, like the rate
 you are able to read from your stream source etc.

 Apart from that, I notice that you are instantiating the ObjectMapper
 every time. This is quite expensive and jackson recommends you to share the
 instance. However, if you tried other parsers / mapPartitions without
 success, this probably won't fix your problem either.





 On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 I'm getting a low performance while parsing json data. My cluster setup
 is 1.2.0 version of spark with 10 Nodes each having 15Gb of memory and 4
 cores.

 I tried both scala.util.parsing.json.JSON and and fasterxml's Jackson
 parser.

 This is what i basically do:

 *//Approach 1:*
 val jsonStream = myDStream.map(x= {
   val mapper = new ObjectMapper() with ScalaObjectMapper
   mapper.registerModule(DefaultScalaModule)
   mapper.readValue[Map[String,Any]](x)
 })

 jsonStream.count().print()


 *//Approach 2:*
 val jsonStream2 =
 myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String,
 Any]])

 jsonStream2.count().print()



 It takes around 15-20 Seconds to process/parse 35k json documents
 (contains nested documents and arrays) which i put in the stream.

 Is there any better approach/parser to process it faster? i also tried
 it with mapPartitions but it did not make any difference.




 Thanks
 Best Regards






Re: Profiling in YourKit

2015-02-07 Thread Enno Shioji
 1
You have 4 CPU cores and 34 threads (system wide; you likely have many more,
by the way).
Think of it as having 4 espresso machines and 34 baristas. Does the fact
that you have only 4 espresso machines mean you can only have 4 baristas? Of
course not; there's plenty of other work besides making espresso, like
foaming the milk, talking to customers (IO), etc. They just have to use the
espresso machines in turn, which is managed by the OS.

 2
Imagine you are making 100 cups and 10K cups of coffee, respectively. If
you have 4 espresso machines, what's the most sensible thing to do? Probably
just using the 4 machines in both cases.









On Sat, Feb 7, 2015 at 10:14 AM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 Hi,
 I am using YourKit tool to profile Spark jobs that is run in my Single
 Node Spark Cluster.
 When I see the YourKit UI Performance Charts, the thread count always
 remains at
 All threads: 34
 Daemon threads: 32

 Here are my questions:

 1. My system can run only 4 threads simultaneously, and obviously my
 system does not have 34 threads. What could 34 threads mean?

 2. I tried running the same job with four different datasets, two small
 and two relatively big. But in the UI the thread count increases by two,
 irrespective of data size. Does this mean that the number of threads
 allocated to each job depending on data size is not taken care by the
 framework?

 Thank You



Re: Problems with Spark Core 1.2.0 SBT project in IntelliJ

2015-01-13 Thread Enno Shioji
Have you tried adding this line?
  "javax.servlet" % "javax.servlet-api" % "3.0.1" % "provided"

This made the problem go away for me. It also works without the provided
scope.

On Wed, Jan 14, 2015 at 5:09 AM, Night Wolf nightwolf...@gmail.com wrote:

 Thanks for the tips!

 Yeah its as a working SBT project. I.e. if I do an SBT run it picks up
 Test1 as a main class and runs it for me without error. Its only in
 IntelliJ. I opened the project from the folder afresh by choosing the
 build.sbt file. I re-tested by deleting .idea and just choosing the project
 folder and I get the same result.  Not using gen-idea in sbt.



 On Wed, Jan 14, 2015 at 8:52 AM, Jay Vyas jayunit100.apa...@gmail.com
 wrote:

 I find importing a working SBT project into IntelliJ is the way to
 go.

 How did you load the project into intellij?

 On Jan 13, 2015, at 4:45 PM, Enno Shioji eshi...@gmail.com wrote:

 Had the same issue. I can't remember what the issue was but this works:

 libraryDependencies ++= {
   val sparkVersion = 1.2.0
   Seq(
 org.apache.spark %% spark-core % sparkVersion % provided,
 org.apache.spark %% spark-streaming % sparkVersion % provided,
 org.apache.spark %% spark-streaming-twitter % sparkVersion %
 provided,
 org.apache.spark %% spark-streaming-kafka % sparkVersion %
 provided,
 javax.servlet % javax.servlet-api % 3.0.1 % provided
   )
 }

 In order to run classes in main source in Intellij, you must invoke it
 from a source under test as Intellij won't provide the provided scope
 libraries when running code in main source (but it will for sources under
 test).

 With this config you can sbt assembly in order to get the fat jar
 without Spark jars.



 On Tue, Jan 13, 2015 at 12:16 PM, Night Wolf nightwolf...@gmail.com
 wrote:

 Hi,

 I'm trying to load up an SBT project in IntelliJ 14 (windows) running
 1.7 JDK, SBT 0.13.5 -I seem to be getting errors with the project.

 The build.sbt file is super simple;

 name := scala-spark-test1
 version := 1.0

 scalaVersion := 2.10.4

 libraryDependencies += org.apache.spark %% spark-core % 1.2.0


 Then I have a super simple test class;

 package test

 import org.apache.spark.{SparkContext, SparkConf}

 case class Blah(s: Int, d: String)

 object Test1  {
   def main(args: Array[String]): Unit = {
 val sparkconf = new
 SparkConf().setMaster(local[4]).setAppName(test-spark)
 val sc = new SparkContext(sparkconf)

 val rdd = sc.parallelize(Seq(
   Blah(1,dsdsd),
   Blah(2,daaa),
   Blah(3,dhghghgh)
 ))

 rdd.collect().foreach(println)

   }
 }


 When I try to run the Test1 object in IntelliJ I get the following error;

 Exception in thread main java.lang.NoClassDefFoundError:
 javax/servlet/http/HttpServletResponse
 at org.apache.spark.HttpServer.org
 $apache$spark$HttpServer$$doStart(HttpServer.scala:73)
 at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:60)
 at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:60)
 at
 org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1676)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1667)
 at org.apache.spark.HttpServer.start(HttpServer.scala:60)
 at org.apache.spark.HttpFileServer.initialize(HttpFileServer.scala:45)
 at org.apache.spark.SparkEnv$.create(SparkEnv.scala:304)
 at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159)
 at org.apache.spark.SparkContext.init(SparkContext.scala:232)
 at test.Test1$.main(Test1.scala:10)
 at test.Test1.main(Test1.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
 Caused by: java.lang.ClassNotFoundException:
 javax.servlet.http.HttpServletResponse
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 18 more


 For whatever reason it seems that IntelliJ isnt pulling in these deps.
 Doing an sbt run works fine. Looking at the project structure it seems that
 7 libs dont get marked as a dependency for my module... But they are on the
 dep tree http://pastebin.com/REkQh5ux


 Is this something to do with the libs and scoping or shading in Spark
 and its associated libs? Has anyone else seen this issue?

 Cheers,
 NW






Re: Problems with Spark Core 1.2.0 SBT project in IntelliJ

2015-01-13 Thread Enno Shioji
Had the same issue. I can't remember what the issue was but this works:

libraryDependencies ++= {
  val sparkVersion = "1.2.0"
  Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
    "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
    "org.apache.spark" %% "spark-streaming-twitter" % sparkVersion % "provided",
    "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion % "provided",
    "javax.servlet" % "javax.servlet-api" % "3.0.1" % "provided"
  )
}

In order to run classes in the main sources in IntelliJ, you must invoke them
from a source under test, as IntelliJ won't put the "provided"-scope
libraries on the classpath when running code in the main sources (but it will
for sources under test).

With this config you can sbt assembly in order to get the fat jar without
Spark jars.
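
Concretely, that can be as simple as a tiny launcher kept under the test
sources (a sketch, reusing the Test1 class from the example below):

// src/test/scala/test/RunTest1Locally.scala
package test

// Living under src/test means IntelliJ puts the "provided" Spark jars on
// the runtime classpath when you run it.
object RunTest1Locally {
  def main(args: Array[String]): Unit = {
    Test1.main(args) // Test1 is the real entry point in the main sources
  }
}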



On Tue, Jan 13, 2015 at 12:16 PM, Night Wolf nightwolf...@gmail.com wrote:

 Hi,

 I'm trying to load up an SBT project in IntelliJ 14 (windows) running 1.7
 JDK, SBT 0.13.5 -I seem to be getting errors with the project.

 The build.sbt file is super simple;

 name := scala-spark-test1
 version := 1.0

 scalaVersion := 2.10.4

 libraryDependencies += org.apache.spark %% spark-core % 1.2.0


 Then I have a super simple test class;

 package test

 import org.apache.spark.{SparkContext, SparkConf}

 case class Blah(s: Int, d: String)

 object Test1  {
   def main(args: Array[String]): Unit = {
 val sparkconf = new
 SparkConf().setMaster(local[4]).setAppName(test-spark)
 val sc = new SparkContext(sparkconf)

 val rdd = sc.parallelize(Seq(
   Blah(1,dsdsd),
   Blah(2,daaa),
   Blah(3,dhghghgh)
 ))

 rdd.collect().foreach(println)

   }
 }


 When I try to run the Test1 object in IntelliJ I get the following error;

 Exception in thread main java.lang.NoClassDefFoundError:
 javax/servlet/http/HttpServletResponse
 at org.apache.spark.HttpServer.org
 $apache$spark$HttpServer$$doStart(HttpServer.scala:73)
 at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:60)
 at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:60)
 at
 org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1676)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1667)
 at org.apache.spark.HttpServer.start(HttpServer.scala:60)
 at org.apache.spark.HttpFileServer.initialize(HttpFileServer.scala:45)
 at org.apache.spark.SparkEnv$.create(SparkEnv.scala:304)
 at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159)
 at org.apache.spark.SparkContext.init(SparkContext.scala:232)
 at test.Test1$.main(Test1.scala:10)
 at test.Test1.main(Test1.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
 Caused by: java.lang.ClassNotFoundException:
 javax.servlet.http.HttpServletResponse
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 18 more


 For whatever reason it seems that IntelliJ isnt pulling in these deps.
 Doing an sbt run works fine. Looking at the project structure it seems that
 7 libs dont get marked as a dependency for my module... But they are on the
 dep tree http://pastebin.com/REkQh5ux

 [image: Inline image 1]

 Is this something to do with the libs and scoping or shading in Spark and
 its associated libs? Has anyone else seen this issue?

 Cheers,
 NW



Re: Registering custom metrics

2015-01-08 Thread Enno Shioji
FYI I found this approach by Ooyala.

/** Instrumentation for Spark based on accumulators.
  *
  * Usage:
  * val instrumentation = new SparkInstrumentation("example.metrics")
  * val numReqs = sc.accumulator(0L)
  * instrumentation.source.registerDailyAccumulator(numReqs, "numReqs")
  * instrumentation.register()
  *
  * Will create and report the following metrics:
  * - Gauge with total number of requests (daily)
  * - Meter with rate of requests
  *
  * @param prefix prefix for all metrics that will be reported by this
Instrumentation
  */

https://gist.github.com/ibuenros/9b94736c2bad2f4b8e23
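
The rough shape of it, as I understand it (my own sketch rather than the
gist's code; Source is private to Spark, hence the package declaration):

package org.apache.spark.metrics.source

import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.Accumulator

class AccumulatorSource(prefix: String, acc: Accumulator[Long]) extends Source {
  override val sourceName = prefix
  override val metricRegistry = new MetricRegistry()
  // Expose the accumulator's current value as a Gauge; the metrics system
  // polls it and ships it to whatever sink is configured (e.g. Graphite).
  metricRegistry.register(MetricRegistry.name("numReqs"), new Gauge[Long] {
    override def getValue: Long = acc.value
  })
}

// On the driver, after creating the accumulator:
// SparkEnv.get.metricsSystem.registerSource(new AccumulatorSource("example.metrics", numReqs))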

On Mon, Jan 5, 2015 at 2:56 PM, Enno Shioji eshi...@gmail.com wrote:

 Hi Gerard,

 Thanks for the answer! I had a good look at it, but I couldn't figure out
 whether one can use that to emit metrics from your application code.

 Suppose I wanted to monitor the rate of bytes I produce, like so:

 stream
   .map { input =>
     val bytes = produce(input)
     // metricRegistry.meter("some.metrics").mark(bytes.length)
     bytes
   }
   .saveAsTextFile("text")

 Is there a way to achieve this with the MetricSystem?



 On Mon, Jan 5, 2015 at 10:24 AM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Hi,

 Yes, I managed to create and register custom metrics by creating an
 implementation of org.apache.spark.metrics.source.Source and
 registering it to the metrics subsystem.
 Source is [Spark] private, so you need to create it under a org.apache.spark
 package. In my case, I'm dealing with Spark Streaming metrics, and I
 created my CustomStreamingSource under org.apache.spark.streaming as I
 also needed access to some [Streaming] private components.

 Then, you register your new metric Source on the Spark's metric system,
 like so:

 SparkEnv.get.metricsSystem.registerSource(customStreamingSource)

 And it will get reported to the metrics Sync active on your system. By
 default, you can access them through the metric endpoint:
 http://driver-host:ui-port/metrics/json

 I hope this helps.

 -kr, Gerard.






 On Tue, Dec 30, 2014 at 3:32 PM, eshioji eshi...@gmail.com wrote:

 Hi,

 Did you find a way to do this / working on this?
 Am trying to find a way to do this as well, but haven't been able to
 find a
 way.



 --
 View this message in context:
 http://apache-spark-developers-list.1001551.n3.nabble.com/Registering-custom-metrics-tp9030p9968.html
 Sent from the Apache Spark Developers List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org






TestSuiteBase based unit test using a sliding window join timesout

2015-01-07 Thread Enno Shioji
Hi,

I extended org.apache.spark.streaming.TestSuiteBase for some testing, and I
was able to run this test fine:

test("Sliding window join with 3 second window duration") {

  val input1 =
    Seq(
      Seq("req1"),
      Seq("req2", "req3"),
      Seq(),
      Seq("req4", "req5", "req6"),
      Seq("req7"),
      Seq(),
      Seq()
    )

  val input2 =
    Seq(
      Seq(("tx1", "req1")),
      Seq(),
      Seq(("tx2", "req3")),
      Seq(("tx3", "req2")),
      Seq(),
      Seq(("tx4", "req7")),
      Seq(("tx5", "req5"), ("tx6", "req4"))
    )

  val expectedOutput = Seq(
    Seq(("req1", (1, "tx1"))),
    Seq(),
    Seq(("req3", (1, "tx2"))),
    Seq(("req2", (1, "tx3"))),
    Seq(),
    Seq(("req7", (1, "tx4"))),
    Seq()
  )
  val operation = (rq: DStream[String], tx: DStream[(String, String)]) => {
    rq.window(Seconds(3), Seconds(1)).map(x => (x, 1)).join(tx.map { case (k, v) => (v, k) })
  }
  testOperation(input1, input2, operation, expectedOutput, useSet = true)
}

However, this seemingly OK-looking test fails with an operation timeout:
test("Sliding window join with 3 second window duration + a tumbling window") {

  val input1 =
    Seq(
      Seq("req1"),
      Seq("req2", "req3"),
      Seq(),
      Seq("req4", "req5", "req6"),
      Seq("req7"),
      Seq()
    )

  val input2 =
    Seq(
      Seq(("tx1", "req1")),
      Seq(),
      Seq(("tx2", "req3")),
      Seq(("tx3", "req2")),
      Seq(),
      Seq(("tx4", "req7"))
    )

  val expectedOutput = Seq(
    Seq(("req1", (1, "tx1"))),
    Seq(("req2", (1, "tx3")), ("req3", (1, "tx3"))),
    Seq(("req7", (1, "tx4")))
  )
  val operation = (rq: DStream[String], tx: DStream[(String, String)]) => {
    rq.window(Seconds(3), Seconds(2)).map(x => (x, 1)).join(tx.window(Seconds(2), Seconds(2)).map { case (k, v) => (v, k) })
  }
  testOperation(input1, input2, operation, expectedOutput, useSet = true)
}

Stacktrace:
10033 was not less than 1 Operation timed out after 10033 ms
org.scalatest.exceptions.TestFailedException: 10033 was not less than 1
Operation timed out after 10033 ms
at
org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
at
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:466)
at
org.apache.spark.streaming.TestSuiteBase$class.runStreamsWithPartitions(TestSuiteBase.scala:338)

Does anybody know why this could be?


Re: Better way of measuring custom application metrics

2015-01-04 Thread Enno Shioji
Hi Jerry, thanks for your answer.

I had looked at MetricsSystem, but I couldn't see how I could use it in my
use case, which is:


stream
  .map { i =>
    Metriker.mr.meter(Metriker.metricName("testmetric123")).mark(i)
    i * 2
  }

From what I can see, a Source accepts an object and describes how to poll
it for metrics. Presumably that's why Sources have only Gauges and never
Meters, for example. In my case, I don't have a state that I want Spark's
MetricSystem to poll.

If I could get a reference to an internal metricRegistry instance AND a task
identifier in my functions, I could achieve the same thing while using
Spark's metric configuration, but I couldn't find a way to do this either...








On Sun, Jan 4, 2015 at 2:46 AM, Shao, Saisai saisai.s...@intel.com wrote:

  Hi,



 I think there’s a StreamingSource in Spark Streaming which exposes the
 Spark Streaming running status to the metrics sink, you can connect it with
 Graphite sink to expose metrics to Graphite. I’m not sure is this what you
 want.



 Besides you can customize the Source and Sink of the MetricsSystem to
 build your own and configure it in metrics.properties with class name to
 let it loaded by metrics system, for the details you can refer to
 http://spark.apache.org/docs/latest/monitoring.html or source code.



 Thanks

 Jerry



 *From:* Enno Shioji [mailto:eshi...@gmail.com]
 *Sent:* Sunday, January 4, 2015 7:47 AM
 *To:* user@spark.apache.org
 *Subject:* Better way of measuring custom application metrics



 I have a hack to gather custom application metrics in a Streaming job, but
 I wanted to know if there is any better way of doing this.



 My hack consists of this singleton:



 object Metriker extends Serializable {

   @transient lazy val mr: MetricRegistry = {

 val metricRegistry = new MetricRegistry()

 val graphiteEndpoint = new InetSocketAddress(
 ec2-54-220-56-229.eu-west-1.compute.amazonaws.com, 2003)

 GraphiteReporter

   .forRegistry(metricRegistry)

   .build(new Graphite(graphiteEndpoint))

   .start(5, TimeUnit.SECONDS)

 metricRegistry

   }



   @transient lazy val processId =
 ManagementFactory.getRuntimeMXBean.getName



   @transient lazy val hostId = {

 try {

   InetAddress.getLocalHost.getHostName

 } catch {

   case e: UnknownHostException = localhost

 }

   }



def metricName(name: String): String = {

 %s.%s.%s.format(name, hostId, processId)

   }

 }





 which I then use in my jobs like so:



 stream

 .map { i =

   Metriker.mr.meter(Metriker.metricName(testmetric123)).mark(i)

   i * 2

 }



 Then I aggregate the metrics on Graphite. This works, but I was curious to
 know if anyone has a less hacky way.








Better way of measuring custom application metrics

2015-01-03 Thread Enno Shioji
I have a hack to gather custom application metrics in a Streaming job, but
I wanted to know if there is any better way of doing this.

My hack consists of this singleton:

object Metriker extends Serializable {
  @transient lazy val mr: MetricRegistry = {
    val metricRegistry = new MetricRegistry()
    val graphiteEndpoint = new InetSocketAddress(
      "ec2-54-220-56-229.eu-west-1.compute.amazonaws.com", 2003)
    GraphiteReporter
      .forRegistry(metricRegistry)
      .build(new Graphite(graphiteEndpoint))
      .start(5, TimeUnit.SECONDS)
    metricRegistry
  }

  @transient lazy val processId = ManagementFactory.getRuntimeMXBean.getName

  @transient lazy val hostId = {
    try {
      InetAddress.getLocalHost.getHostName
    } catch {
      case e: UnknownHostException => "localhost"
    }
  }

  def metricName(name: String): String = {
    "%s.%s.%s".format(name, hostId, processId)
  }
}


which I then use in my jobs like so:

stream
  .map { i =>
    Metriker.mr.meter(Metriker.metricName("testmetric123")).mark(i)
    i * 2
  }

Then I aggregate the metrics on Graphite. This works, but I was curious to
know if anyone has a less hacky way.




Re: Big performance difference between client and cluster deployment mode; is this expected?

2014-12-31 Thread Enno Shioji
Also the job was deployed from the master machine in the cluster.

On Wed, Dec 31, 2014 at 6:35 PM, Enno Shioji eshi...@gmail.com wrote:

 Oh sorry that was a edit mistake. The code is essentially:

  val msgStream = kafkaStream
.map { case (k, v) = v}
.map(DatatypeConverter.printBase64Binary)
.saveAsTextFile(s3n://some.bucket/path, classOf[LzoCodec])

 I.e. there is essentially no original code (I was calling saveAsTextFile
 in a save function but that was just a remnant from previous debugging).



 On Wed, Dec 31, 2014 at 6:21 PM, Sean Owen so...@cloudera.com wrote:

 -dev, +user

 A decent guess: Does your 'save' function entail collecting data back
 to the driver? and are you running this from a machine that's not in
 your Spark cluster? Then in client mode you're shipping data back to a
 less-nearby machine, compared to with cluster mode. That could explain
 the bottleneck.

 On Wed, Dec 31, 2014 at 4:12 PM, Enno Shioji eshi...@gmail.com wrote:
  Hi,
 
  I have a very, very simple streaming job. When I deploy this on the
 exact
  same cluster, with the exact same parameters, I see big (40%)
 performance
  difference between client and cluster deployment mode. This seems a
 bit
  surprising.. Is this expected?
 
  The streaming job is:
 
  val msgStream = kafkaStream
.map { case (k, v) = v}
.map(DatatypeConverter.printBase64Binary)
.foreachRDD(save)
.saveAsTextFile(s3n://some.bucket/path, classOf[LzoCodec])
 
  I tried several times, but the job deployed with client mode can only
  write at 60% throughput of the job deployed with cluster mode and this
  happens consistently. I'm logging at INFO level, but my application code
  doesn't log anything so it's only Spark logs. The logs I see in client
  mode doesn't seem like a crazy amount.
 
  The setup is:
  spark-ec2 [...] \
--copy-aws-credentials \
--instance-type=m3.2xlarge \
-s 2 launch test_cluster
 
  And all the deployment was done from the master machine.
 





Re: Big performance difference between client and cluster deployment mode; is this expected?

2014-12-31 Thread Enno Shioji
Oh sorry, that was an edit mistake. The code is essentially:

 val msgStream = kafkaStream
   .map { case (k, v) => v }
   .map(DatatypeConverter.printBase64Binary)
   .saveAsTextFile("s3n://some.bucket/path", classOf[LzoCodec])

I.e. there is essentially no original code (I was calling saveAsTextFile in
a save function but that was just a remnant from previous debugging).



On Wed, Dec 31, 2014 at 6:21 PM, Sean Owen so...@cloudera.com wrote:

 -dev, +user

 A decent guess: Does your 'save' function entail collecting data back
 to the driver? and are you running this from a machine that's not in
 your Spark cluster? Then in client mode you're shipping data back to a
 less-nearby machine, compared to with cluster mode. That could explain
 the bottleneck.

 On Wed, Dec 31, 2014 at 4:12 PM, Enno Shioji eshi...@gmail.com wrote:
  Hi,
 
  I have a very, very simple streaming job. When I deploy this on the exact
  same cluster, with the exact same parameters, I see big (40%) performance
  difference between client and cluster deployment mode. This seems a
 bit
  surprising.. Is this expected?
 
  The streaming job is:
 
  val msgStream = kafkaStream
.map { case (k, v) = v}
.map(DatatypeConverter.printBase64Binary)
.foreachRDD(save)
.saveAsTextFile(s3n://some.bucket/path, classOf[LzoCodec])
 
  I tried several times, but the job deployed with client mode can only
  write at 60% throughput of the job deployed with cluster mode and this
  happens consistently. I'm logging at INFO level, but my application code
  doesn't log anything so it's only Spark logs. The logs I see in client
  mode doesn't seem like a crazy amount.
 
  The setup is:
  spark-ec2 [...] \
--copy-aws-credentials \
--instance-type=m3.2xlarge \
-s 2 launch test_cluster
 
  And all the deployment was done from the master machine.
 



Re: Big performance difference between client and cluster deployment mode; is this expected?

2014-12-31 Thread Enno Shioji
Hi Tathagata,

It's a standalone cluster. The submit commands are:

== CLIENT
spark-submit --class com.fake.Test \
--deploy-mode client --master spark://fake.com:7077 \
fake.jar arguments

== CLUSTER
 spark-submit --class com.fake.Test \
 --deploy-mode cluster --master spark://fake.com:7077 \
 s3n://fake.jar arguments

And they are both occupying all available slots (8 cores * 2 machines = 16 slots).



On Thu, Jan 1, 2015 at 12:21 AM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Whats your spark-submit commands in both cases? Is it Spark Standalone or
 YARN (both support client and cluster)? Accordingly what is the number of
 executors/cores requested?

 TD

 On Wed, Dec 31, 2014 at 10:36 AM, Enno Shioji eshi...@gmail.com wrote:

 Also the job was deployed from the master machine in the cluster.

 On Wed, Dec 31, 2014 at 6:35 PM, Enno Shioji eshi...@gmail.com wrote:

 Oh sorry that was a edit mistake. The code is essentially:

  val msgStream = kafkaStream
.map { case (k, v) = v}
.map(DatatypeConverter.printBase64Binary)
.saveAsTextFile(s3n://some.bucket/path, classOf[LzoCodec])

 I.e. there is essentially no original code (I was calling saveAsTextFile
 in a save function but that was just a remnant from previous debugging).



 On Wed, Dec 31, 2014 at 6:21 PM, Sean Owen so...@cloudera.com wrote:

 -dev, +user

 A decent guess: Does your 'save' function entail collecting data back
 to the driver? and are you running this from a machine that's not in
 your Spark cluster? Then in client mode you're shipping data back to a
 less-nearby machine, compared to with cluster mode. That could explain
 the bottleneck.

 On Wed, Dec 31, 2014 at 4:12 PM, Enno Shioji eshi...@gmail.com wrote:
  Hi,
 
  I have a very, very simple streaming job. When I deploy this on the
 exact
  same cluster, with the exact same parameters, I see big (40%)
 performance
  difference between client and cluster deployment mode. This seems
 a bit
  surprising.. Is this expected?
 
  The streaming job is:
 
  val msgStream = kafkaStream
.map { case (k, v) = v}
.map(DatatypeConverter.printBase64Binary)
.foreachRDD(save)
.saveAsTextFile(s3n://some.bucket/path, classOf[LzoCodec])
 
  I tried several times, but the job deployed with client mode can
 only
  write at 60% throughput of the job deployed with cluster mode and
 this
  happens consistently. I'm logging at INFO level, but my application
 code
  doesn't log anything so it's only Spark logs. The logs I see in
 client
  mode doesn't seem like a crazy amount.
 
  The setup is:
  spark-ec2 [...] \
--copy-aws-credentials \
--instance-type=m3.2xlarge \
-s 2 launch test_cluster
 
  And all the deployment was done from the master machine.
 







Writing and reading sequence file results in trailing extra data

2014-12-30 Thread Enno Shioji
Hi, I'm facing a weird issue. Any help appreciated.

When I execute the below code and compare input and output, each record
in the output has some extra trailing data appended to it and is hence
corrupted. I'm just reading and writing, so the input and output should be
exactly the same.

I'm using spark-core 1.2.0_2.10 and the Hadoop bundled in it
(hadoop-common: 2.2.0, hadoop-core: 1.2.1). I also confirmed the binary is
fine at the time it's passed to Hadoop classes, and has already the extra
data when in Hadoop classes (I guess this makes it more of a Hadoop
question...).

Code:
=
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("Simple Application")

    val sc = new SparkContext(conf)

    // input.txt is a text file with some Base64 encoded binaries stored as lines

    val src = sc
      .textFile("input.txt")
      .map(DatatypeConverter.parseBase64Binary)
      .map(x => (NullWritable.get(), new BytesWritable(x)))
      .saveAsSequenceFile("s3n://fake-test/stored")

    val file = "s3n://fake-test/stored"
    val logData = sc.sequenceFile(file, classOf[NullWritable], classOf[BytesWritable])

    val count = logData
      .map { case (k, v) => v }
      .map(x => DatatypeConverter.printBase64Binary(x.getBytes))
      .saveAsTextFile("/tmp/output")
  }



[SOLVED] Re: Writing and reading sequence file results in trailing extra data

2014-12-30 Thread Enno Shioji
This poor soul had the exact same problem and solution:

http://stackoverflow.com/questions/24083332/write-and-read-raw-byte-arrays-in-spark-using-sequence-file-sequencefile
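
In short (if I remember the gist of it correctly): BytesWritable reuses and
pads its backing array, so getBytes returns extra trailing bytes beyond
getLength. Copying only the valid range fixes the corruption, e.g.:

val count = logData
  .map { case (_, v) => java.util.Arrays.copyOfRange(v.getBytes, 0, v.getLength) }
  .map(DatatypeConverter.printBase64Binary)
  .saveAsTextFile("/tmp/output")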


On Tue, Dec 30, 2014 at 10:58 AM, Enno Shioji eshi...@gmail.com wrote:

 Hi, I'm facing a weird issue. Any help appreciated.

 When I execute the below code and compare input and output, each
 record in the output has some extra trailing data appended to it, and hence
 corrupted. I'm just reading and writing, so the input and output should be
 exactly the same.

 I'm using spark-core 1.2.0_2.10 and the Hadoop bundled in it
 (hadoop-common: 2.2.0, hadoop-core: 1.2.1). I also confirmed the binary is
 fine at the time it's passed to Hadoop classes, and has already the extra
 data when in Hadoop classes (I guess this makes it more of a Hadoop
 question...).

 Code:
 =
   def main(args: Array[String]) {
 val conf = new SparkConf()
   .setMaster(local[4])
   .setAppName(Simple Application)

 val sc = new SparkContext(conf)

// input.txt is a text file with some Base64 encoded binaries stored as
 lines

 val src = sc
   .textFile(input.txt)
   .map(DatatypeConverter.parseBase64Binary)
   .map(x = (NullWritable.get(), new BytesWritable(x)))
   .saveAsSequenceFile(s3n://fake-test/stored)

 val file = s3n://fake-test/stored
 val logData = sc.sequenceFile(file, classOf[NullWritable],
 classOf[BytesWritable])

 val count = logData
   .map { case (k, v) = v }
   .map(x = DatatypeConverter.printBase64Binary(x.getBytes))
   .saveAsTextFile(/tmp/output)

   }




ReceiverInputDStream#saveAsTextFiles with a S3 URL results in double forward slash key names in S3

2014-12-23 Thread Enno Shioji
Is anybody experiencing this? It looks like a bug in JetS3t to me, but
thought I'd sanity check before filing an issue.



I'm writing to S3 using ReceiverInputDStream#saveAsTextFiles with a S3 URL
(s3://fake-test/1234).

The code does write to S3, but with double forward slashes in the key names
(e.g. s3://fake-test//1234/-141933428/).

I did some debugging and it seems like the culprit is
Jets3tFileSystemStore#pathToKey(path), which returns /fake-test/1234/...
for the input s3://fake-test/1234/ when it should chop off the first
forward slash. However, I couldn't find any bug report for JetS3t for this.

Am I missing something, or is this likely a JetS3t bug?





Re: ReceiverInputDStream#saveAsTextFiles with a S3 URL results in double forward slash key names in S3

2014-12-23 Thread Enno Shioji
I filed a new issue HADOOP-11444. According to HADOOP-10372, s3 is likely
to be deprecated anyway in favor of s3n.
Also the comment section notes that Amazon has implemented an EmrFileSystem
for S3 which is built using AWS SDK rather than JetS3t.




On Tue, Dec 23, 2014 at 2:06 PM, Enno Shioji eshi...@gmail.com wrote:

 Hey Jay :)

 I tried s3n which uses the Jets3tNativeFileSystemStore, and the double
 slash went away.
 As far as I can see, it does look like a bug in hadoop-common; I'll file a
 ticket for it.

 Hope you are doing well, by the way!

 PS:
  Jets3tNativeFileSystemStore's implementation of pathToKey is:
 ==
   private static String pathToKey(Path path) {
     if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
       // allow uris without trailing slash after bucket to refer to root,
       // like s3n://mybucket
       return "";
     }
     if (!path.isAbsolute()) {
       throw new IllegalArgumentException("Path must be absolute: " + path);
     }
     String ret = path.toUri().getPath().substring(1); // remove initial slash
     if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
       ret = ret.substring(0, ret.length() - 1);
     }
     return ret;
   }
 ==

 whereas Jets3tFileSystemStore uses:
 ==
   private String pathToKey(Path path) {
     if (!path.isAbsolute()) {
       throw new IllegalArgumentException("Path must be absolute: " + path);
     }
     return path.toUri().getPath();
   }
 ==






 On Tue, Dec 23, 2014 at 1:07 PM, Jay Vyas jayunit100.apa...@gmail.com
 wrote:

 Hi enno.  Might be worthwhile to cross post this on dev@hadoop...
 Obviously a simple spark way to test this would be to change the uri to
 write to hdfs:// or maybe you could do file:// , and confirm that the extra
 slash goes away.

 - if it's indeed a jets3t issue we should add a new unit test for this if
 the hcfs tests are passing for jets3tfilesystem, yet this error still
 exists.

 - To learn how to run HCFS tests against any FileSystem , see the wiki
 page : https://wiki.apache.org/hadoop/HCFS/Progress (see the July 14th
 entry on that page).

 - Is there another S3FileSystem implementation for AbstractFileSystem or
 is jets3t the only one?  That would be a easy  way to test this. And also a
 good workaround.

 I'm wondering, also why jets3tfilesystem is the AbstractFileSystem used
 by so many - is that the standard impl for storing using AbstractFileSystem
 interface?

 On Dec 23, 2014, at 6:06 AM, Enno Shioji eshi...@gmail.com wrote:

 Is anybody experiencing this? It looks like a bug in JetS3t to me, but
 thought I'd sanity check before filing an issue.


 
 I'm writing to S3 using ReceiverInputDStream#saveAsTextFiles with a S3
 URL (s3://fake-test/1234).

 The code does write to S3, but with double forward slashes (e.g.
 s3://fake-test//1234/-141933428/.

 I did a debug and it seem like the culprit is
 Jets3tFileSystemStore#pathToKey(path), which returns /fake-test/1234/...
 for the input s3://fake-test/1234/ when it should hack off the first
 forward slash. However, I couldn't find any bug report for JetS3t for this.

 Am I missing something, or is this likely a JetS3t bug?