Re: Maelstrom: Kafka integration with Spark
To clarify my earlier statement: I will continue working on Maelstrom as an alternative to the official Spark integration with Kafka, and keep the KafkaRDDs + consumers as they are, until I find the official Spark/Kafka integration more stable and resilient to Kafka broker issues and failures (which is why I use an infinite-retry strategy in numerous places around the Kafka-related routines). Not that I'm complaining or competing; at the end of the day, having a Spark app that keeps working overnight lets the developer sleep well at night :)

On Thu, Aug 25, 2016 at 3:23 AM, Jeoffrey Lim <jeoffr...@gmail.com> wrote:
> Hi Cody, thank you for pointing out sub-millisecond processing; it is an
> "exaggerated" term :D I simply got excited releasing this project. It
> should be: "millisecond stream processing at the Spark level".
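The "infinite retry" strategy mentioned above can be sketched as a small wrapper with exponential backoff. This is a hypothetical illustration in Python, not Maelstrom's actual code; the function and parameter names are made up, and a real version would wrap the Kafka fetch and offset routines rather than an arbitrary callable.

```python
import time

def retry_forever(fn, base_delay=0.5, max_delay=30.0, on_error=None):
    """Call fn() until it succeeds, backing off exponentially between attempts.

    A minimal sketch of the infinite-retry idea described above; intended for
    transient broker failures (leader moves, advertised-hostname issues), where
    giving up would kill an otherwise healthy streaming app.
    """
    delay = base_delay
    while True:
        try:
            return fn()
        except Exception as exc:  # e.g. broker temporarily unavailable
            if on_error is not None:
                on_error(exc)
            time.sleep(delay)
            delay = min(delay * 2, max_delay)  # cap the backoff
```

The trade-off of retrying forever is that a permanent misconfiguration never surfaces as a failure, so the `on_error` hook is where such a sketch would log loudly.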
Re: Maelstrom: Kafka integration with Spark
Hi Cody, thank you for pointing out sub-millisecond processing; it is an "exaggerated" term :D I simply got excited releasing this project. It should be: "millisecond stream processing at the Spark level".

I highly appreciate the info about the latest Kafka consumer. I would need to get up to speed on the most recent improvements and new features of Kafka itself.

I think with Spark's latest Kafka 0.10 integration features, Maelstrom's upside would only be its simple, developer-friendly APIs. I'll play around with the Spark 2.0 kafka-010 KafkaRDD to see if this is feasible.

On Wed, Aug 24, 2016 at 10:46 PM, Cody Koeninger <c...@koeninger.org> wrote:
> Yes, spark-streaming-kafka-0-10 uses the new consumer. Besides
> pre-fetching messages, the big reason for that is that security
> features are only available with the new consumer.
>
> The Kafka project is at release 0.10.0.1 now; they think most of the
> issues with the new consumer have been ironed out. You can track the
> progress as to when they'll remove the "beta" label at
> https://issues.apache.org/jira/browse/KAFKA-3283
>
> As far as I know, Kafka in general can't achieve sub-millisecond
> end-to-end stream processing, so my guess is you need to be more
> specific about your terms there.
>
> I promise I'm not trying to start a pissing contest :) just wanted to
> check if you were aware of the current state of the other consumers.
> Collaboration is always welcome.
Re: Maelstrom: Kafka integration with Spark
Apologies, I was not aware that Spark 2.0 has Kafka consumer caching/pooling now. What I have checked is the latest Kafka consumer, and I believe it is still of beta quality:

https://kafka.apache.org/documentation.html#newconsumerconfigs

> Since 0.9.0.0 we have been working on a replacement for our existing
> simple and high-level consumers. The code is considered beta quality.

I'm not sure about this: does the Spark 2.0 Kafka 0.10 integration already use this new consumer? Is it now stable? With this caching feature in Spark 2.0, could it achieve sub-millisecond stream processing now?

Maelstrom still uses the old Kafka SimpleConsumer. This library was made open source so that I could continue working on it for future updates and improvements, such as when the latest Kafka consumer gets a stable release.

We have been using Maelstrom's "caching concept" for a long time now, as Receiver-based Spark/Kafka integration does not work for us. There were thoughts about using the Direct Kafka APIs; however, Maelstrom has very simple APIs and just "simply works", even under unstable scenarios (e.g. advertised-hostname failures on EMR).

I believe Maelstrom will work even on Spark 1.3 and Kafka 0.8.2.1 (and of course with the latest Kafka 0.10 as well).

On Wed, Aug 24, 2016 at 9:49 AM, Cody Koeninger <c...@koeninger.org> wrote:
> Were you aware that the Spark 2.0 / Kafka 0.10 integration also reuses
> Kafka consumer instances on the executors?
Maelstrom: Kafka integration with Spark
Hi,

I have released the first version of a new Kafka integration with Spark that we use in the company I work for: open sourced and named Maelstrom.

It is unique compared to other solutions out there in that it reuses the Kafka consumer connection to achieve sub-millisecond latency.

This library has been running stably in a production environment and has proven resilient to numerous production issues.

Please check out the project's page on GitHub:

https://github.com/jeoffreylim/maelstrom

Contributors welcome!

Cheers!

Jeoffrey Lim

P.S. I am also looking for a job opportunity; please look me up on LinkedIn.
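The consumer-connection reuse described above can be illustrated with a minimal pool keyed by (topic, partition). This is a hedged Python sketch, not Maelstrom's actual implementation; `make_consumer` is a hypothetical factory standing in for creating a real Kafka consumer connection.

```python
import threading

class ConsumerPool:
    """Sketch of the connection-reuse idea: one cached consumer per
    (topic, partition), created lazily and handed back to repeat callers,
    so each fetch avoids a fresh TCP connect and metadata handshake."""

    def __init__(self, make_consumer):
        self._make = make_consumer   # hypothetical factory: (topic, partition) -> consumer
        self._pool = {}
        self._lock = threading.Lock()

    def get(self, topic, partition):
        key = (topic, partition)
        with self._lock:
            consumer = self._pool.get(key)
            if consumer is None:          # first request: open and cache
                consumer = self._make(topic, partition)
                self._pool[key] = consumer
            return consumer               # later requests: reuse the connection
```

Skipping the per-batch connection setup is where the latency win comes from; the cost is that the pool must also handle invalidating a cached consumer when its broker connection goes bad, which this sketch omits.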
Re: How to initiate a shutdown of Spark Streaming context?
What we did to gracefully shut down the Spark streaming context is extend a Spark Web UI tab and perform a SparkContext.SparkUI.attachTab(custom web UI). However, the custom Scala Web UI extensions need to live under the package org.apache.spark.ui to get around the package access restrictions.

Would it be possible to expose the SparkUI under SparkContext, and the Spark Web UI packages, as public so that developers can add customizations with their own tools?

Thanks!

On Tue, Sep 16, 2014 at 12:34 AM, stanley [via Apache Spark User List] <ml-node+s1001560n14252...@n3.nabble.com> wrote:
> Thank you. Would the following approaches to address this problem be
> overkill?
>
> a. create a ServerSocket in a different thread from the main thread that
>    created the Spark StreamingContext, and listen for a shutdown command
>    there
> b. create a web service that wraps around the main thread that created
>    the Spark StreamingContext, and responds to shutdown requests
>
> Does Spark Streaming already provide similar capabilities?
>
> Stanley
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-initiate-a-shutdown-of-Spark-Streaming-context-tp14092p14252.html
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-initiate-a-shutdown-of-Spark-Streaming-context-tp14092p14277.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
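Option (a) from the quoted message, a ServerSocket on a side thread that triggers shutdown, can be sketched as follows. This is an illustrative Python version, not real driver code; `stop_callback` stands in for something like a graceful StreamingContext stop.

```python
import socket
import threading

def start_shutdown_listener(port, stop_callback):
    """Listen on a local TCP port; the first connection triggers stop_callback.

    Sketch of the ServerSocket-on-a-side-thread idea: the driver keeps running
    until an operator connects to the port, at which point the callback (e.g.
    a graceful streaming-context stop) fires exactly once.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(("127.0.0.1", port))
    server.listen(1)

    def wait_for_signal():
        conn, _ = server.accept()  # block until someone connects
        conn.close()
        server.close()
        stop_callback()

    t = threading.Thread(target=wait_for_signal, daemon=True)
    t.start()
    # return the actual port (useful when port=0 picks an ephemeral one)
    return server.getsockname()[1], t
```

Shutting down would then be as simple as `nc localhost <port>` from a shell; adding a command check on the received bytes would guard against accidental connections.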
Re: Some Serious Issue with Spark Streaming ? Blocks Getting Removed and Jobs have Failed..
Our issue could be related to the problem described in:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-in-1-hour-batch-duration-RDD-files-gets-lost-td14027.html

in which the DStream is processed in a 1-hour batch duration. I have implemented IO throttling in the Receiver as well as in our Kafka consumer, and our backlog is not that large.

INFO : org.apache.spark.storage.MemoryStore - 1 blocks selected for dropping
INFO : org.apache.spark.storage.BlockManager - Dropping block input-0-1410443074600 from memory
INFO : org.apache.spark.storage.MemoryStore - Block input-0-1410443074600 of size 12651900 dropped from memory (free 21220667)
INFO : org.apache.spark.storage.BlockManagerInfo - Removed input-0-1410443074600 on ip-10-252-5-113.asskickery.us:53752 in memory (size: 12.1 MB, free: 100.6 MB)

The questions I have now are: how do we prevent the MemoryStore/BlockManager from dropping the block inputs? And should these drops be logged at WARN/ERROR level?

Thanks.

On Fri, Sep 12, 2014 at 4:45 PM, Dibyendu Bhattacharya [via Apache Spark User List] <ml-node+s1001560n14075...@n3.nabble.com> wrote:
> Dear all,
>
> I am sorry, this was a false alarm. There was an issue in the RDD
> processing logic which led to a large backlog. Once I fixed the issues
> in my processing logic, I could see all messages being pulled nicely
> without any block-removed errors. I need to tune certain configurations
> in my Kafka consumer to modify the data rate, and also the batch size.
>
> Sorry again.
> Regards,
> Dibyendu
>
> On Thu, Sep 11, 2014 at 8:13 PM, Nan Zhu <[hidden email]> wrote:
> > This is my case about broadcast variables:
> >
> > 14/07/21 19:49:13 INFO Executor: Running task ID 4
> > 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2)
> > 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on localhost (progress: 3/106)
> > 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
> > 14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596
> > 14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver
> > 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
> > 14/07/21 19:49:13 INFO Executor: Finished task ID 3
> > 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on executor localhost: localhost (PROCESS_LOCAL)
> > 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885 bytes in 0 ms
> > 14/07/21 19:49:13 INFO Executor: Running task ID 5
> > 14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0
> > 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3)
> > 14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0
> > 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on localhost (progress: 4/106)
> > 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
> > 14/07/21 19:49:13 INFO BlockManager: Removing block broadcast_0
> > 14/07/21 19:49:13 INFO MemoryStore: Block broadcast_0 of size 202564 dropped from memory (free 886623436)
> > 14/07/21 19:49:13 INFO ContextCleaner: Cleaned shuffle 0
> > 14/07/21 19:49:13 INFO ShuffleBlockManager: Deleted all files for shuffle 0
> > 14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5
> > 14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5
> > 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
> > 14/07/21 19:49:13 INFO Executor: Serialized size of result for 4 is 596
> > 14/07/21 19:49:13 INFO Executor: Sending result for 4 directly to driver
> > 14/07/21 19:49:13 INFO Executor: Finished task ID 4
> > 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on executor localhost: localhost (PROCESS_LOCAL)
> > 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:6 as 11885 bytes in 0 ms
> > 14/07/21 19:49:13 INFO Executor: Running task ID 6
> > 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 4)
> > 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 4 in 80 ms on localhost (progress: 5/106)
> > 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
> > 14/07/21 19:49:13 INFO Executor: Serialized size of result for 5 is 596
> > 14/07/21 19:49:13 INFO Executor: Sending result for 5 directly to driver
> > 14/07/21 19:49:13 INFO Executor: Finished task ID 5
> > 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:7 as TID 7 on executor localhost: localhost (PROCESS_LOCAL)
> > 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:7 as 11885 bytes in 0 ms
> > 14/07/21 19:49:13 INFO Executor: Running task ID 7
> > 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 5)
> > 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 5 in 77 ms on localhost (progress: 6/106)
> > 14/07/21
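The block drops reported in this thread come from the memory store evicting blocks when a new one does not fit. A toy model (not Spark's actual MemoryStore code) makes the mechanism concrete: for a receiver's MEMORY_ONLY input blocks, an evicted block is simply lost, which is why more executor memory or a disk-backed storage level is the usual remedy.

```python
from collections import OrderedDict

class MemoryStoreSketch:
    """Toy model of the eviction seen in the logs above: when a new block
    does not fit within capacity, the least-recently-inserted blocks are
    dropped first, producing 'Dropping block ... from memory' messages."""

    def __init__(self, capacity_bytes):
        self.capacity = capacity_bytes
        self.used = 0
        self.blocks = OrderedDict()  # block_id -> size, oldest first
        self.dropped = []            # ids of evicted blocks

    def put(self, block_id, size):
        # Evict oldest blocks until the new one fits (or nothing is left).
        while self.used + size > self.capacity and self.blocks:
            old_id, old_size = self.blocks.popitem(last=False)
            self.used -= old_size
            self.dropped.append(old_id)
        if self.used + size <= self.capacity:
            self.blocks[block_id] = size
            self.used += size
```

In this model the only defenses are a bigger capacity or throttling the put rate, which mirrors the practical advice in the thread: raise executor memory, throttle the receiver, or use a storage level that spills to disk instead of discarding.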
Spark Streaming in 1 hour batch duration RDD files gets lost
Hi,

Our Spark streaming app is configured to pull data from Kafka in a 1-hour batch duration; it aggregates the data by specific keys and stores the related RDDs to HDFS in the transform phase. We have tried a checkpoint of 7 days on the Kafka DStream to ensure that the generated stream does not expire or get lost. The first hour completes, but the succeeding hours always fail with this exception:

Job aborted due to stage failure: Task 39.0:1 failed 64 times, most recent failure: Exception failure in TID 27578 on host X.ec2.internal: java.io.FileNotFoundException: /data/run/spark/work/spark-local-20140911175744-4ddf/0d/shuffle_3_1_311 (No such file or directory)
    java.io.FileOutputStream.open(Native Method)
    java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:116)
    org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:177)
    org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
    org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
    scala.collection.Iterator$class.foreach(Iterator.scala:727)

Environment:
CDH version: 2.3.0-cdh5.1.0
Spark version: 1.0.0-cdh5.1.0

Spark settings:
spark.io.compression.codec : org.apache.spark.io.SnappyCompressionCodec
spark.serializer : org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.mb : 2
spark.local.dir : /data/run/spark/work/
spark.scheduler.mode : FAIR
spark.rdd.compress : false
spark.task.maxFailures : 64
spark.shuffle.use.netty : false
spark.shuffle.spill : true
spark.streaming.checkpoint.dir : hdfs://X.ec2.internal:8020/user/spark/checkpoints/event-storage
spark.akka.threads : 4
spark.cores.max : 4
spark.executor.memory : 3g
spark.shuffle.consolidateFiles : false
spark.streaming.unpersist : true
spark.logConf : true
spark.shuffle.spill.compress : true

Thanks,
JL

--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-in-1-hour-batch-duration-RDD-files-gets-lost-tp14027.html Sent from the Apache Spark User List mailing list archive at Nabble.com.