Re: Maelstrom: Kafka integration with Spark

2016-08-24 Thread Jeoffrey Lim
To clarify my earlier statement: I will continue working on Maelstrom
as an alternative to the official Spark integration with Kafka, and keep
the KafkaRDDs + Consumers as they are, until I find the official Spark
Kafka integration more stable and resilient to Kafka broker
issues/failures (which is why I have an infinite retry strategy in
numerous places around the Kafka-related routines).
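
Roughly, that retry strategy amounts to a wrapper like the following
(an illustrative sketch, not Maelstrom's actual code; the backoff value
is a placeholder):

    // Retry a Kafka operation until it succeeds, backing off between
    // attempts so a broker hiccup cannot kill the app.
    def retryForever[T](backoffMs: Long = 1000L)(op: => T): T = {
      while (true) {
        try {
          return op
        } catch {
          case e: Exception =>
            // log e here, then keep going
            Thread.sleep(backoffMs)
        }
      }
      throw new IllegalStateException("unreachable")
    }

    // usage: val response = retryForever() { consumer.fetch(request) }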

Not that I'm complaining or competing; at the end of the day, having
a Spark app that continues to work overnight gives the developer a good
night's sleep :)



Re: Maelstrom: Kafka integration with Spark

2016-08-24 Thread Jeoffrey Lim
Hi Cody, thank you for pointing out "sub-millisecond processing"; it is
an exaggerated term :D I simply got excited releasing this project. It
should be: "millisecond stream processing at the Spark level".

I highly appreciate the info about the latest Kafka consumer. I would
need to get up to speed on the most recent improvements and new
features of Kafka itself.

I think with the features of Spark's latest Kafka 0.10 integration,
Maelstrom's only remaining upside would be its simple,
developer-friendly APIs. I'll play around with the Spark 2.0 kafka-010
KafkaRDD to see if this is feasible.
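
As a reference for myself, a minimal sketch of what I plan to try,
following the documented spark-streaming-kafka-0-10 API (broker, topic,
and group names are placeholders):

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    val ssc = new StreamingContext(
      new SparkConf().setAppName("kafka-010-eval"), Seconds(1))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",               // placeholder
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "maelstrom-eval",                      // placeholder
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent,
      Subscribe[String, String](Array("events"), kafkaParams))

    stream.map(r => (r.key, r.value)).print()
    ssc.start()
    ssc.awaitTermination()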


On Wed, Aug 24, 2016 at 10:46 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Yes, spark-streaming-kafka-0-10 uses the new consumer.   Besides
> pre-fetching messages, the big reason for that is that security
> features are only available with the new consumer.
>
> The Kafka project is at release 0.10.0.1 now, they think most of the
> issues with the new consumer have been ironed out.  You can track the
> progress as to when they'll remove the "beta" label at
> https://issues.apache.org/jira/browse/KAFKA-3283
>
> As far as I know, Kafka in general can't achieve sub-millisecond
> end-to-end stream processing, so my guess is you need to be more
> specific about your terms there.
>
> I promise I'm not trying to start a pissing contest :)  just wanted to
> check if you were aware of the current state of the other consumers.
> Collaboration is always welcome.


Re: Maelstrom: Kafka integration with Spark

2016-08-23 Thread Jeoffrey Lim
Apologies, I was not aware that Spark 2.0 now has Kafka Consumer
caching/pooling. What I have checked is the latest Kafka Consumer, and
I believe it is still beta quality.

https://kafka.apache.org/documentation.html#newconsumerconfigs

> Since 0.9.0.0 we have been working on a replacement for our existing
simple and high-level consumers.
> The code is considered beta quality.

I am not sure about this: does the Spark 2.0 Kafka 0.10 integration
already use this one? Is it now stable? With this caching feature in
Spark 2.0, could it achieve sub-millisecond stream processing now?


Maelstrom still uses the old Kafka Simple Consumer. The library was
made open source so that I could continue working on it, with future
updates & improvements, e.g. when the latest Kafka Consumer gets a
stable release.

We have been using the Maelstrom "caching concept" for a long time
now, as the Receiver-based Spark Kafka integration does not work for
us. There were thoughts about using the Direct Kafka APIs; however,
Maelstrom has very simple APIs and just "simply works", even under
unstable scenarios (e.g. advertised-hostname failures on EMR).

I believe Maelstrom will work even with Spark 1.3 and Kafka 0.8.2.1
(and of course with the latest Kafka 0.10 as well).
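
For readers unfamiliar with the 0.8-era Simple Consumer, the fetch path
looks roughly like this (a sketch, not Maelstrom's own code; broker,
topic, and offsets are placeholders, and real code must also handle
leader lookup, fetch errors, and offset tracking):

    import scala.collection.JavaConverters._
    import kafka.api.FetchRequestBuilder
    import kafka.javaapi.consumer.SimpleConsumer

    val consumer = new SimpleConsumer("broker1", 9092,
      100000 /* soTimeout ms */, 64 * 1024 /* bufferSize */, "demo-client")

    val request = new FetchRequestBuilder()
      .clientId("demo-client")
      .addFetch("events", 0 /* partition */, 0L /* offset */, 1024 * 1024)
      .build()

    val response = consumer.fetch(request)
    response.messageSet("events", 0).asScala.foreach { mo =>
      println(s"offset=${mo.offset} payload bytes=${mo.message.payloadSize}")
    }
    consumer.close()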


On Wed, Aug 24, 2016 at 9:49 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Were you aware that the spark 2.0 / kafka 0.10 integration also reuses
> kafka consumer instances on the executors?


Maelstrom: Kafka integration with Spark

2016-08-23 Thread Jeoffrey Lim
Hi,

I have released the first version of a new Kafka integration with Spark
that we use at the company I work for; it is open sourced and named Maelstrom.

It is unique compared to other solutions out there, as it reuses the
Kafka Consumer connection to achieve sub-millisecond latency.

This library has been running stably in a production environment and
has proven resilient to numerous production issues.
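
Conceptually, the connection reuse boils down to an executor-side cache
along these lines (an illustrative sketch with hypothetical names, not
the actual Maelstrom internals):

    import java.util.concurrent.ConcurrentHashMap
    import kafka.javaapi.consumer.SimpleConsumer

    // One consumer per broker, shared across micro-batches on the same
    // executor, so each batch skips the TCP/metadata setup cost.
    object ConsumerCache {
      private val consumers = new ConcurrentHashMap[String, SimpleConsumer]()

      def getOrCreate(host: String, port: Int): SimpleConsumer = {
        val key = s"$host:$port"
        val existing = consumers.get(key)
        if (existing != null) return existing
        val created =
          new SimpleConsumer(host, port, 100000, 64 * 1024, "cached-client")
        val raced = consumers.putIfAbsent(key, created)
        if (raced != null) { created.close(); raced } else created
      }
    }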


Please check out the project's page on GitHub:

https://github.com/jeoffreylim/maelstrom


Contributors welcome!


Cheers!

Jeoffrey Lim


P.S. I am also looking for a job opportunity; please look me up on LinkedIn.


Re: How to initiate a shutdown of Spark Streaming context?

2014-09-15 Thread Jeoffrey Lim
What we did to gracefully shut down the Spark Streaming context is
extend a Spark Web UI Tab and call SparkContext.SparkUI.attachTab(custom
web ui). However, the custom Scala Web UI extensions need to live under
the package org.apache.spark.ui to get around the package access
restrictions.

Would it be possible to expose the SparkUI under SparkContext, and the
Spark Web UI packages, as public so that developers can add
customizations with their own tools?
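
For the curious, the hack looks roughly like this (a sketch against
Spark 1.x internals; these are private APIs, so the exact signatures
vary by version, and the tab/page names here are hypothetical):

    package org.apache.spark.ui  // required to reach private[spark] members

    import javax.servlet.http.HttpServletRequest
    import scala.xml.Node

    // A "shutdown" tab whose page triggers our graceful-stop hook.
    class ShutdownTab(parent: SparkUI, onShutdown: () => Unit)
        extends SparkUITab(parent, "shutdown") {
      attachPage(new WebUIPage("") {
        def render(request: HttpServletRequest): Seq[Node] = {
          onShutdown()
          <p>Shutdown requested</p>
        }
      })
    }

    // attaching it (sc.ui is private[spark], hence the package trick):
    //   sc.ui.attachTab(new ShutdownTab(sc.ui, () => ssc.stop(true, true)))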

Thanks!

On Tue, Sep 16, 2014 at 12:34 AM, stanley [via Apache Spark User List] 
ml-node+s1001560n14252...@n3.nabble.com wrote:

 Thank you.

 Would the following approaches to address this problem be overkill?

 a. create a ServerSocket in a different thread from the main thread that
 created the Spark StreamingContext, and listen for a shutdown command there
 b. create a web service that wraps around the main thread that created the
 Spark StreamingContext, and respond to shutdown requests

 Does Spark Streaming already provide similar capabilities?

 Stanley
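
A minimal sketch of approach (a), with a placeholder port: a daemon
thread blocks on a ServerSocket and gracefully stops the
StreamingContext when any client connects.

    import java.net.ServerSocket
    import org.apache.spark.streaming.StreamingContext

    def listenForShutdown(ssc: StreamingContext, port: Int = 9999): Unit = {
      val listener = new Thread(new Runnable {
        def run(): Unit = {
          val server = new ServerSocket(port)
          try server.accept() finally server.close()  // block until contacted
          ssc.stop(stopSparkContext = true, stopGracefully = true)
        }
      })
      listener.setDaemon(true)
      listener.start()
    }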







Re: Some Serious Issue with Spark Streaming ? Blocks Getting Removed and Jobs have Failed..

2014-09-12 Thread Jeoffrey Lim
Our issue could be related to the problem described in:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-in-1-hour-batch-duration-RDD-files-gets-lost-td14027.html
in which the DStream is processed with a 1-hour batch duration.

I have implemented IO throttling in the Receiver as well as in our
Kafka consumer, and our backlog is not that large.
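
For reference, the throttling is along these lines (an illustrative
custom Receiver, not our production code; the rate and the fetch call
are placeholders):

    import com.google.common.util.concurrent.RateLimiter
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class ThrottledReceiver(maxRecordsPerSec: Double)
        extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

      def onStart(): Unit = {
        val limiter = RateLimiter.create(maxRecordsPerSec)
        new Thread("throttled-source") {
          override def run(): Unit = {
            while (!isStopped()) {
              limiter.acquire()          // enforce the max ingest rate
              store(fetchNextRecord())   // placeholder fetch
            }
          }
        }.start()
      }

      def onStop(): Unit = ()

      private def fetchNextRecord(): String = ???  // placeholder
    }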

INFO : org.apache.spark.storage.MemoryStore - 1 blocks selected for dropping
INFO : org.apache.spark.storage.BlockManager - Dropping block input-0-1410443074600 from memory
INFO : org.apache.spark.storage.MemoryStore - Block input-0-1410443074600 of size 12651900 dropped from memory (free 21220667)
INFO : org.apache.spark.storage.BlockManagerInfo - Removed input-0-1410443074600 on ip-10-252-5-113.asskickery.us:53752 in memory (size: 12.1 MB, free: 100.6 MB)

The question I have now is: how do we prevent the
MemoryStore/BlockManager from dropping the block inputs? And should
these drops be logged at WARN/ERROR level?
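
One common mitigation, sketched here with placeholder ZooKeeper, group,
and topic values, is to receive with a storage level that can spill to
disk rather than dropping blocks outright:

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    val stream = KafkaUtils.createStream(ssc, "zk1:2181", "consumer-group",
      Map("events" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)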


Thanks.


On Fri, Sep 12, 2014 at 4:45 PM, Dibyendu Bhattacharya [via Apache Spark
User List] ml-node+s1001560n14075...@n3.nabble.com wrote:

 Dear all,

 I am sorry. This was a false alarm.

 There was some issue in the RDD processing logic which led to a large
 backlog. Once I fixed the issues in my processing logic, I could see
 all messages being pulled nicely without any "Block Removed" errors. I
 need to tune certain configurations in my Kafka Consumer to modify the
 data rate and also the batch size.

 Sorry again.


 Regards,
 Dibyendu

 On Thu, Sep 11, 2014 at 8:13 PM, Nan Zhu [hidden email] wrote:

  This is my case about broadcast variable:

 14/07/21 19:49:13 INFO Executor: Running task ID 4
 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2)
 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on localhost (progress: 3/106)
 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
 14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596
 14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver
 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
 14/07/21 19:49:13 INFO Executor: Finished task ID 3
 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on executor localhost: localhost (PROCESS_LOCAL)
 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885 bytes in 0 ms
 14/07/21 19:49:13 INFO Executor: Running task ID 5
 14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0
 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3)
 14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0
 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on localhost (progress: 4/106)
 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
 14/07/21 19:49:13 INFO BlockManager: Removing block broadcast_0
 14/07/21 19:49:13 INFO MemoryStore: Block broadcast_0 of size 202564 dropped from memory (free 886623436)
 14/07/21 19:49:13 INFO ContextCleaner: Cleaned shuffle 0
 14/07/21 19:49:13 INFO ShuffleBlockManager: Deleted all files for shuffle 0
 14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5
 14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5
 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
 14/07/21 19:49:13 INFO Executor: Serialized size of result for 4 is 596
 14/07/21 19:49:13 INFO Executor: Sending result for 4 directly to driver
 14/07/21 19:49:13 INFO Executor: Finished task ID 4
 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on executor localhost: localhost (PROCESS_LOCAL)
 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:6 as 11885 bytes in 0 ms
 14/07/21 19:49:13 INFO Executor: Running task ID 6
 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 4)
 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 4 in 80 ms on localhost (progress: 5/106)
 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
 14/07/21 19:49:13 INFO Executor: Serialized size of result for 5 is 596
 14/07/21 19:49:13 INFO Executor: Sending result for 5 directly to driver
 14/07/21 19:49:13 INFO Executor: Finished task ID 5
 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:7 as TID 7 on executor localhost: localhost (PROCESS_LOCAL)
 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:7 as 11885 bytes in 0 ms
 14/07/21 19:49:13 INFO Executor: Running task ID 7
 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 5)
 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 5 in 77 ms on localhost (progress: 6/106)
 14/07/21

Spark Streaming in 1 hour batch duration RDD files gets lost

2014-09-11 Thread Jeoffrey Lim
Hi,


Our Spark Streaming app is configured to pull data from Kafka in a
1-hour batch duration; it performs aggregation of data by specific keys
and stores the related RDDs to HDFS in the transform phase. We have
tried a 7-day checkpoint on the Kafka DStream to ensure that the
generated stream does not expire or get lost.
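
The setup is essentially the following sketch (conf and kafkaStream are
assumed to be defined elsewhere; the checkpoint directory matches the
settings listed below):

    import org.apache.spark.streaming.{Minutes, StreamingContext}

    val ssc = new StreamingContext(conf, Minutes(60))  // 1-hour batches
    ssc.checkpoint(
      "hdfs://X.ec2.internal:8020/user/spark/checkpoints/event-storage")

    // checkpoint the Kafka DStream every 7 days, expressed in minutes,
    // so the lineage does not grow unbounded across batches
    kafkaStream.checkpoint(Minutes(7 * 24 * 60))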

The first hour completes, but on the succeeding hours it always fails
with an exception:


Job aborted due to stage failure: Task 39.0:1 failed 64 times, most recent
failure: Exception failure in TID 27578 on host X.ec2.internal:
java.io.FileNotFoundException:
/data/run/spark/work/spark-local-20140911175744-4ddf/0d/shuffle_3_1_311 (No such file or directory)
    java.io.FileOutputStream.open(Native Method)
    java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:116)
    org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:177)
    org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
    org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
    scala.collection.Iterator$class.foreach(Iterator.scala:727)


Environment:

CDH version: 2.3.0-cdh5.1.0
Spark version: 1.0.0-cdh5.1.0


Spark settings:

spark.io.compression.codec : org.apache.spark.io.SnappyCompressionCodec
spark.serializer : org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.mb : 2
spark.local.dir : /data/run/spark/work/
spark.scheduler.mode : FAIR
spark.rdd.compress : false
spark.task.maxFailures : 64
spark.shuffle.use.netty : false
spark.shuffle.spill : true
spark.streaming.checkpoint.dir :
hdfs://X.ec2.internal:8020/user/spark/checkpoints/event-storage
spark.akka.threads : 4
spark.cores.max : 4
spark.executor.memory : 3g
spark.shuffle.consolidateFiles : false
spark.streaming.unpersist : true
spark.logConf : true
spark.shuffle.spill.compress : true
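
For completeness, a few of these settings as they would be applied in
code (a sketch; they can equally live in spark-defaults.conf):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.local.dir", "/data/run/spark/work/")
      .set("spark.task.maxFailures", "64")
      .set("spark.streaming.unpersist", "true")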


Thanks,

JL



