I am currently trying a few code changes to see if I can squash this error. I have created https://issues.apache.org/jira/browse/SPARK-11193 to track progress; I hope that is okay!
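In case it helps anyone test the ser/de theory Eugen raised further down this thread, here is a minimal sketch. It is not taken from Spark. It round-trips a HashMap that mixes in a trait through plain Java serialization and reports whether the mixin survives. SerDeTypeCheck, Marker, roundTrip and describe are hypothetical names made up for this illustration; Marker only stands in for mutable.SynchronizedMap, and the use of Java serialization is an assumption, since the serializer actually applied to receivers depends on the Spark configuration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

object SerDeTypeCheck {
  // Hypothetical marker trait, standing in for mutable.SynchronizedMap.
  trait Marker

  // Serialize and deserialize a value with plain Java serialization.
  // Assumption: this is only one possible serializer, not necessarily
  // the one Spark uses for receivers in a given deployment.
  def roundTrip(value: AnyRef): AnyRef = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject() finally in.close()
  }

  // Report the runtime class and whether the mixin is still present.
  def describe(label: String, value: AnyRef): String =
    s"$label: class=${value.getClass.getName}, hasMarker=${value.isInstanceOf[Marker]}"

  def main(args: Array[String]): Unit = {
    val original = new mutable.HashMap[String, Int] with Marker
    original.put("a", 1)
    println(describe("before", original))
    println(describe("after", roundTrip(original)))
  }
}
```

Compile and run it with the same Scala version your Spark build uses. The result can differ between Scala versions and serializers, so no expected output is listed here. If hasMarker flips to false after the round trip, that would be consistent with the ClassCastException in the stack trace quoted below.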
In the meantime, can anyone confirm whether they are able to run the Kinesis-ASL example using Spark 1.5.x or later? It would be helpful to know if it works in some cases but not others.

http://spark.apache.org/docs/1.5.1/streaming-kinesis-integration.html

Thanks
Phil

On Thu, Oct 15, 2015 at 10:35 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

> Hi Phil,
>
> sorry I didn't have time to investigate yesterday (I was on a couple of
> other Apache projects ;)). I will try to do it today. I'll keep you posted.
>
> Regards
> JB
>
> On 10/16/2015 07:21 AM, Phil Kallos wrote:
>
>> JB,
>>
>> To clarify, you are able to run the Amazon Kinesis example provided in
>> the spark examples dir?
>>
>> bin/run-example streaming.KinesisWordCountASL [app name] [stream name] [endpoint url] ?
>>
>> If it helps, below are the steps I used to build spark
>>
>> mvn -Pyarn -Pkinesis-asl -Phadoop-2.6 -DskipTests clean package
>>
>> And I did this with revision 4f894dd6906311cb57add6757690069a18078783
>> (v.1.5.1)
>>
>> Thanks,
>> Phil
>>
>>
>> On Thu, Oct 15, 2015 at 2:31 AM, Eugen Cepoi <cepoi.eu...@gmail.com> wrote:
>>
>> So running it using spark-submit doesn't change anything, it still
>> works.
>>
>> When reading the code
>> https://github.com/apache/spark/blob/branch-1.5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L100
>> it looks like the receivers are definitely being ser/de. I think
>> this is the issue, need to find a way to confirm that now...
>>
>> 2015-10-15 16:12 GMT+07:00 Eugen Cepoi <cepoi.eu...@gmail.com>:
>>
>> Hey,
>>
>> A quick update on other things that have been tested.
>>
>> When looking at the compiled code of the
>> spark-streaming-kinesis-asl jar everything looks normal (there
>> is a class that implements SyncMap and it is used inside the
>> receiver).
>>
>> Starting a spark shell and using introspection to instantiate a
>> receiver and check that blockIdToSeqNumRanges implements SyncMap
>> works too. So obviously it has the correct type according to that.
>>
>> Another thing to test could be to do the same introspection
>> stuff but inside a spark job to make sure it is not a problem in
>> the way the jobs are run.
>> The other idea would be that this is a problem related to
>> ser/de. For example, if the receiver was being serialized and
>> then deserialized, it could definitely happen, depending on the
>> lib used and its configuration, that it just doesn't preserve the
>> concrete type. So it would deserialize using the compile-time type
>> instead of the runtime type.
>>
>> Cheers,
>> Eugen
>>
>>
>> 2015-10-15 13:41 GMT+07:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>
>> Thanks for the update Phil.
>>
>> I'm preparing an environment to reproduce it.
>>
>> I'll keep you posted.
>>
>> Thanks again,
>> Regards
>> JB
>>
>> On 10/15/2015 08:36 AM, Phil Kallos wrote:
>>
>> Not a dumb question, but yes I updated all of the library
>> references to 1.5, including spark-streaming-kinesis-asl
>> (I even tried 1.5.1).
>>
>> // Versions.spark set elsewhere to "1.5.0"
>> "org.apache.spark" %% "spark-streaming-kinesis-asl" % Versions.spark % "provided"
>>
>> I am experiencing the issue in my own spark project, but also when
>> I try to run the spark streaming kinesis example that comes in
>> spark/examples
>>
>> Tried running the streaming job locally, and also in EMR with
>> release 4.1.0 that includes Spark 1.5
>>
>> Very strange!
>>
>> ---------- Forwarded message ----------
>>
>> From: "Jean-Baptiste Onofré" <j...@nanthrax.net>
>> To: user@spark.apache.org
>> Cc:
>> Date: Thu, 15 Oct 2015 08:03:55 +0200
>> Subject: Re: Spark 1.5 Streaming and Kinesis
>>
>> Hi Phil,
>>
>> KinesisReceiver is part of extra. Just a dumb question: did you
>> update all, including the Spark Kinesis extra containing the
>> KinesisReceiver?
>>
>> I checked on tag v1.5.0, and at line 175 of the KinesisReceiver, we see:
>>
>> blockIdToSeqNumRanges.clear()
>>
>> which is a:
>>
>> private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, SequenceNumberRanges]
>>   with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
>>
>> So, it doesn't look fully correct to me.
>> Let me investigate a bit this morning.
>>
>> Regards
>> JB
>>
>> On 10/15/2015 07:49 AM, Phil Kallos wrote:
>>
>> We are trying to migrate from Spark 1.4 to Spark 1.5 for our Kinesis
>> streaming applications, to take advantage of the new Kinesis
>> checkpointing improvements in 1.5.
>>
>> However after upgrading, we are consistently seeing the following error:
>>
>> java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.SynchronizedMap
>>     at org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
>>     at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
>>     at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
>>     at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
>>     at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
>>     at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>>     at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> I even get this when running the Kinesis examples:
>> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>> with
>>
>> bin/run-example streaming.KinesisWordCountASL
>>
>> Am I doing something incorrect?
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com