Re: Spark 1.5 Streaming and Kinesis
I am currently trying a few code changes to see if I can squash this error. I have created https://issues.apache.org/jira/browse/SPARK-11193 to track progress, hope that is okay!

In the meantime, can anyone confirm their ability to run the Kinesis-ASL example using Spark > 1.5.x? It would be helpful to know if it works in some cases but not others.

http://spark.apache.org/docs/1.5.1/streaming-kinesis-integration.html

Thanks
Phil

On Thu, Oct 15, 2015 at 10:35 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

> Hi Phil,
>
> sorry I didn't have time to investigate yesterday (I was on a couple of
> other Apache projects ;)). I will try to do it today. I'll keep you posted.
>
> Regards
> JB
>
> On 10/16/2015 07:21 AM, Phil Kallos wrote:
>
>> JB,
>>
>> To clarify, you are able to run the Amazon Kinesis example provided in
>> the spark examples dir?
>>
>> bin/run-example streaming.KinesisWordCountASL [app name] [stream name]
>> [endpoint url] ?
>>
>> If it helps, below are the steps I used to build spark
>>
>> mvn -Pyarn -Pkinesis-asl -Phadoop-2.6 -DskipTests clean package
>>
>> And I did this with revision 4f894dd6906311cb57add6757690069a18078783
>> (v.1.5.1)
>>
>> Thanks,
>> Phil
Re: Spark 1.5 Streaming and Kinesis
Not a dumb question, but yes, I updated all of the library references to 1.5 (I even tried 1.5.1), including:

// Versions.spark set elsewhere to "1.5.0"
"org.apache.spark" %% "spark-streaming-kinesis-asl" % Versions.spark % "provided"

I am experiencing the issue in my own Spark project, but also when I try to run the Spark Streaming Kinesis example that comes in spark/examples.

I tried running the streaming job locally, and also in EMR with release 4.1.0, which includes Spark 1.5.

Very strange!

> -- Forwarded message --
> From: "Jean-Baptiste Onofré" <j...@nanthrax.net>
> To: user@spark.apache.org
> Cc:
> Date: Thu, 15 Oct 2015 08:03:55 +0200
> Subject: Re: Spark 1.5 Streaming and Kinesis
>
> Hi Phil,
>
> KinesisReceiver is part of extra. Just a dumb question: did you update
> all, including the Spark Kinesis extra containing the KinesisReceiver?
>
> I checked on tag v1.5.0, and at line 175 of the KinesisReceiver, we see:
>
> blockIdToSeqNumRanges.clear()
>
> which is a:
>
> private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, SequenceNumberRanges]
>   with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
>
> So, it doesn't look fully correct to me.
> Let me investigate a bit this morning.
>
> Regards
> JB
>
> On 10/15/2015 07:49 AM, Phil Kallos wrote:
>
> We are trying to migrate from Spark 1.4 to Spark 1.5 for our Kinesis
> streaming applications, to take advantage of the new Kinesis
> checkpointing improvements in 1.5.
>
> However after upgrading, we are consistently seeing the following error:
>
> java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.SynchronizedMap
>   at org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
>   at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
>   at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
>   at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
>   at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
>   at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>   at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>
> I even get this when running the Kinesis examples:
> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
> with
> bin/run-example streaming.KinesisWordCountASL
>
> Am I doing something incorrect?
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
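For readers following the declaration JB quotes: the map there gets its thread safety from a mixed-in trait, and the failing cast is to that trait. A minimal sketch of one alternative, where thread safety belongs to the concrete class rather than to a mixin, might look like the following. This is only an illustration under stated assumptions: `BlockId`, `SeqRange` and `RangeStore` are hypothetical stand-ins for `StreamBlockId`, `SequenceNumberRanges` and the receiver's field, and none of it is taken from the Spark source.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-ins for StreamBlockId and SequenceNumberRanges.
final case class BlockId(name: String)
final case class SeqRange(from: String, to: String)

// Thread-safe store backed by java.util.concurrent.ConcurrentHashMap,
// so no mixin trait (and no cast to one) is involved.
final class RangeStore {
  private val ranges = new ConcurrentHashMap[BlockId, SeqRange]()

  // Record the range that belongs to a block, replacing any earlier entry.
  def remember(id: BlockId, range: SeqRange): Unit = { ranges.put(id, range); () }

  // Option(...) turns the null returned for a missing key into None.
  def lookup(id: BlockId): Option[SeqRange] = Option(ranges.get(id))

  // Drop a single block's entry, if present.
  def forget(id: BlockId): Unit = { ranges.remove(id); () }

  // Drop everything, as the receiver does with clear() on start.
  def clear(): Unit = ranges.clear()

  def size: Int = ranges.size()
}
```

Whether this matches what the receiver actually needs (ordering, iteration semantics) is not something the thread establishes; the sketch only shows that a synchronized map does not have to depend on a trait being present at runtime.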
Re: Spark 1.5 Streaming and Kinesis
JB,

To clarify, you are able to run the Amazon Kinesis example provided in the spark examples dir?

bin/run-example streaming.KinesisWordCountASL [app name] [stream name] [endpoint url] ?

If it helps, below are the steps I used to build Spark:

mvn -Pyarn -Pkinesis-asl -Phadoop-2.6 -DskipTests clean package

And I did this with revision 4f894dd6906311cb57add6757690069a18078783 (v.1.5.1)

Thanks,
Phil

On Thu, Oct 15, 2015 at 2:31 AM, Eugen Cepoi <cepoi.eu...@gmail.com> wrote:

> So running it using spark-submit doesn't change anything, it still works.
>
> When reading the code
> https://github.com/apache/spark/blob/branch-1.5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L100
> it looks like the receivers are definitely being ser/de. I think this is
> the issue, need to find a way to confirm that now...
>
> 2015-10-15 16:12 GMT+07:00 Eugen Cepoi <cepoi.eu...@gmail.com>:
>
>> Hey,
>>
>> A quick update on other things that have been tested.
>>
>> When looking at the compiled code of the spark-streaming-kinesis-asl jar
>> everything looks normal (there is a class that implements SyncMap and it is
>> used inside the receiver).
>> Starting a spark shell and using introspection to instantiate a receiver
>> and check that blockIdToSeqNumRanges implements SyncMap works too. So
>> obviously it has the correct type according to that.
>>
>> Another thing to test could be to do the same introspection stuff but
>> inside a spark job to make sure it is not a problem in the way the jobs are
>> run.
>> The other idea would be that this is a problem related to ser/de. For
>> example if the receiver was being serialized and then deserialized it could
>> definitely happen, depending on the lib used and its configuration, that it
>> just doesn't preserve the concrete type. So it would deserialize using the
>> compile-time type instead of the runtime type.
>>
>> Cheers,
>> Eugen
>>
>>
>> 2015-10-15 13:41 GMT+07:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>
>>> Thanks for the update Phil.
>>>
>>> I'm preparing an environment to reproduce it.
>>>
>>> I'll keep you posted.
>>>
>>> Thanks again,
>>> Regards
>>> JB
>>>
>>> On 10/15/2015 08:36 AM, Phil Kallos wrote:
>>>
>>>> Not a dumb question, but yes I updated all of the library references to
>>>> 1.5, including (even tried 1.5.1).
>>>>
>>>> // Versions.spark set elsewhere to "1.5.0"
>>>> "org.apache.spark" %% "spark-streaming-kinesis-asl" % Versions.spark %
>>>> "provided"
>>>>
>>>> I am experiencing the issue in my own spark project, but also when I try
>>>> to run the spark streaming kinesis example that comes in spark/examples
>>>>
>>>> Tried running the streaming job locally, and also in EMR with release
>>>> 4.1.0 that includes Spark 1.5
>>>>
>>>> Very strange!
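The round-trip check Eugen describes (compare the runtime type of an object before and after ser/de) can be sketched outside Spark as follows. This is a minimal sketch, assuming plain Java serialization and hypothetical classes: `Marker` plays the role of a mixed-in trait, and `Payload` / `MarkedPayload` are invented for illustration. Java serialization records the class name in the stream, so this particular round trip is expected to keep the mixin. A different serializer or configuration may behave differently, which is exactly what the same `describe` check would have to establish inside a real job.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical types: Marker stands in for a mixed-in trait.
trait Marker
class Payload(val value: Int) extends Serializable
class MarkedPayload(value: Int) extends Payload(value) with Marker

object RoundTripCheck {
  // Serialize with plain Java serialization and read the object back.
  def roundTrip[T <: AnyRef](obj: T): AnyRef = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject() finally in.close()
  }

  // Report the runtime class and whether the mixin is still present.
  def describe(obj: AnyRef): String =
    s"${obj.getClass.getName} marker=${obj.isInstanceOf[Marker]}"

  def main(args: Array[String]): Unit = {
    val before = new MarkedPayload(42)
    val after = roundTrip(before)
    println("before: " + describe(before))
    println("after:  " + describe(after))
  }
}
```

If the "after" line ever reports a different class, or `marker=false`, for the serializer actually in use, that would support the hypothesis that the concrete type is lost in transit.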
Spark 1.5 Streaming and Kinesis
Hi,

We are trying to migrate from Spark 1.4 to Spark 1.5 for our Kinesis streaming applications, to take advantage of the new Kinesis checkpointing improvements in 1.5.

However after upgrading, we are consistently seeing the following error:

java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.SynchronizedMap
  at org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
  at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
  at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
  at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
  at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
  at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
  at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:88)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)

I even get this when running the Kinesis examples:
http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
with
bin/run-example streaming.KinesisWordCountASL

Am I doing something incorrect?
Spark Kinesis Checkpointing/Processing Delay
Hi! Sorry if this is a repost.

I'm using Spark + Kinesis ASL to process and persist stream data to ElasticSearch. For the most part it works nicely.

There is a subtle issue I'm running into about how failures are handled.

For example's sake, let's say I am processing a Kinesis stream that produces 400 records per second, continuously.

Kinesis provides a 24hr buffer of data, and I'm setting my Kinesis DStream consumer to use TRIM_HORIZON, which means: go as far back as possible and process the stream data as quickly as possible, until you catch up to the tip of the stream.

This means that for some period of time, Spark will suck in data from Kinesis as quickly as it can, let's say at 5000 records per second.

In my specific case, ElasticSearch can gracefully handle 400 writes per second, but is NOT happy to process 5000 writes per second. Let's say it only handles 2000 wps. This means that the processing time will exceed the batch time, the scheduling delay in the stream will rise steadily, and batches of data will get backlogged for some period of time.

In normal circumstances, this is fine. When the Spark consumers catch up to real time, the data input rate slows to 400 rps and the backlogged batches eventually get flushed to ES. The system stabilizes.

However! It appears to me that the Kinesis consumer actively submits checkpoints, even though the records may not have been processed yet (since they are backlogged). If there is processing delay and the Spark process crashes, the checkpoint will have advanced too far. If I then resume the Spark Streaming process, there is essentially a gap in my ElasticSearch data.

In principle I understand the reason for this, but is there a way to adjust this behavior? Or is there another way to handle this specific problem? Ideally I would be able to configure the process to submit Kinesis checkpoints only after my data is successfully written to ES.

Thanks,
Phil
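The behaviour asked for in the last paragraph, advancing the checkpoint only past records that have actually been written, can be sketched independently of Spark and of the Kinesis client library. Everything below is hypothetical: `Record`, `writeBatch` and `commit` are stand-ins invented for illustration, not Spark or Kinesis APIs, and the sketch says nothing about whether the Kinesis receiver exposes such a hook.

```scala
import scala.util.{Failure, Success, Try}

object GatedCheckpoint {
  // Hypothetical record: a sequence number plus a payload.
  final case class Record(seq: Long, body: String)

  // Process batches in order. `commit` is called with the last sequence
  // number of a batch only after `writeBatch` succeeded for that batch.
  // Processing stops at the first failed write; the result is the last
  // committed sequence number, if any.
  def run(
      batches: Seq[Seq[Record]],
      writeBatch: Seq[Record] => Try[Unit],
      commit: Long => Unit
  ): Option[Long] = {
    var lastCommitted: Option[Long] = None
    var failed = false
    val it = batches.iterator
    while (it.hasNext && !failed) {
      val batch = it.next()
      if (batch.nonEmpty) {
        writeBatch(batch) match {
          case Success(_) =>
            val seq = batch.last.seq
            commit(seq)
            lastCommitted = Some(seq)
          case Failure(_) =>
            failed = true // leave the checkpoint where it is
        }
      }
    }
    lastCommitted
  }
}
```

With this ordering, a consumer that restarts from the returned sequence number re-reads the failed batch instead of skipping it. The trade-off is at-least-once delivery: the sink has to tolerate records being written twice.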