Re: Spark 1.5 Streaming and Kinesis

2015-10-20 Thread Jean-Baptiste Onofré
I tried running the streaming job locally, and also on EMR release 4.1.0,
which includes Spark 1.5.

Very strange!

-- Forwarded message --

From: "Jean-Baptiste Onofré" <j...@nanthrax.net>
To: user@spark.apache.org
Cc:
Date: Thu, 15 Oct 2015 08:03:55 +0200
Subject: Re: Spark 1.5 Streaming and Kinesis
Hi Phil,
KinesisReceiver is part of the extras. Just a dumb question: did you
update everything, including the Spark Kinesis extra containing the
KinesisReceiver?
I checked on tag v1.5.0, and at line 175 of the KinesisReceiver, we see:
blockIdToSeqNumRanges.clear()
which is declared as:
private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, SequenceNumberRanges]
  with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
So, it doesn't look fully correct to me.
Let me investigate a bit this morning.
Regards
JB
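JB's observation can be reproduced outside Spark with a minimal sketch. It assumes a hypothetical marker trait `SyncLike` in place of `mutable.SynchronizedMap` (deprecated since Scala 2.11 and removed in 2.13, so the sketch avoids it). An instance created as `new mutable.HashMap ... with SyncLike` is an anonymous subclass that carries the trait at runtime; a plain `mutable.HashMap` does not, and casting it to the trait fails with the same kind of `ClassCastException` as in the stack trace. Presumably the `clear()` call at line 175 triggers such a cast because the field's static type is the compound `HashMap with SynchronizedMap`.

```scala
import scala.collection.mutable

// Hypothetical marker trait standing in for mutable.SynchronizedMap.
trait SyncLike

object MixinCastDemo {
  // Built like the field in KinesisReceiver: an anonymous subclass of
  // HashMap that mixes in a trait, so the trait is part of the runtime class.
  def withMixin(): mutable.HashMap[String, Int] =
    new mutable.HashMap[String, Int] with SyncLike

  // A plain HashMap: same base class, but no trait at runtime.
  def plain(): mutable.HashMap[String, Int] =
    new mutable.HashMap[String, Int]

  // Casts to the trait and reports "ok", or the name of the exception thrown.
  def castToMixin(m: mutable.HashMap[String, Int]): String =
    try {
      val s: SyncLike = m.asInstanceOf[SyncLike]
      if (s ne null) "ok" else "null"
    } catch {
      case e: ClassCastException => e.getClass.getName
    }

  def main(args: Array[String]): Unit = {
    println(castToMixin(withMixin())) // ok
    println(castToMixin(plain()))     // java.lang.ClassCastException
  }
}
```

Running `MixinCastDemo.main` should print `ok` followed by `java.lang.ClassCastException`: the declared type is identical in both cases, and only the runtime class decides whether the cast succeeds.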
On 10/15/2015 07:49 AM, Phil Kallos wrote:
We are trying to migrate from Spark 1.4 to Spark 1.5 for our Kinesis
streaming applications, to take advantage of the new Kinesis
checkpointing improvements in 1.5.
However, after upgrading, we are consistently seeing the following error:
java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.SynchronizedMap
    at org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
I even get this when running the Kinesis examples:
http://spark.apache.org/docs/latest/streaming-kinesis-integration.html with
bin/run-example streaming.KinesisWordCountASL
Am I doing something incorrect?

Re: Spark 1.5 Streaming and Kinesis

2015-10-19 Thread Phil Kallos
I am currently trying a few code changes to see if I can squash this error.
I have created https://issues.apache.org/jira/browse/SPARK-11193 to track
progress, hope that is okay!

In the meantime, can anyone confirm their ability to run the Kinesis-ASL
example using Spark 1.5.x or later? It would be helpful to know if it works
in some cases but not others.
http://spark.apache.org/docs/1.5.1/streaming-kinesis-integration.html

Thanks
Phil

Re: Spark 1.5 Streaming and Kinesis

2015-10-15 Thread Eugen Cepoi
Hey,

A quick update on other things that have been tested.

When looking at the compiled code of the spark-streaming-kinesis-asl jar,
everything looks normal (there is a class that implements SynchronizedMap and
it is used inside the receiver).
Starting a Spark shell and using introspection to instantiate a receiver
and check that blockIdToSeqNumRanges implements SynchronizedMap works too. So,
according to that check, it clearly has the correct type.

Another thing to test could be to do the same introspection inside a Spark
job, to make sure the problem is not in the way the jobs are run.
The other idea is that this is a serialization/deserialization (ser/de)
problem. For example, if the receiver is serialized and then deserialized,
then depending on the library used and its configuration, the concrete type
may simply not be preserved. The field would then be deserialized using its
compile-time type instead of its runtime type.

Cheers,
Eugen
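The ser/de hypothesis above can be sketched without Spark. Everything here is hypothetical: `FakeReceiver` stands in for the receiver, `SyncMarker` for `SynchronizedMap`, and `roundTrip` for a serializer that stores only the map entries and rebuilds the field with its declared class. `fieldHasMarker` mirrors the introspection check, reading the field reflectively and testing its runtime value. The check passes on the original object and fails after the round trip, which is the shape of failure described.

```scala
import scala.collection.mutable

// Hypothetical marker trait standing in for mutable.SynchronizedMap.
trait SyncMarker

// Hypothetical stand-in for the receiver. The field is a var only so the
// fake serializer below can assign it; real serializers usually set fields
// reflectively.
class FakeReceiver {
  var blockIdToSeqNumRanges: mutable.HashMap[String, Long] =
    new mutable.HashMap[String, Long] with SyncMarker
}

object SerDeDemo {
  // Hypothetical serializer: it keeps only the entries and rebuilds the map
  // with the declared (compile-time) class, so the mixin is lost.
  def roundTrip(r: FakeReceiver): FakeReceiver = {
    val rebuilt = new mutable.HashMap[String, Long]
    rebuilt ++= r.blockIdToSeqNumRanges
    val copy = new FakeReceiver
    copy.blockIdToSeqNumRanges = rebuilt
    copy
  }

  // Introspection check: read the field reflectively and test whether its
  // runtime value still implements the marker trait.
  def fieldHasMarker(r: FakeReceiver): Boolean = {
    val field = classOf[FakeReceiver].getDeclaredField("blockIdToSeqNumRanges")
    field.setAccessible(true)
    field.get(r).isInstanceOf[SyncMarker]
  }

  def main(args: Array[String]): Unit = {
    val r = new FakeReceiver
    r.blockIdToSeqNumRanges("shard-0") = 42L
    println(fieldHasMarker(r))            // true
    println(fieldHasMarker(roundTrip(r))) // false
  }
}
```

The entries survive the round trip; only the runtime class of the map changes, which is exactly what a compile-time-only check would miss.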



Re: Spark 1.5 Streaming and Kinesis

2015-10-15 Thread Phil Kallos
Not a dumb question, but yes, I updated all of the library references to
1.5, including the Kinesis extra (even tried 1.5.1).

// Versions.spark set elsewhere to "1.5.0"
"org.apache.spark" %% "spark-streaming-kinesis-asl" % Versions.spark % "provided"

I am experiencing the issue in my own Spark project, but also when I try to
run the Spark Streaming Kinesis example that comes in spark/examples.

I tried running the streaming job locally, and also on EMR release 4.1.0,
which includes Spark 1.5.

Very strange!


Re: Spark 1.5 Streaming and Kinesis

2015-10-15 Thread Jean-Baptiste Onofré

Thanks for the update, Phil.

I'm preparing an environment to reproduce it.

I'll keep you posted.

Thanks again,
Regards
JB

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: Spark 1.5 Streaming and Kinesis

2015-10-15 Thread Eugen Cepoi
So running it using spark-submit doesn't change anything; it still works.

When reading the code at
https://github.com/apache/spark/blob/branch-1.5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L100
it looks like the receivers are definitely being serialized and deserialized.
I think this is the issue; I need to find a way to confirm that now...
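If the receivers are indeed serialized, one way to avoid depending on a mixed-in trait surviving ser/de is to make thread safety a property of the map's own class. Scala's deprecation note for `SynchronizedMap` itself suggests `java.util.concurrent.ConcurrentHashMap` as an alternative. The sketch below applies that idea to a hypothetical `ReceiverState` class; it is not necessarily the change made in Spark, only an illustration of the design choice.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical receiver state, not Spark's KinesisReceiver. Thread safety
// comes from the map's own class instead of a trait mixed into an anonymous
// subclass, so restoring the declared class loses nothing.
class ReceiverState {
  // Keys stand in for block ids, values for sequence-number ranges.
  val blockIdToSeqNumRanges = new ConcurrentHashMap[String, String]()

  def record(blockId: String, range: String): Unit = {
    blockIdToSeqNumRanges.put(blockId, range)
    ()
  }

  // Counterpart of the clear() call; no cast to a mixin trait is involved.
  def reset(): Unit = blockIdToSeqNumRanges.clear()

  def size: Int = blockIdToSeqNumRanges.size()
}

object ReceiverStateDemo {
  // A hypothetical serializer that rebuilds the object from its entries:
  // it recreates the declared class, which here is already thread-safe.
  def rebuild(src: ReceiverState): ReceiverState = {
    val copy = new ReceiverState
    copy.blockIdToSeqNumRanges.putAll(src.blockIdToSeqNumRanges)
    copy
  }

  def main(args: Array[String]): Unit = {
    val s = new ReceiverState
    s.record("block-1", "seq 1..10")
    val copy = rebuild(s)
    println(copy.size) // 1
    copy.reset()
    println(copy.size) // 0
  }
}
```

Because the declared class and the runtime class are the same here, a serializer that only restores the declared class still yields a thread-safe map, and `reset()` needs no cast.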


Re: Spark 1.5 Streaming and Kinesis

2015-10-15 Thread Jean-Baptiste Onofré

Hi Phil,

Sorry, I didn't have time to investigate yesterday (I was working on a couple
of other Apache projects ;)). I will try to do it today. I'll keep you posted.


Regards
JB


Re: Spark 1.5 Streaming and Kinesis

2015-10-15 Thread Phil Kallos
JB,

To clarify: are you able to run the Amazon Kinesis example provided in the
Spark examples directory with the following command?

bin/run-example streaming.KinesisWordCountASL [app name] [stream name] [endpoint url]

If it helps, below are the steps I used to build Spark:

mvn -Pyarn -Pkinesis-asl -Phadoop-2.6 -DskipTests clean package

I did this with revision 4f894dd6906311cb57add6757690069a18078783
(v1.5.1).

Thanks,
Phil


On Thu, Oct 15, 2015 at 2:31 AM, Eugen Cepoi <cepoi.eu...@gmail.com> wrote:

> So running it using spark-submit doesn't change anything; it still works.
>
> When reading the code
> https://github.com/apache/spark/blob/branch-1.5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L100
> it looks like the receivers are definitely being ser/de. I think this is
> the issue, need to find a way to confirm that now...
>
> 2015-10-15 16:12 GMT+07:00 Eugen Cepoi <cepoi.eu...@gmail.com>:
>
>> Hey,
>>
>> A quick update on other things that have been tested.
>>
>> When looking at the compiled code of the spark-streaming-kinesis-asl jar
>> everything looks normal (there is a class that implements SyncMap and it is
>> used inside the receiver).
>> Starting a spark shell and using introspection to instantiate a receiver
>> and check that blockIdToSeqNumRanges implements SyncMap works too. So
>> obviously it has the correct type according to that.
>>
>> Another thing to test could be to do the same introspection stuff but
>> inside a spark job to make sure it is not a problem in the way the jobs are
>> run.
>> The other idea would be that this is a problem related to ser/de. For
>> example if the receiver was being serialized and then deserialized it could
>> definitely happen depending on the lib used and its configuration that it
>> just doesn't preserve the concrete type. So it would deserialize using the
>> compile type instead of the runtime type.
>>
>> Cheers,
>> Eugen
>>
>>
>> 2015-10-15 13:41 GMT+07:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>
>>> Thanks for the update Phil.
>>>
>>> I'm preparing an environment to reproduce it.
>>>
>>> I'll keep you posted.
>>>
>>> Thanks again,
>>> Regards
>>> JB
>>>
>>> On 10/15/2015 08:36 AM, Phil Kallos wrote:
>>>
>>>> Not a dumb question, but yes I updated all of the library references to
>>>> 1.5, including  (even tried 1.5.1).
>>>>
>>>> // Versions.spark set elsewhere to "1.5.0"
>>>> "org.apache.spark" %% "spark-streaming-kinesis-asl" % Versions.spark %
>>>> "provided"
>>>>
>>>> I am experiencing the issue in my own spark project, but also when I try
>>>> to run the spark streaming kinesis example that comes in spark/examples
>>>>
>>>> Tried running the streaming job locally, and also in EMR with release
>>>> 4.1.0 that includes Spark 1.5
>>>>
>>>> Very strange!

Re: Spark 1.5 Streaming and Kinesis

2015-10-15 Thread Jean-Baptiste Onofré

Hi Phil,

KinesisReceiver is part of the extras. Just a dumb question: did you update
everything, including the Spark Kinesis extra containing the KinesisReceiver?


I checked on tag v1.5.0, and at line 175 of the KinesisReceiver, we see:

blockIdToSeqNumRanges.clear()

which is a:

private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, SequenceNumberRanges]
  with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]

So, it doesn't look fully correct to me.

Let me investigate a bit this morning.

Regards
JB

On 10/15/2015 07:49 AM, Phil Kallos wrote:

Hi,

We are trying to migrate from Spark 1.4 to Spark 1.5 for our Kinesis
streaming applications, to take advantage of the new Kinesis
checkpointing improvements in 1.5.

However, after upgrading, we are consistently seeing the following error:

java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.SynchronizedMap
    at org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

I even get this when running the Kinesis example from
http://spark.apache.org/docs/latest/streaming-kinesis-integration.html with:

bin/run-example streaming.KinesisWordCountASL

Am I doing something incorrect?




--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.5 Streaming and Kinesis

2015-10-15 Thread Jean-Baptiste Onofré
By correct, I mean: the map declaration looks good to me, so the 
ClassCastException is weird ;)


I'm trying to reproduce the issue in order to investigate.

Regards
JB
