Why does groupByKey still shuffle if SQL does "Distribute By" on the same columns?

2018-01-30 Thread Dibyendu Bhattacharya
 Hi,

I am trying something like this..

val sesDS:  Dataset[XXX] = hiveContext.sql(select).as[XXX]

The select statement is something like this : "select * from sometable 
DISTRIBUTE by col1, col2, col3"

Then comes groupByKey...

val gpbyDS = sesDS.groupByKey(x => (x.col1, x.col2, x.col3))

Since my select already distributes the data on the same columns that I use in
groupByKey, why does groupByKey still do a shuffle? Is this an issue, or am I
missing something?

Regards,
Dibyendu
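
For readers who want to check this themselves, below is a minimal, self-contained
sketch of the scenario (assumptions: a SparkSession with Hive support in place of
the hiveContext above, a hypothetical Row3 case class standing in for XXX, and the
table/column names taken from the mail). Calling explain on the grouped result
shows whether an Exchange (shuffle) is still inserted above the DISTRIBUTE BY
output.

// Sketch only: assumed schema/table; Row3 stands in for the "XXX" type above.
import org.apache.spark.sql.SparkSession

case class Row3(col1: String, col2: String, col3: String, value: Long)

object GroupByKeyShuffleCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("GroupByKeyShuffleCheck")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    // Same shape as the query in the mail: distribute by the grouping columns.
    val sesDS = spark
      .sql("select * from sometable DISTRIBUTE BY col1, col2, col3")
      .as[Row3]

    // Typed groupByKey on the same columns as the DISTRIBUTE BY clause.
    val gpbyDS = sesDS.groupByKey(x => (x.col1, x.col2, x.col3))

    // Inspect the physical plan of an aggregation over the grouped data; an
    // "Exchange hashpartitioning(...)" node here means the typed groupByKey is
    // still introducing a shuffle on top of the distributed input.
    gpbyDS.count().explain(true)

    spark.stop()
  }
}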


Re: question on Write Ahead Log (Spark Streaming )

2017-03-10 Thread Dibyendu Bhattacharya
Hi,

You could also use this Receiver :
https://github.com/dibbhatt/kafka-spark-consumer

This is part of spark-packages also :
https://spark-packages.org/package/dibbhatt/kafka-spark-consumer

With this consumer you do not need to enable the WAL and can still recover from
driver failure with no data loss. You can refer to
https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md for
more details, or reach out to me.

Regards,
Dibyendu


On Wed, Mar 8, 2017 at 8:58 AM, kant kodali  wrote:

> Hi All,
>
> I am using a Receiver-based approach. I understand that the Spark Streaming
> APIs convert the data received from the receiver into blocks, and that these
> in-memory blocks are also stored in the WAL if one enables it. My upstream
> source, which is not Kafka, can also replay: if I don't send an ack, it will
> resend the data, so I don't have to write the received data to the WAL myself.
> However, I still need to enable the WAL, correct? Because the blocks that are
> in memory need to be written to the WAL so they can be recovered later.
>
> Thanks,
> kant
>
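
For reference, a minimal sketch of how the receiver write-ahead log discussed
above is typically enabled (the app name, checkpoint path and the socket source
are placeholders): the spark.streaming.receiver.writeAheadLog.enable setting,
together with a fault-tolerant checkpoint directory, is what covers the
in-memory blocks mentioned in the question.

// Minimal sketch (assumed app name, checkpoint path and input source): enabling
// the receiver WAL so that received blocks are written to the checkpoint
// directory and can be recovered after a failure.
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WalEnabledStreamingApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WalEnabledStreamingApp")
      // Write received blocks to the WAL under the checkpoint directory.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(10))
    // The WAL files live under this directory, so it must be fault tolerant (e.g. HDFS/S3).
    ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")

    // With the WAL on, in-Spark replication is usually unnecessary, so a
    // non-replicated, serialized storage level is the common choice.
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}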


Latest Release of Receiver based Kafka Consumer for Spark Streaming.

2017-02-15 Thread Dibyendu Bhattacharya
Hi ,

Released the latest version of the Receiver-based Kafka Consumer for Spark
Streaming.

Available at Spark Packages: https://spark-packages.org/package/dibbhatt/kafka-spark-consumer

Also on GitHub: https://github.com/dibbhatt/kafka-spark-consumer

Some key features:

   - Tuned for better performance
   - Support for Spark 2.x and Kafka 0.10
   - Support for Consumer Lag Check (ConsumerOffsetChecker, Burrow, etc.)
   - WAL-less recovery
   - Better-tuned PID Controller with auto rate adjustment to the incoming traffic
   - Support for Custom Message Interceptors

Please refer to https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md
for more details.

Regards,
Dibyendu
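
A minimal usage sketch of the announced consumer, modeled on the
ReceiverLauncher.launch(...) call quoted later in this archive. The batch
interval, receiver count, storage level and the contents of props are
placeholders; the exact property keys and the launch signature for the release
you use are documented in the README linked above.

// Sketch only: call shape follows the examples quoted elsewhere in this archive.
import java.util.Properties

import consumer.kafka.ReceiverLauncher
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.api.java.JavaStreamingContext

object KafkaReceiverSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaReceiverSketch")
    val jsc = new JavaStreamingContext(conf, new Duration(5000)) // 5 s batches (placeholder)

    val props = new Properties()
    // Fill in the ZooKeeper / Kafka / topic properties here, as listed in the README.

    val numberOfReceivers = 4 // placeholder; often one receiver per Kafka partition, or fewer

    // Launch the receivers; storage level is a placeholder choice.
    val unionStreams =
      ReceiverLauncher.launch(jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY)

    unionStreams.count().print()

    jsc.start()
    jsc.awaitTermination()
  }
}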


Re: Latest Release of Receiver based Kafka Consumer for Spark Streaming.

2016-08-25 Thread Dibyendu Bhattacharya
Hi,

This package is not dependent on any specific Spark release and can be used
with 1.5. Please refer to the "How To" section here:

https://spark-packages.org/package/dibbhatt/kafka-spark-consumer

You will also find more information on how to use this package in the README file.

Regards,
Dibyendu


On Thu, Aug 25, 2016 at 7:01 PM,  wrote:

> Hi Dibyendu,
>
> Looks like it is available in 2.0, we are using older version of spark 1.5
> . Could you please let me know how to use this with older versions.
>
> Thanks,
> Asmath
>
> Sent from my iPhone
>
> On Aug 25, 2016, at 6:33 AM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
> Hi ,
>
> Released latest version of Receiver based Kafka Consumer for Spark
> Streaming.
>
> Receiver is compatible with Kafka versions 0.8.x, 0.9.x and 0.10.x and All
> Spark Versions
>
> Available at Spark Packages : https://spark-packages.org/package/dibbhatt/kafka-spark-consumer
>
> Also at github  : https://github.com/dibbhatt/kafka-spark-consumer
>
> Salient Features :
>
>- End to End No Data Loss without Write Ahead Log
>- ZK Based offset management for both consumed and processed offset
>- No dependency on WAL and Checkpoint
>- In-built PID Controller for Rate Limiting and Backpressure management
>- Custom Message Interceptor
>
> Please refer to https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md for more details
>
>
> Regards,
>
> Dibyendu
>
>
>


Latest Release of Receiver based Kafka Consumer for Spark Streaming.

2016-08-25 Thread Dibyendu Bhattacharya
Hi ,

Released the latest version of the Receiver-based Kafka Consumer for Spark
Streaming.

The Receiver is compatible with Kafka versions 0.8.x, 0.9.x and 0.10.x, and all
Spark versions.

Available at Spark Packages :
https://spark-packages.org/package/dibbhatt/kafka-spark-consumer

Also at github  : https://github.com/dibbhatt/kafka-spark-consumer

Salient Features :

   - End to End No Data Loss without Write Ahead Log
   - ZK Based offset management for both consumed and processed offset
   - No dependency on WAL and Checkpoint
   - In-built PID Controller for Rate Limiting and Backpressure management
   - Custom Message Interceptor

Please refer to
https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md for
more details


Regards,

Dibyendu


Re: Severe Spark Streaming performance degradation after upgrading to 1.6.1

2016-07-13 Thread Dibyendu Bhattacharya
You can get some good pointers in this JIRA

https://issues.apache.org/jira/browse/SPARK-15796

Dibyendu


On Thu, Jul 14, 2016 at 12:53 AM, Sunita  wrote:

> I am facing the same issue. Upgrading to Spark 1.6 is causing a huge
> performance loss. Could you solve this issue? I am also attempting the memory
> settings mentioned at
> http://spark.apache.org/docs/latest/configuration.html#memory-management
>
> But it's not making a lot of difference. Appreciate your inputs on this.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Severe-Spark-Streaming-performance-degradation-after-upgrading-to-1-6-1-tp27056p27330.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Any idea about this error: IllegalArgumentException: File segment length cannot be negative?

2016-07-12 Thread Dibyendu Bhattacharya
In a Spark Streaming job, I see a batch that failed with the following error. I
haven't seen anything like this earlier.

This happened during the shuffle for one batch (it hasn't reoccurred since).
Just curious to know what can cause this error. I am running Spark 1.5.1.

Regards,
Dibyendu


Job aborted due to stage failure: Task 2801 in stage 9421.0 failed 4
times, most recent failure: Lost task 2801.3 in stage 9421.0:
java.lang.IllegalArgumentException: requirement failed: File segment
length cannot be negative (got -68321)
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.storage.FileSegment.<init>(FileSegment.scala:28)
at 
org.apache.spark.storage.DiskBlockObjectWriter.fileSegment(DiskBlockObjectWriter.scala:216)
at 
org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:684)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Re: [Spark 1.6] Spark Streaming - java.lang.AbstractMethodError

2016-01-07 Thread Dibyendu Bhattacharya
Some discussion is in https://github.com/dibbhatt/kafka-spark-consumer
and some is mentioned in https://issues.apache.org/jira/browse/SPARK-11045

Let me know if those answer your question.

In short, the Direct Stream is a good choice if you need exactly-once semantics
and message ordering, but many use cases do not require exactly-once or message
ordering. If you use the Direct Stream, the RDD processing parallelism is
limited to the number of Kafka partitions, and you need to store offset details
in an external store, as the checkpoint location is not reliable if you modify
the driver code.

In Receiver-based mode, on the other hand, you need to enable the WAL for no
data loss. But the Spark Receiver-based consumer from KafkaUtils, which uses
the Kafka high-level API, has serious issues, so if you do need to switch to
Receiver-based mode, this low-level consumer is a better choice.

Performance-wise I have not published any numbers yet, but from the internal
testing and benchmarking I did (and validated by folks who use this consumer),
it performs much better than any existing consumer in Spark.

Regards,
Dibyendu

On Thu, Jan 7, 2016 at 4:28 PM, Jacek Laskowski  wrote:

> On Thu, Jan 7, 2016 at 11:39 AM, Dibyendu Bhattacharya
>  wrote:
> > You are using low level spark kafka consumer . I am the author of the
> same.
>
> If I may ask, what are the differences between this and the direct
> version shipped with spark? I've just started toying with it, and
> would appreciate some guidance. Thanks.
>
> Jacek
>


Re: [Spark 1.6] Spark Streaming - java.lang.AbstractMethodError

2016-01-07 Thread Dibyendu Bhattacharya
Right. If you are using the GitHub version, just modify the ReceiverLauncher
and add that. I will fix it for Spark 1.6 and release a new version in
spark-packages for Spark 1.6.

Dibyendu

On Thu, Jan 7, 2016 at 4:14 PM, Ted Yu  wrote:

> I cloned g...@github.com:dibbhatt/kafka-spark-consumer.git a moment ago.
>
> In ./src/main/java/consumer/kafka/ReceiverLauncher.java , I see:
>jsc.addStreamingListener(new StreamingListener() {
>
> There is no onOutputOperationStarted method implementation.
>
> Looks like it should be added for Spark 1.6.0
>
> Cheers
>
> On Thu, Jan 7, 2016 at 2:39 AM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> You are using low level spark kafka consumer . I am the author of the
>> same.
>>
>> Are you using the spark-packages version ? if yes which one ?
>>
>> Regards,
>> Dibyendu
>>
>> On Thu, Jan 7, 2016 at 4:07 PM, Jacek Laskowski  wrote:
>>
>>> Hi,
>>>
>>> Do you perhaps use custom StreamingListener?
>>> `StreamingListenerBus.scala:47` calls
>>> `StreamingListener.onOutputOperationStarted` that was added in
>>> [SPARK-10900] [STREAMING] Add output operation events to
>>> StreamingListener [1]
>>>
>>> The other guess could be that at runtime you still use Spark < 1.6.
>>>
>>> [1] https://issues.apache.org/jira/browse/SPARK-10900
>>>
>>> Pozdrawiam,
>>> Jacek
>>>
>>> Jacek Laskowski | https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark
>>> ==> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>>
>>>
>>> On Thu, Jan 7, 2016 at 10:59 AM, Walid LEZZAR 
>>> wrote:
>>> > Hi,
>>> >
>>> > We have been using spark streaming for a little while now.
>>> >
>>> > Until now, we were running our spark streaming jobs in spark 1.5.1 and
>>> it
>>> > was working well. Yesterday, we upgraded to spark 1.6.0 without any
>>> changes
>>> > in the code. But our streaming jobs are not working any more. We are
>>> getting
>>> > an "AbstractMethodError". Please, find the stack trace at the end of
>>> the
>>> > mail. Can we have some hints on what this error means ? (we are using
>>> spark
>>> > to connect to kafka)
>>> >
>>> > The stack trace :
>>> > 16/01/07 10:44:39 INFO ZkState: Starting curator service
>>> > 16/01/07 10:44:39 INFO CuratorFrameworkImpl: Starting
>>> > 16/01/07 10:44:39 INFO ZooKeeper: Initiating client connection,
>>> > connectString=localhost:2181 sessionTimeout=12
>>> > watcher=org.apache.curator.ConnectionState@2e9fa23a
>>> > 16/01/07 10:44:39 INFO ClientCnxn: Opening socket connection to server
>>> > localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL
>>> > (unknown error)
>>> > 16/01/07 10:44:39 INFO ClientCnxn: Socket connection established to
>>> > localhost/127.0.0.1:2181, initiating session
>>> > 16/01/07 10:44:39 INFO ClientCnxn: Session establishment complete on
>>> server
>>> > localhost/127.0.0.1:2181, sessionid = 0x1521b6d262e0035, negotiated
>>> timeout
>>> > = 6
>>> > 16/01/07 10:44:39 INFO ConnectionStateManager: State change: CONNECTED
>>> > 16/01/07 10:44:40 INFO PartitionManager: Read partition information
>>> from:
>>> >
>>> /spark-kafka-consumer/StreamingArchiver/lbc.job.multiposting.input/partition_0
>>> > --> null
>>> > 16/01/07 10:44:40 INFO JobScheduler: Added jobs for time 145215988
>>> ms
>>> > 16/01/07 10:44:40 INFO JobScheduler: Starting job streaming job
>>> > 145215988 ms.0 from job set of time 145215988 ms
>>> > 16/01/07 10:44:40 ERROR Utils: uncaught error in thread
>>> > StreamingListenerBus, stopping SparkContext
>>> >
>>> > ERROR Utils: uncaught error in thread StreamingListenerBus, stopping
>>> > SparkContext
>>> > java.lang.AbstractMethodError
>>> > at
>>> >
>>> org.apache.spark.streaming.scheduler.StreamingListenerBus.onPostEvent(StreamingListenerBus.scala:47)
>>> > at
>>> >
>>> org.apache.spark.streaming.scheduler.StreamingListenerBus.onPostEvent(StreamingListenerBus.scala:26)
>>> > at
>>>
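
For readers hitting the same AbstractMethodError: SPARK-10900 added
output-operation callbacks to StreamingListener in Spark 1.6, and a listener
implemented as a Java class (such as the anonymous class in ReceiverLauncher.java
quoted above) must supply them explicitly. Below is a rough Scala sketch that
names the two new callbacks; no-op bodies are enough, and in Scala the trait
already provides empty defaults, so the sketch mainly documents what a Java
implementer has to add.

// Sketch of a listener that explicitly implements the Spark 1.6 output-operation
// callbacks (SPARK-10900). Registered with jsc.addStreamingListener(...).
import org.apache.spark.streaming.scheduler.{
  StreamingListener,
  StreamingListenerBatchCompleted,
  StreamingListenerOutputOperationCompleted,
  StreamingListenerOutputOperationStarted
}

class CompatStreamingListener extends StreamingListener {

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // Existing logic, e.g. reporting scheduling delay or processing time.
  }

  // New in Spark 1.6: called when an output operation of a batch starts.
  override def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {}

  // New in Spark 1.6: called when an output operation of a batch completes.
  override def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {}
}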

Re: Need to maintain the consumer offset by myself when using spark streaming kafka direct approach?

2015-12-08 Thread Dibyendu Bhattacharya
With the direct stream, the checkpoint location is not recoverable if you modify
your driver code. So if you rely only on the checkpoint to commit offsets, you
can possibly lose messages if you modify the driver code and start from the
"largest" offset. If you do not want to lose messages, you need to commit the
offsets to an external store when using the direct stream.
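
As a rough illustration of committing offsets to an external store with the
direct stream (Spark 1.x Kafka API): the batch's offset ranges are read through
HasOffsetRanges and persisted only after the batch's output has succeeded. The
broker, topic and the saveOffsets helper are placeholders; replace saveOffsets
with a write to ZooKeeper, a database, or whatever store you use.

// Sketch only: placeholder broker/topic; saveOffsets is a hypothetical helper.
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectStreamOffsetCommit {

  // Hypothetical: persist the processed batch's offset ranges somewhere durable.
  def saveOffsets(offsetRanges: Array[OffsetRange]): Unit = {
    offsetRanges.foreach { or =>
      println(s"committed ${or.topic}/${or.partition} -> ${or.untilOffset}")
    }
  }

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("DirectStreamOffsetCommit"), Seconds(10))

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val topics = Set("mytopic")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.foreachRDD { rdd =>
      // Capture this batch's offset ranges before any transformation.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Placeholder processing: count the messages in this batch.
      val processed = rdd.map(_._2).count()
      println(s"processed $processed messages")

      // Commit offsets to the external store only after the batch succeeded.
      saveOffsets(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}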

On Tue, Dec 8, 2015 at 7:47 PM, PhuDuc Nguyen 
wrote:

> Kafka Receiver-based approach:
> This will maintain the consumer offsets in ZK for you.
>
> Kafka Direct approach:
> You can use checkpointing and that will maintain consumer offsets for you.
> You'll want to checkpoint to a highly available file system like HDFS or S3.
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
>
> You don't have to maintain your own offsets if you don't want to. If the 2
> solutions above don't satisfy your requirements, then consider writing your
> own; otherwise I would recommend using the supported features in Spark.
>
> HTH,
> Duc
>
>
>
> On Tue, Dec 8, 2015 at 5:05 AM, Tao Li  wrote:
>
>> I am using the Spark Streaming Kafka direct approach these days. I found that
>> when I start the application, it always starts consuming from the latest
>> offset. I would like the application, when it starts, to consume from the
>> offset where the last run with the same Kafka consumer group left off. Does
>> that mean I have to maintain the consumer offset myself, for example by
>> recording it in ZooKeeper and reloading the last offset from ZooKeeper when
>> restarting the application?
>>
>> I see the following discussion:
>> https://github.com/apache/spark/pull/4805
>> https://issues.apache.org/jira/browse/SPARK-6249
>>
>> Is there any conclusion? Do we need to maintain the offset by myself? Or
>> spark streaming will support a feature to simplify the offset maintain work?
>>
>>
>> https://forums.databricks.com/questions/2936/need-to-maintain-the-consumer-offset-by-myself-whe.html
>>
>
>


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Dibyendu Bhattacharya
There are other ways to deal with the problem than shutting down the streaming
job. You can monitor the lag in your consumer to see if the consumer is falling
behind. If the lag is so high that OffsetOutOfRange can happen, you either
increase the retention period or increase the consumer rate, or do both.

What I am trying to say is that the streaming job should not fail in any case.

Dibyendu

On Thu, Dec 3, 2015 at 9:40 AM, Cody Koeninger  wrote:

> I believe that what differentiates reliable systems is individual
> components should fail fast when their preconditions aren't met, and other
> components should be responsible for monitoring them.
>
> If a user of the direct stream thinks that your approach of restarting and
> ignoring data loss is the right thing to do, they can monitor the job
> (which they should be doing in any case) and restart.
>
> If a user of your library thinks that my approach of failing (so they KNOW
> there was data loss and can adjust their system) is the right thing to do,
> how do they do that?
>
> On Wed, Dec 2, 2015 at 9:49 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Well, even if you do correct retention and increase speed,
>> OffsetOutOfRange can still come depends on how your downstream processing
>> is. And if that happen , there is No Other way to recover old messages . So
>> best bet here from Streaming Job point of view  is to start from earliest
>> offset rather bring down the streaming job . In many cases goal for a
>> streaming job is not to shut down and exit in case of any failure. I
>> believe that is what differentiate a always running streaming job.
>>
>> Dibyendu
>>
>> On Thu, Dec 3, 2015 at 8:26 AM, Cody Koeninger 
>> wrote:
>>
>>> No, silently restarting from the earliest offset in the case of offset
>>> out of range exceptions during a streaming job is not the "correct way of
>>> recovery".
>>>
>>> If you do that, your users will be losing data without knowing why.
>>> It's more like  a "way of ignoring the problem without actually addressing
>>> it".
>>>
>>> The only really correct way to deal with that situation is to recognize
>>> why it's happening, and either increase your Kafka retention or increase
>>> the speed at which you are consuming.
>>>
>>> On Wed, Dec 2, 2015 at 7:13 PM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
>>>> This consumer which I mentioned does not silently throw away data. If
>>>> offset out of range it start for earliest offset and that is correct way of
>>>> recovery from this error.
>>>>
>>>> Dibyendu
>>>> On Dec 2, 2015 9:56 PM, "Cody Koeninger"  wrote:
>>>>
>>>>> Again, just to be clear, silently throwing away data because your
>>>>> system isn't working right is not the same as "recover from any Kafka
>>>>> leader changes and offset out of ranges issue".
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya <
>>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>>
>>>>>> Hi, if you use Receiver based consumer which is available in
>>>>>> spark-packages (
>>>>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) ,
>>>>>> this has all built in failure recovery and it can recover from any Kafka
>>>>>> leader changes and offset out of ranges issue.
>>>>>>
>>>>>> Here is the package form github :
>>>>>> https://github.com/dibbhatt/kafka-spark-consumer
>>>>>>
>>>>>>
>>>>>> Dibyendu
>>>>>>
>>>>>> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <
>>>>>> swethakasire...@gmail.com> wrote:
>>>>>>
>>>>>>> How to avoid those Errors with receiver based approach? Suppose we
>>>>>>> are OK with at least once processing and use receiver based approach 
>>>>>>> which
>>>>>>> uses ZooKeeper but not query Kafka directly, would these errors(Couldn't
>>>>>>> find leader offsets for
>>>>>>> Set([test_stream,5])))be avoided?
>>>>>>>
>>>>>>> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Kaf

Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Dibyendu Bhattacharya
Well, even if you do correct the retention and increase the speed,
OffsetOutOfRange can still occur, depending on how your downstream processing
behaves. And if that happens, there is no other way to recover the old messages.
So the best bet here, from the streaming job's point of view, is to start from
the earliest offset rather than bring down the streaming job. In many cases the
goal for a streaming job is not to shut down and exit in case of any failure. I
believe that is what differentiates an always-running streaming job.

Dibyendu

On Thu, Dec 3, 2015 at 8:26 AM, Cody Koeninger  wrote:

> No, silently restarting from the earliest offset in the case of offset out
> of range exceptions during a streaming job is not the "correct way of
> recovery".
>
> If you do that, your users will be losing data without knowing why.  It's
> more like  a "way of ignoring the problem without actually addressing it".
>
> The only really correct way to deal with that situation is to recognize
> why it's happening, and either increase your Kafka retention or increase
> the speed at which you are consuming.
>
> On Wed, Dec 2, 2015 at 7:13 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> This consumer which I mentioned does not silently throw away data. If
>> offset out of range it start for earliest offset and that is correct way of
>> recovery from this error.
>>
>> Dibyendu
>> On Dec 2, 2015 9:56 PM, "Cody Koeninger"  wrote:
>>
>>> Again, just to be clear, silently throwing away data because your system
>>> isn't working right is not the same as "recover from any Kafka leader
>>> changes and offset out of ranges issue".
>>>
>>>
>>>
>>> On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
>>>> Hi, if you use Receiver based consumer which is available in
>>>> spark-packages (
>>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) ,
>>>> this has all built in failure recovery and it can recover from any Kafka
>>>> leader changes and offset out of ranges issue.
>>>>
>>>> Here is the package form github :
>>>> https://github.com/dibbhatt/kafka-spark-consumer
>>>>
>>>>
>>>> Dibyendu
>>>>
>>>> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <
>>>> swethakasire...@gmail.com> wrote:
>>>>
>>>>> How to avoid those Errors with receiver based approach? Suppose we are
>>>>> OK with at least once processing and use receiver based approach which 
>>>>> uses
>>>>> ZooKeeper but not query Kafka directly, would these errors(Couldn't
>>>>> find leader offsets for
>>>>> Set([test_stream,5])))be avoided?
>>>>>
>>>>> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger 
>>>>> wrote:
>>>>>
>>>>>> KafkaRDD.scala , handleFetchErr
>>>>>>
>>>>>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>>>>>> swethakasire...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Cody,
>>>>>>>
>>>>>>> How to look at Option 2(see the following)? Which portion of the
>>>>>>> code in Spark Kafka Direct to look at to handle this issue specific to 
>>>>>>> our
>>>>>>> requirements.
>>>>>>>
>>>>>>>
>>>>>>> 2.Catch that exception and somehow force things to "reset" for that
>>>>>>> partition And how would it handle the offsets already calculated in
>>>>>>> the
>>>>>>> backlog (if there is one)?
>>>>>>>
>>>>>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> If you're consistently getting offset out of range exceptions, it's
>>>>>>>> probably because messages are getting deleted before you've processed 
>>>>>>>> them.
>>>>>>>>
>>>>>>>> The only real way to deal with this is give kafka more retention,
>>>>>>>> consume faster, or both.
>>>>>>>>
>>>>>>>> If you're just looking for a quick "fix" for an infrequent issue,
>>>>>>>> option 4 is probably easiest.  I wouldn&#

Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Dibyendu Bhattacharya
The consumer I mentioned does not silently throw away data. If the offset is
out of range, it starts from the earliest offset, and that is the correct way
to recover from this error.

Dibyendu
On Dec 2, 2015 9:56 PM, "Cody Koeninger"  wrote:

> Again, just to be clear, silently throwing away data because your system
> isn't working right is not the same as "recover from any Kafka leader
> changes and offset out of ranges issue".
>
>
>
> On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi, if you use Receiver based consumer which is available in
>> spark-packages (
>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) , this
>> has all built in failure recovery and it can recover from any Kafka leader
>> changes and offset out of ranges issue.
>>
>> Here is the package form github :
>> https://github.com/dibbhatt/kafka-spark-consumer
>>
>>
>> Dibyendu
>>
>> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> How to avoid those Errors with receiver based approach? Suppose we are
>>> OK with at least once processing and use receiver based approach which uses
>>> ZooKeeper but not query Kafka directly, would these errors(Couldn't
>>> find leader offsets for
>>> Set([test_stream,5])))be avoided?
>>>
>>> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger 
>>> wrote:
>>>
>>>> KafkaRDD.scala , handleFetchErr
>>>>
>>>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>>>> swethakasire...@gmail.com> wrote:
>>>>
>>>>> Hi Cody,
>>>>>
>>>>> How to look at Option 2(see the following)? Which portion of the code
>>>>> in Spark Kafka Direct to look at to handle this issue specific to our
>>>>> requirements.
>>>>>
>>>>>
>>>>> 2.Catch that exception and somehow force things to "reset" for that
>>>>> partition And how would it handle the offsets already calculated in the
>>>>> backlog (if there is one)?
>>>>>
>>>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger 
>>>>> wrote:
>>>>>
>>>>>> If you're consistently getting offset out of range exceptions, it's
>>>>>> probably because messages are getting deleted before you've processed 
>>>>>> them.
>>>>>>
>>>>>> The only real way to deal with this is give kafka more retention,
>>>>>> consume faster, or both.
>>>>>>
>>>>>> If you're just looking for a quick "fix" for an infrequent issue,
>>>>>> option 4 is probably easiest.  I wouldn't do that automatically / 
>>>>>> silently,
>>>>>> because you're losing data.
>>>>>>
>>>>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> So, our Streaming Job fails with the following errors. If you see
>>>>>>> the errors
>>>>>>> below, they are all related to Kafka losing offsets and
>>>>>>> OffsetOutOfRangeException.
>>>>>>>
>>>>>>> What are the options we have other than fixing Kafka? We would like
>>>>>>> to do
>>>>>>> something like the following. How can we achieve 1 and 2 with Spark
>>>>>>> Kafka
>>>>>>> Direct?
>>>>>>>
>>>>>>> 1.Need to see a way to skip some offsets if they are not available
>>>>>>> after the
>>>>>>> max retries are reached..in that case there might be data loss.
>>>>>>>
>>>>>>> 2.Catch that exception and somehow force things to "reset" for that
>>>>>>> partition And how would it handle the offsets already calculated in
>>>>>>> the
>>>>>>> backlog (if there is one)?
>>>>>>>
>>>>>>> 3.Track the offsets separately, restart the job by providing the
>>>>>>> offsets.
>>>>>>>
>>>>>>> 4.Or a straightforward approach would be to monitor the log for this
>>>>>>> error,
>>>>>>&

Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread Dibyendu Bhattacharya
Hi, if you use the Receiver-based consumer which is available in spark-packages
(http://spark-packages.org/package/dibbhatt/kafka-spark-consumer), it has all
the failure recovery built in and can recover from any Kafka leader change and
offset-out-of-range issue.

Here is the package on GitHub:
https://github.com/dibbhatt/kafka-spark-consumer


Dibyendu

On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy 
wrote:

> How to avoid those Errors with receiver based approach? Suppose we are OK
> with at least once processing and use receiver based approach which uses
> ZooKeeper but not query Kafka directly, would these errors(Couldn't find
> leader offsets for
> Set([test_stream,5])))be avoided?
>
> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger  wrote:
>
>> KafkaRDD.scala , handleFetchErr
>>
>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> Hi Cody,
>>>
>>> How to look at Option 2(see the following)? Which portion of the code in
>>> Spark Kafka Direct to look at to handle this issue specific to our
>>> requirements.
>>>
>>>
>>> 2.Catch that exception and somehow force things to "reset" for that
>>> partition And how would it handle the offsets already calculated in the
>>> backlog (if there is one)?
>>>
>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger 
>>> wrote:
>>>
 If you're consistently getting offset out of range exceptions, it's
 probably because messages are getting deleted before you've processed them.

 The only real way to deal with this is give kafka more retention,
 consume faster, or both.

 If you're just looking for a quick "fix" for an infrequent issue,
 option 4 is probably easiest.  I wouldn't do that automatically / silently,
 because you're losing data.

 On Mon, Nov 30, 2015 at 6:22 PM, SRK  wrote:

> Hi,
>
> So, our Streaming Job fails with the following errors. If you see the
> errors
> below, they are all related to Kafka losing offsets and
> OffsetOutOfRangeException.
>
> What are the options we have other than fixing Kafka? We would like to
> do
> something like the following. How can we achieve 1 and 2 with Spark
> Kafka
> Direct?
>
> 1.Need to see a way to skip some offsets if they are not available
> after the
> max retries are reached..in that case there might be data loss.
>
> 2.Catch that exception and somehow force things to "reset" for that
> partition And how would it handle the offsets already calculated in the
> backlog (if there is one)?
>
> 3.Track the offsets separately, restart the job by providing the
> offsets.
>
> 4.Or a straightforward approach would be to monitor the log for this
> error,
> and if it occurs more than X times, kill the job, remove the checkpoint
> directory, and restart.
>
> ERROR DirectKafkaInputDStream:
> ArrayBuffer(kafka.common.UnknownException,
> org.apache.spark.SparkException: Couldn't find leader offsets for
> Set([test_stream,5]))
>
>
>
> java.lang.ClassNotFoundException:
> kafka.common.NotLeaderForPartitionException
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>
>
>
> java.util.concurrent.RejectedExecutionException: Task
>
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
> [Terminated,
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
> 12112]
>
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 10
> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in
> stage
> 52.0 (TID 255, 172.16.97.97): UnknownReason
>
> Exception in thread "streaming-job-executor-0" java.lang.Error:
> java.lang.InterruptedException
>
> Caused by: java.lang.InterruptedException
>
> java.lang.ClassNotFoundException:
> kafka.common.OffsetOutOfRangeException
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 7 in
> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage
> 33.0
> (TID 283, 172.16.97.103): UnknownReason
>
> java.lang.ClassNotFoundException:
> kafka.common.OffsetOutOfRangeException
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>
> java.lang.ClassNotFoundException:
> kafka.common.OffsetOutOfRangeException
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html

Re: Need more tasks in KafkaDirectStream

2015-10-28 Thread Dibyendu Bhattacharya
If you do not need one to one semantics and does not want strict ordering
guarantee , you can very well use the Receiver based approach, and this
consumer from Spark-Packages (
https://github.com/dibbhatt/kafka-spark-consumer) can give much better
alternatives in terms of performance and reliability  for Receiver based
approach.

Regards,
Dibyendu

On Thu, Oct 29, 2015 at 11:57 AM, varun sharma 
wrote:

> Right now, there is one to one correspondence between kafka partitions and
> spark partitions.
> I dont have a requirement of one to one semantics.
> I need more tasks to be generated in the job so that it can be
> parallelised and batch can be completed fast. In the previous Receiver
> based approach number of tasks created were independent of kafka
> partitions, I need something like that only.
> Any config available if I dont need one to one semantics?
> Is there any way I can repartition without incurring any additional cost.
>
> Thanks
> *VARUN SHARMA*
>
>


Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

2015-10-24 Thread Dibyendu Bhattacharya
Hi,

I have raised a JIRA (https://issues.apache.org/jira/browse/SPARK-11045) to
track the discussion, but I am also mailing the user group.

This Kafka consumer has been around for a while in spark-packages
(http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) and I see
that many have started using it. I am now thinking of contributing it back to
the Apache Spark core project so that it can get better support, visibility and
adoption.

A few points about this consumer:

*Why this is needed:*

This consumer is NOT a replacement for the existing DirectStream API.
DirectStream solves the problems around "exactly once" semantics and "global
ordering" of messages, but to achieve this DirectStream comes with an overhead:
the overhead of maintaining the offsets externally, limited parallelism while
processing the RDD (as the RDD partitioning is the same as the Kafka
partitioning), and higher latency while processing the RDD (as messages are
fetched when the RDD is processed). There are many who do not need "exactly
once" and "global ordering" of messages, or whose ordering is managed in an
external store (say HBase), and who want more parallelism and lower latency in
their streaming channel. At this point Spark does not have a better fallback
option available in terms of a Receiver-based API. The present Receiver-based
API uses the Kafka high-level API, which is low performance and has serious
issues. [For this reason Kafka is coming up with a new high-level consumer API
in 0.9.]

The consumer I implemented uses the Kafka low-level API, which gives better
performance. This consumer has built-in fault-tolerance features for recovering
from all failures. It extends the code of the Storm Kafka Spout, which has been
around for some time, has matured over the years, and has all the Kafka
fault-tolerance capabilities built in. This same Kafka consumer for Spark is
presently running in various production scenarios and is already being adopted
by many in the Spark community.

*Why can't we fix the existing Receiver-based API in Spark?*

This is not possible unless we move to the Kafka low-level API, or wait for
Kafka 0.9, where they are re-writing the high-level consumer API, and build yet
another Kafka Spark consumer for Kafka 0.9 customers. That approach does not
seem good in my opinion. The Kafka low-level API which I used in my consumer
(and which DirectStream also uses) is not going to be deprecated in the near
future. So if the Kafka consumer for Spark uses the low-level API for the
Receiver-based mode, that will make sure all Kafka customers, whether presently
on 0.8.x or moving to 0.9, benefit from this same API.

*Concerns around low-level API complexity*

Yes, implementing a reliable consumer using the Kafka low-level consumer API is
complex. But the same has been done for the Storm Kafka Spout, which has been
stable for quite some time. This consumer for Spark is battle-tested under
various production loads, gives much better performance than the existing Kafka
consumers for Spark, and has a better fault-tolerance approach than the
existing Receiver-based mode.

*Why can't this consumer continue to live in spark-packages?*

That is possible. But from what I see, many customers who want to fall back to
Receiver-based mode because they do not need "exactly once" semantics or
"global ordering" seem a little tentative about using a spark-packages library
for their critical streaming pipeline, and so they are forced to use the faulty
and buggy Kafka high-level API based mode. With this consumer being part of the
Spark project, it will get much higher adoption and support from the community.

*Some major features of this consumer:*

This consumer controls the rate limit by maintaining a constant block size,
whereas the default rate limiting in other Spark consumers is done by number of
messages. That is an issue when Kafka has messages of different sizes, because
there is then no deterministic way to know the actual block sizes and memory
utilization if rate control is done by number of messages.

This consumer has an in-built PID controller which controls the rate of
consumption, again by modifying the block size, and consumes from Kafka only
the amount of messages needed. The default Spark consumer fetches a chunk of
messages and then applies throttling to control the rate, which can lead to
excess I/O while consuming from Kafka.
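
The PID idea described above can be sketched roughly as follows. This is only an
illustration of a PID loop driving a block-size knob from the observed
scheduling delay; the gains, limits and names are made up, and this is not the
project's actual implementation.

// Illustrative only: a simple PID loop that adjusts the receiver block size
// based on how far the observed scheduling delay is from a target. All gains,
// limits and names are made up for the sketch.
class BlockSizePidController(
    proportional: Double = 1.0,
    integral: Double = 0.2,
    derivative: Double = 0.1,
    minBlockSize: Long = 64 * 1024,         // bytes
    maxBlockSize: Long = 8 * 1024 * 1024) { // bytes

  private var accumulatedError = 0.0
  private var lastError = 0.0

  /** Returns the next block size given the current one and the delay error (ms). */
  def nextBlockSize(currentBlockSize: Long, targetDelayMs: Long, observedDelayMs: Long): Long = {
    // Positive error means we are falling behind and should consume less per block.
    val error = (observedDelayMs - targetDelayMs).toDouble
    accumulatedError += error
    val errorRate = error - lastError
    lastError = error

    val correction = proportional * error + integral * accumulatedError + derivative * errorRate
    // Larger scheduling delay -> shrink the block; smaller delay -> grow it.
    val adjusted = currentBlockSize - correction.toLong
    math.max(minBlockSize, math.min(maxBlockSize, adjusted))
  }
}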

You can also refer to the README file for more details:
https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md

If you are using this consumer, or are going to use it, you can vote for this
JIRA.

Regards,
Dibyendu


Re: Spark Streaming over YARN

2015-10-04 Thread Dibyendu Bhattacharya
How many partitions are there in your Kafka topic ?

Regards,
Dibyendu

On Sun, Oct 4, 2015 at 8:19 PM,  wrote:

> Hello,
> I am using  https://github.com/dibbhatt/kafka-spark-consumer
> I specify 4 receivers in the ReceiverLauncher , but in YARN console I can
> see one node receiving the kafka flow.
> (I use spark 1.3.1)
>
> Tks
> Nicolas
>
>
> - Mail original -
> De: "Dibyendu Bhattacharya" 
> À: nib...@free.fr
> Cc: "Cody Koeninger" , "user" 
> Envoyé: Vendredi 2 Octobre 2015 18:21:35
> Objet: Re: Spark Streaming over YARN
>
>
> If your Kafka topic has 4 partitions , and if you specify 4 Receivers,
> messages from each partitions are received by a dedicated receiver. so your
> receiving parallelism is defined by your number of partitions of your topic
> . Every receiver task will be scheduled evenly among nodes in your cluster.
> There was a JIRA fixed in spark 1.5 which does even distribution of
> receivers.
>
>
>
>
>
> Now for RDD parallelism ( i.e parallelism while processing your RDD ) is
> controlled by your Block Interval and Batch Interval.
>
>
> If your block Interval is 200 Ms, there will be 5 blocks per second. If
> your Batch Interval is 3 seconds, there will 15 blocks per batch. And every
> Batch is one RDD , thus your RDD will be 15 partition , which will be
> honored during processing the RDD ..
>
>
>
>
> Regards,
> Dibyendu
>
>
>
>
> On Fri, Oct 2, 2015 at 9:40 PM, < nib...@free.fr > wrote:
>
>
> Ok so if I set for example 4 receivers (number of nodes), how RDD will be
> distributed over the nodes/core.
> For example in my example I have 4 nodes (with 2 cores)
>
> Tks
> Nicolas
>
>
> - Mail original -
> De: "Dibyendu Bhattacharya" < dibyendu.bhattach...@gmail.com >
> À: nib...@free.fr
> Cc: "Cody Koeninger" < c...@koeninger.org >, "user" <
> user@spark.apache.org >
> Envoyé: Vendredi 2 Octobre 2015 18:01:59
>
>
> Objet: Re: Spark Streaming over YARN
>
>
> Hi,
>
>
> If you need to use Receiver based approach , you can try this one :
> https://github.com/dibbhatt/kafka-spark-consumer
>
>
> This is also part of Spark packages :
> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
>
>
> You just need to specify the number of Receivers you want for desired
> parallelism while receiving , and rest of the thing will be taken care by
> ReceiverLauncher.
>
>
> This Low level Receiver will give better parallelism both on receiving ,
> and on processing the RDD.
>
>
> Default Receiver based API ( KafkaUtils.createStream) using Kafka High
> level API and Kafka high Level API has serious issue to be used in
> production .
>
>
>
>
> Regards,
>
> Dibyendu
>
>
>
>
>
>
>
>
>
>
> On Fri, Oct 2, 2015 at 9:22 PM, < nib...@free.fr > wrote:
>
>
> From my understanding as soon as I use YARN I don't need to use
> parrallelisme (at least for RDD treatment)
> I don't want to use direct stream as I have to manage the offset
> positionning (in order to be able to start from the last offset treated
> after a spark job failure)
>
>
> - Mail original -
> De: "Cody Koeninger" < c...@koeninger.org >
> À: "Nicolas Biau" < nib...@free.fr >
> Cc: "user" < user@spark.apache.org >
> Envoyé: Vendredi 2 Octobre 2015 17:43:41
> Objet: Re: Spark Streaming over YARN
>
>
>
>
> If you're using the receiver based implementation, and want more
> parallelism, you have to create multiple streams and union them together.
>
>
> Or use the direct stream.
>
>
> On Fri, Oct 2, 2015 at 10:40 AM, < nib...@free.fr > wrote:
>
>
> Hello,
> I have a job receiving data from kafka (4 partitions) and persisting data
> inside MongoDB.
> It works fine, but when I deploy it inside YARN cluster (4 nodes with 2
> cores) only on node is receiving all the kafka partitions and only one node
> is processing my RDD treatment (foreach function)
> How can I force YARN to use all the resources nodes and cores to process
> the data (receiver & RDD treatment)
>
> Tks a lot
> Nicolas
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


Re: Spark Streaming over YARN

2015-10-02 Thread Dibyendu Bhattacharya
If your Kafka topic has 4 partitions and you specify 4 receivers, the messages
from each partition are received by a dedicated receiver, so your receiving
parallelism is defined by the number of partitions of your topic. Every
receiver task will be scheduled evenly among the nodes in your cluster. There
was a JIRA fixed in Spark 1.5 which does an even distribution of receivers.

Now, RDD parallelism (i.e. parallelism while processing your RDD) is controlled
by your block interval and batch interval.

If your block interval is 200 ms, there will be 5 blocks per second. If your
batch interval is 3 seconds, there will be 15 blocks per batch. And every batch
is one RDD, thus your RDD will have 15 partitions, which will be honored while
processing the RDD.
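
The arithmetic above, expressed as configuration (a sketch; the application
name is a placeholder): with spark.streaming.blockInterval set to 200 ms and a
3 second batch interval, each receiver contributes 3000 / 200 = 15 blocks, and
hence 15 partitions, per batch RDD.

// Sketch of the batch/block interval arithmetic described above.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BlockIntervalExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("BlockIntervalExample")
      .set("spark.streaming.blockInterval", "200ms") // one block every 200 ms per receiver

    val ssc = new StreamingContext(conf, Seconds(3)) // one batch every 3 seconds

    // 3000 ms batch / 200 ms block = 15 blocks, i.e. 15 partitions per batch RDD (per receiver).
    val partitionsPerReceiverPerBatch = 3000 / 200
    println(s"expected partitions per receiver per batch: $partitionsPerReceiverPerBatch")

    // (Receiver stream definitions and ssc.start() are omitted from this sketch.)
    ssc.stop()
  }
}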


Regards,
Dibyendu


On Fri, Oct 2, 2015 at 9:40 PM,  wrote:

> Ok so if I set for example 4 receivers (number of nodes), how RDD will be
> distributed over the nodes/core.
> For example in my example I have 4 nodes (with 2 cores)
>
> Tks
> Nicolas
>
>
> - Mail original -
> De: "Dibyendu Bhattacharya" 
> À: nib...@free.fr
> Cc: "Cody Koeninger" , "user" 
> Envoyé: Vendredi 2 Octobre 2015 18:01:59
> Objet: Re: Spark Streaming over YARN
>
>
> Hi,
>
>
> If you need to use Receiver based approach , you can try this one :
> https://github.com/dibbhatt/kafka-spark-consumer
>
>
> This is also part of Spark packages :
> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
>
>
> You just need to specify the number of Receivers you want for desired
> parallelism while receiving , and rest of the thing will be taken care by
> ReceiverLauncher.
>
>
> This Low level Receiver will give better parallelism both on receiving ,
> and on processing the RDD.
>
>
> Default Receiver based API ( KafkaUtils.createStream) using Kafka High
> level API and Kafka high Level API has serious issue to be used in
> production .
>
>
>
>
> Regards,
>
> Dibyendu
>
>
>
>
>
>
>
>
>
>
> On Fri, Oct 2, 2015 at 9:22 PM, < nib...@free.fr > wrote:
>
>
> From my understanding as soon as I use YARN I don't need to use
> parrallelisme (at least for RDD treatment)
> I don't want to use direct stream as I have to manage the offset
> positionning (in order to be able to start from the last offset treated
> after a spark job failure)
>
>
> - Mail original -
> De: "Cody Koeninger" < c...@koeninger.org >
> À: "Nicolas Biau" < nib...@free.fr >
> Cc: "user" < user@spark.apache.org >
> Envoyé: Vendredi 2 Octobre 2015 17:43:41
> Objet: Re: Spark Streaming over YARN
>
>
>
>
> If you're using the receiver based implementation, and want more
> parallelism, you have to create multiple streams and union them together.
>
>
> Or use the direct stream.
>
>
> On Fri, Oct 2, 2015 at 10:40 AM, < nib...@free.fr > wrote:
>
>
> Hello,
> I have a job receiving data from kafka (4 partitions) and persisting data
> inside MongoDB.
> It works fine, but when I deploy it inside YARN cluster (4 nodes with 2
> cores) only on node is receiving all the kafka partitions and only one node
> is processing my RDD treatment (foreach function)
> How can I force YARN to use all the resources nodes and cores to process
> the data (receiver & RDD treatment)
>
> Tks a lot
> Nicolas
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


Re: Spark Streaming over YARN

2015-10-02 Thread Dibyendu Bhattacharya
Hi,

If you need to use the Receiver-based approach, you can try this one:
https://github.com/dibbhatt/kafka-spark-consumer

This is also part of Spark packages:
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

You just need to specify the number of receivers you want for the desired
parallelism while receiving, and the rest will be taken care of by the
ReceiverLauncher.

This low-level receiver will give better parallelism both on receiving and on
processing the RDD.

The default Receiver-based API (KafkaUtils.createStream) uses the Kafka
high-level API, and the Kafka high-level API has serious issues when used in
production.


Regards,
Dibyendu





On Fri, Oct 2, 2015 at 9:22 PM,  wrote:

> From my understanding as soon as I use YARN I don't need to use
> parrallelisme (at least for RDD treatment)
> I don't want to use direct stream as I have to manage the offset
> positionning (in order to be able to start from the last offset treated
> after a spark job failure)
>
>
> - Mail original -
> De: "Cody Koeninger" 
> À: "Nicolas Biau" 
> Cc: "user" 
> Envoyé: Vendredi 2 Octobre 2015 17:43:41
> Objet: Re: Spark Streaming over YARN
>
>
> If you're using the receiver based implementation, and want more
> parallelism, you have to create multiple streams and union them together.
>
>
> Or use the direct stream.
>
>
> On Fri, Oct 2, 2015 at 10:40 AM, < nib...@free.fr > wrote:
>
>
> Hello,
> I have a job receiving data from kafka (4 partitions) and persisting data
> inside MongoDB.
> It works fine, but when I deploy it inside YARN cluster (4 nodes with 2
> cores) only on node is receiving all the kafka partitions and only one node
> is processing my RDD treatment (foreach function)
> How can I force YARN to use all the resources nodes and cores to process
> the data (receiver & RDD treatment)
>
> Tks a lot
> Nicolas
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-09-26 Thread Dibyendu Bhattacharya
In Spark Streaming, the checkpoint directory is used for two purposes:

1. Metadata checkpointing

2. Data checkpointing

If you enable the WAL to recover from driver failure, Spark Streaming will also
write the received blocks to the WAL, which is stored in the checkpoint
directory.

For a streaming solution to recover from any failure without any data loss, you
need to enable metadata checkpointing and the WAL. You do not need to enable
data checkpointing.
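
As a rough sketch of that combination (metadata checkpointing plus the receiver
WAL), using the standard StreamingContext.getOrCreate pattern so a restarted
driver rebuilds the context from the checkpointed metadata. Paths, intervals
and the socket source are placeholders.

// Sketch only: placeholder paths, intervals and input source.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedApp {
  val checkpointDir = "hdfs:///user/hadoop/spark/checkpoint"

  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("CheckpointedApp")
      // Received blocks go to the WAL under the checkpoint directory.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir) // metadata checkpointing

    // Define the receiver stream(s) and processing graph inside this factory,
    // so they are recreated on recovery.
    val lines = ssc.socketTextStream("localhost", 9999) // placeholder source
    lines.count().print()

    ssc
  }

  def main(args: Array[String]): Unit = {
    // On restart, rebuild the context from the checkpointed metadata if present.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}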

In my experiments with the PR I mentioned, I configured the metadata
checkpointing in HDFS and stored the received blocks OFF_HEAP, and I did not
use any WAL. The PR I proposed would recover from a driver fail-over without
using any WAL-like feature, because the blocks are already available in
Tachyon. The metadata checkpoint helps recover the metadata about past received
blocks.

Now the question is, can I configure Tachyon as my metadata checkpoint
location? I tried that, and the streaming application writes the received block
metadata to Tachyon, but on driver failure it cannot recover the received block
metadata from Tachyon. I sometimes see zero-size files in the Tachyon
checkpoint location, and it cannot recover past events. I need to understand
what the issue is with storing metadata in Tachyon; that needs a different
JIRA, I guess.

Let me know if I was able to explain the current scenario around Spark
Streaming and Tachyon.

Regards,
Dibyendu




On Sat, Sep 26, 2015 at 1:04 PM, N B  wrote:

> Hi Dibyendu,
>
> I am not sure I understand completely. But are you suggesting that
> currently there is no way to enable Checkpoint directory to be in Tachyon?
>
> Thanks
> Nikunj
>
>
> On Fri, Sep 25, 2015 at 11:49 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi,
>>
>> Recently I was working on a PR to use Tachyon as OFF_HEAP store for Spark
>> Streaming and make sure Spark Streaming can recover from Driver failure and
>> recover the blocks form Tachyon.
>>
>> The The Motivation for this PR is  :
>>
>> If Streaming application stores the blocks OFF_HEAP, it may not need any
>> WAL like feature to recover from Driver failure. As long as the writing of
>> blocks to Tachyon from Streaming receiver is durable, it should be
>> recoverable from Tachyon directly on Driver failure.
>> This can solve the issue of expensive WAL write and duplicating the
>> blocks both in MEMORY and also WAL and also guarantee end to end
>> No-Data-Loss channel using OFF_HEAP store.
>>
>> https://github.com/apache/spark/pull/8817
>>
>> This PR still under review . But having done various fail over testing in
>> my environment , I see this PR worked perfectly fine without any data loss
>> . Let see what TD and other have to say on this PR .
>>
>> Below is the configuration I used to test this PR ..
>>
>>
>> Spark : 1.6 from Master
>> Tachyon : 0.7.1
>>
>> SparkConfiguration Details :
>>
>> SparkConf conf = new SparkConf().setAppName("TestTachyon")
>> .set("spark.streaming.unpersist", "true")
>> .set("spark.local.dir", "/mnt1/spark/tincan")
>> .set("tachyon.zookeeper.address","10.252.5.113:2182")
>> .set("tachyon.usezookeeper","true")
>> .set("spark.externalBlockStore.url", "tachyon-ft://
>> ip-10-252-5-113.asskickery.us:19998")
>> .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
>> .set("spark.externalBlockStore.folderName","pearson")
>> .set("spark.externalBlockStore.dirId", "subpub")
>>
>> .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown","true");
>>
>> JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(
>> 1));
>>
>> String checkpointDirectory = "hdfs://
>> 10.252.5.113:9000/user/hadoop/spark/wal";
>>
>> jsc.checkpoint(checkpointDirectory);
>>
>>
>> //I am using the My Receiver Based Consumer (
>> https://github.com/dibbhatt/kafka-spark-consumer) . But
>> KafkaUtil.CreateStream will also work
>>
>> JavaDStream unionStreams = ReceiverLauncher.launch(
>> jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());
>>
>>
>>
>>
>> Regards,
>> Dibyendu
>>
>> On Sat, Sep 26, 2015 at 11:59 AM, N B  wrote:
>>
>>> Hi Dibyendu,
>>>
>>> How does one go about configuring spark streaming to use tachyon as its
>>> place for storing checkpoints? Also, can one do this with tachyon running
>>> on a completely different node than wh

Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-09-25 Thread Dibyendu Bhattacharya
Hi,

Recently I was working on a PR to use Tachyon as the OFF_HEAP store for Spark
Streaming and to make sure Spark Streaming can recover from driver failure and
recover the blocks from Tachyon.

The motivation for this PR is:

If a streaming application stores the blocks OFF_HEAP, it may not need any
WAL-like feature to recover from driver failure. As long as the writing of
blocks to Tachyon from the streaming receiver is durable, it should be
recoverable from Tachyon directly on driver failure. This can solve the issue
of the expensive WAL write and of duplicating the blocks both in memory and in
the WAL, and also guarantee an end-to-end no-data-loss channel using the
OFF_HEAP store.

https://github.com/apache/spark/pull/8817

This PR is still under review. But having done various failover tests in my
environment, I have seen this PR work perfectly fine without any data loss.
Let's see what TD and others have to say on this PR.

Below is the configuration I used to test this PR:


Spark : 1.6 from Master
Tachyon : 0.7.1

SparkConfiguration Details :

SparkConf conf = new SparkConf().setAppName("TestTachyon")
    .set("spark.streaming.unpersist", "true")
    .set("spark.local.dir", "/mnt1/spark/tincan")
    .set("tachyon.zookeeper.address", "10.252.5.113:2182")
    .set("tachyon.usezookeeper", "true")
    .set("spark.externalBlockStore.url",
         "tachyon-ft://ip-10-252-5-113.asskickery.us:19998")
    .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
    .set("spark.externalBlockStore.folderName", "pearson")
    .set("spark.externalBlockStore.dirId", "subpub")
    .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown", "true");

JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(1));

String checkpointDirectory = "hdfs://10.252.5.113:9000/user/hadoop/spark/wal";

jsc.checkpoint(checkpointDirectory);

// I am using my Receiver-based consumer
// (https://github.com/dibbhatt/kafka-spark-consumer), but
// KafkaUtils.createStream will also work.

JavaDStream unionStreams = ReceiverLauncher.launch(
    jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());




Regards,
Dibyendu

On Sat, Sep 26, 2015 at 11:59 AM, N B  wrote:

> Hi Dibyendu,
>
> How does one go about configuring spark streaming to use tachyon as its
> place for storing checkpoints? Also, can one do this with tachyon running
> on a completely different node than where spark processes are running?
>
> Thanks
> Nikunj
>
>
> On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi Tathagata,
>>
>> Thanks for looking into this. Further investigating I found that the
>> issue is with Tachyon does not support File Append. The streaming receiver
>> which writes to WAL when failed, and again restarted, not able to append to
>> same WAL file after restart.
>>
>> I raised this with Tachyon user group, and Haoyuan told that within 3
>> months time Tachyon file append will be ready. Will revisit this issue
>> again then .
>>
>> Regards,
>> Dibyendu
>>
>>
>> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das 
>> wrote:
>>
>>> Looks like somehow the file size reported by the FSInputDStream of
>>> Tachyon's FileSystem interface, is returning zero.
>>>
>>> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
>>>> Just to follow up this thread further .
>>>>
>>>> I was doing some fault tolerant testing of Spark Streaming with Tachyon
>>>> as OFF_HEAP block store. As I said in earlier email, I could able to solve
>>>> the BlockNotFound exception when I used Hierarchical Storage of
>>>> Tachyon ,  which is good.
>>>>
>>>> I continue doing some testing around storing the Spark Streaming WAL
>>>> and CheckPoint files also in Tachyon . Here is few finding ..
>>>>
>>>>
>>>> When I store the Spark Streaming Checkpoint location in Tachyon , the
>>>> throughput is much higher . I tested the Driver and Receiver failure cases
>>>> , and Spark Streaming is able to recover without any Data Loss on Driver
>>>> failure.
>>>>
>>>> *But on Receiver failure , Spark Streaming looses data* as I see
>>>> Exception while reading the WAL file from Tachyon "receivedData" location
>>>>  for the same Receiver id which just failed.
>>>>
>>>> If I change the Checkpoint location back to HDFS , Spark Streaming can
>>>>

Re: Managing scheduling delay in Spark Streaming

2015-09-16 Thread Dibyendu Bhattacharya
Hi Michal,

If you use https://github.com/dibbhatt/kafka-spark-consumer, it comes with its
own built-in back-pressure mechanism. By default this is disabled; you need to
enable it to use this feature with this consumer. It controls the rate based on
the scheduling delay at runtime.

Regards,
Dibyendu

On Wed, Sep 16, 2015 at 12:32 PM, Akhil Das 
wrote:

> I had a workaround for exactly the same scenario
> http://apache-spark-developers-list.1001551.n3.nabble.com/SparkStreaming-Workaround-for-BlockNotFound-Exceptions-td12096.html
>
> Apart from that, if you are using this consumer
> https://github.com/dibbhatt/kafka-spark-consumer it also has a built-in
> rate limiting, Also in Spark 1.5.0 they have a rate limiting/back-pressure
> (haven't tested it on production though).
>
>
>
> Thanks
> Best Regards
>
> On Tue, Sep 15, 2015 at 11:56 PM, Michal Čizmazia 
> wrote:
>
>> Hi,
>>
>> I have a Reliable Custom Receiver storing messages into Spark. Is there
>> way how to prevent my receiver from storing more messages into Spark when
>> the Scheduling Delay reaches a certain threshold?
>>
>> Possible approaches:
>> #1 Does Spark block on the Receiver.store(messages) call to prevent
>> storing more messages and overflowing the system?
>> #2 How to obtain the Scheduling Delay in the Custom Receiver, so that I
>> can implement the feature.
>>
>> Thanks,
>>
>> Mike
>>
>>
>


Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Dibyendu Bhattacharya
Hi,

Just to clarify one point which may not be clear to many: if someone
decides to use the receiver-based approach, the best option at this point is
to use https://github.com/dibbhatt/kafka-spark-consumer. This will also
work with the WAL like any other receiver-based consumer. The major issue with
KafkaUtils.createStream is that it uses the Kafka high-level API, which has a
serious issue with consumer re-balance, whereas dibbhatt/kafka-spark-consumer
uses the low-level Kafka consumer API, which does not have any such issue. I am
not sure if there is any publicly available performance benchmark comparing
this one with the DirectStream, so I cannot comment on the performance benefits
of one over the other, but in whatever performance benchmarks we have done,
dibbhatt/kafka-spark-consumer stands out.

Regards,
Dibyendu

On Thu, Sep 10, 2015 at 8:08 PM, Cody Koeninger  wrote:

> You have to store offsets somewhere.
>
> If you're going to store them in checkpoints, then you have to deal with
> the fact that checkpoints aren't recoverable on code change.  Starting up
> the new version helps because you don't start it from the same checkpoint
> directory as the running one... it has your new code, and is storing to a
> new checkpoint directory.  If you started the new one from the latest
> offsets, you can shut down the old one as soon as it's caught up.
>
> If you don't like the implications of storing offsets in checkpoints...
> then sure, store them yourself.  A real database would be better, but if
> you really want to store them in zookeeper you can.  In any case, just do
> your offset saves in the same foreachPartition your other output operations
> are occurring in, after they've successfully completed.
>
> If you don't care about performance benefits of the direct stream and
> don't want exactly once semantics, sure use the old stream.
>
> Finally, hundreds of gigs just really isn't very much data.  Unless what
> you're doing is really resource intensive, it shouldn't take much time to
> process it all, especially if you can dynamically size a cluster for the
> rare occasion that something is screwed up and you need to reprocess.
>
>
> On Thu, Sep 10, 2015 at 9:17 AM, Krzysztof Zarzycki 
> wrote:
>
>> Thanks guys for your answers. I put my answers in text, below.
>>
>> Cheers,
>> Krzysztof Zarzycki
>>
>> 2015-09-10 15:39 GMT+02:00 Cody Koeninger :
>>
>>> The kafka direct stream meets those requirements.  You don't need
>>> checkpointing for exactly-once.  Indeed, unless your output operations are
>>> idempotent, you can't get exactly-once if you're relying on checkpointing.
>>> Instead, you need to store the offsets atomically in the same transaction
>>> as your results.
>>>
>>
>> To focus discussion, let's assume my operations are idempotent & I'm
>> interested in at-least-once thanks to that (which is idempotent
>> exactly-once as named in your pres). Did you say, that I don't need
>> checkpointing for that? How then direct stream API would store offsets
>>  between restarts?
>>
>>
>>> See
>>> https://github.com/koeninger/kafka-exactly-once
>>> and the video / blog posts linked from it.
>>>
>>>
>> I did that, thank you. What I want is to achieve "idempotent
>> exactly-once" as named in your presentation.
>>
>>
>>> The dibhatt consumer that Akhil linked is using zookeeper to store
>>> offsets, so to the best of my knowledge, it cannot do exactly-once without
>>> idempotent output operations.
>>>
>> True, and I totally accept it if what I get is at-least-once.
>>
>>
>>>
>>>
>> Regarding the issues around code changes and checkpointing, the most
>>> straightforward way to deal with this is to just start a new version of
>>> your job before stopping the old one.  If you care about delivery semantics
>>> and are using checkpointing, your output operation must be idempotent
>>> anyway, so having 2 versions of the code running at the same time for a
>>> brief period should not be a problem.
>>>
>>
>> How starting new version before stopping old one helps? Could you please
>> explain a bit the mechanics of that?
>> Anyway, it seems definitely cumbersome. Plus, I can imagine plenty of
>> situations when it will be just inapropriate to run old one, when, let's
>> say, we discovered a bug and don't want to run it anymore.
>>
>>
>> So... To sum up it correctly, if I want at-least-once, with simple code
>> upgrades,  I need to:
>> -  store offsets in external storage (I would choose ZK for that).
>> -  read them on application restart and pass the
>> TopicAndPartition->offset map to createDirectStream.
>> -  And I don't need to use checkpoints at all then.
>> Could you confirm that?
>>
>> It's a question where should I actually commit the ZK offsets. The
>> easiest would be to do it on the end of every batch. Do you think I can use
>> org.apache.spark.streaming.scheduler.StreamingListener, method
>> onBatchCompleted for that? I don't think so, because probably we don't have
>> access to finieshed offsets in it...
>> So maybe each executor can

Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Dibyendu Bhattacharya
Hi,

This is running in production in many organizations that have adopted
this consumer as an alternative option. The consumer will run with Spark
1.3.1.

It has been running in production at Pearson for some time.

It is part of Spark Packages, and you can see how to include it in your
mvn or sbt build here:

http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

This consumer also comes with a built-in PID controller to control
back-pressure, which you can use even if you are using Spark 1.3.1.


Regards,
Dibyendu


On Thu, Sep 10, 2015 at 5:48 PM, Krzysztof Zarzycki 
wrote:

> Thanks Akhil, seems like an interesting option to consider.
> Do you know if the package is production-ready? Do you use it in
> production?
>
> And do you know if it works for Spark 1.3.1 as well? README mentions that
> package in spark-packages.org is built with Spark 1.4.1.
>
>
> Anyway, it  seems that core Spark Streaming does not support my case? Or
> anyone can instruct me on how to do it? Let's say, that I'm even fine (but
> not content about) with using KafkaCluster private class that is included
> in Spark, for manual managing ZK offsets. Has someone done it before? Has
> someone public code examples of manually managing ZK offsets?
>
> Thanks,
> Krzysztof
>
> 2015-09-10 12:22 GMT+02:00 Akhil Das :
>
>> This consumer pretty much covers all those scenarios you listed
>> github.com/dibbhatt/kafka-spark-consumer Give it a try.
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Sep 10, 2015 at 3:32 PM, Krzysztof Zarzycki > > wrote:
>>
>>> Hi there,
>>> I have a problem with fulfilling all my needs when using Spark Streaming
>>> on Kafka. Let me enumerate my requirements:
>>> 1. I want to have at-least-once/exactly-once processing.
>>> 2. I want to have my application fault & simple stop tolerant. The Kafka
>>> offsets need to be tracked between restarts.
>>> 3. I want to be able to upgrade code of my application without losing
>>> Kafka offsets.
>>>
>>> Now what my requirements imply according to my knowledge:
>>> 1. implies using new Kafka DirectStream.
>>> 2. implies  using checkpointing. kafka DirectStream will write offsets
>>> to the checkpoint as well.
>>> 3. implies that checkpoints can't be used between controlled restarts.
>>> So I need to install shutdownHook with ssc.stop(stopGracefully=true) (here
>>> is a description how:
>>> https://metabroadcast.com/blog/stop-your-spark-streaming-application-gracefully
>>> )
>>>
>>> Now my problems are:
>>> 1. If I cannot make checkpoints between code upgrades, does it mean that
>>> Spark does not help me at all with keeping my Kafka offsets? Does it mean,
>>> that I have to implement my own storing to/initalization of offsets from
>>> Zookeeper?
>>> 2. When I set up shutdownHook and my any executor throws an exception,
>>> it seems that application does not fail, but stuck in running state. Is
>>> that because stopGracefully deadlocks on exceptions? How to overcome this
>>> problem? Maybe I can avoid setting shutdownHook and there is other way to
>>> stop gracefully your app?
>>>
>>> 3. If I somehow overcome 2., is it enough to just stop gracefully my app
>>> to be able to upgrade code & not lose Kafka offsets?
>>>
>>>
>>> Thank you a lot for your answers,
>>> Krzysztof Zarzycki
>>>
>>>
>>>
>>>
>>
>


Just Released V1.0.4 Low Level Receiver Based Kafka-Spark-Consumer in Spark Packages having built-in Back Pressure Controller

2015-08-26 Thread Dibyendu Bhattacharya
Dear All,

Just now released the 1.0.4 version of Low Level Receiver based
Kafka-Spark-Consumer in spark-packages.org .  You can find the latest
release here :
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

Here is github location : https://github.com/dibbhatt/kafka-spark-consumer

This consumer now has a built-in PID (Proportional, Integral, Derivative)
rate controller to control the Spark back-pressure.

This consumer implements the rate-limiting logic not by controlling the
number of messages per block (as it is done in Spark's out-of-the-box
consumers), but by the size of the blocks per batch; i.e., for any given
batch, this consumer controls the rate limit by controlling the size of the
batch. As Spark memory usage is driven by block size rather than the number
of messages, I think a rate limit by block size is more appropriate. For
example, assume Kafka contains messages ranging from very small (say a few
hundred bytes) to larger ones (a few hundred KB) for the same topic. If we
control the rate limit by number of messages, block sizes may vary
drastically based on what type of messages get pulled per block. Whereas,
if I control my rate limiting by block size, my block size remains
constant across batches (even though the number of messages differs across
blocks) and helps me tune my memory settings more accurately, as I know
exactly how much memory each block is going to consume.


This consumer has its own PID (Proportional, Integral, Derivative)
controller built in, and controls the Spark back-pressure by modifying the
size of the block it consumes at run time. The PID controller's rate
feedback mechanism is built using ZooKeeper. Again, the logic to control
back-pressure is not by controlling the number of messages (as it is done
in Spark 1.5, SPARK-7398) but by altering the size of the block consumed
per batch from Kafka. As the back-pressure is built into the consumer, this
consumer can be used with any version of Spark if anyone wants a
back-pressure controlling mechanism in their existing Spark / Kafka
environment.

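To illustrate the idea only (this is a conceptual sketch, not the library's
actual implementation; the gains and names are made up), a PID controller
derives the next block size from the current scheduling-delay error roughly
like this:

// Conceptual sketch of a PID controller driven by scheduling delay.
case class PidState(lastError: Double, integral: Double)

def nextBlockSize(currentBlockSize: Double,
                  schedulingDelayMs: Double, // how far behind the jobs are running
                  batchIntervalMs: Double,
                  state: PidState,
                  kp: Double = 1.0, ki: Double = 0.2, kd: Double = 0.0): (Double, PidState) = {
  // error > 0 means the application is falling behind, so the block size should shrink
  val error      = schedulingDelayMs / batchIntervalMs
  val integral   = state.integral + error
  val derivative = error - state.lastError
  val adjustment = kp * error + ki * integral + kd * derivative
  // never let the block size drop below some minimum
  val newSize    = math.max(1.0, currentBlockSize * (1.0 - adjustment))
  (newSize, PidState(error, integral))
}

The real consumer persists the feedback through ZooKeeper as described above;
the sketch only shows the arithmetic.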
Regards,
Dibyendu


Re: BlockNotFoundException when running spark word count on Tachyon

2015-08-26 Thread Dibyendu Bhattacharya
The URL seems to have changed; here is the current one:
http://tachyon-project.org/documentation/Tiered-Storage-on-Tachyon.html



On Wed, Aug 26, 2015 at 12:32 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Some time back I was playing with Spark and Tachyon and I also found this
> issue. The issue here is that TachyonBlockManager puts the blocks in the
> WriteType.TRY_CACHE configuration. Because of this, blocks are evicted
> from the Tachyon cache when memory is full, and when Spark tries to find
> the block it throws a BlockNotFoundException.
>
> To solve this I tried Hierarchical Storage on Tachyon
> ( http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html ), and that
> seems to have worked and I did not see any Spark job fail due to
> BlockNotFoundException. Below are the Hierarchical Storage settings I used:
>
>   -Dtachyon.worker.hierarchystore.level.max=2
>   -Dtachyon.worker.hierarchystore.level0.alias=MEM
>   -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
>
> -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
>   -Dtachyon.worker.hierarchystore.level1.alias=HDD
>   -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
>   -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
>   -Dtachyon.worker.allocate.strategy=MAX_FREE
>   -Dtachyon.worker.evict.strategy=LRU
>
> Regards,
> Dibyendu
>
> On Wed, Aug 26, 2015 at 12:25 PM, Todd  wrote:
>
>>
>> I am using tachyon in the spark program below,but I encounter a
>> BlockNotFoundxception.
>> Does someone know what's wrong and also is there guide on how to
>> configure spark to work with Tackyon?Thanks!
>>
>> conf.set("spark.externalBlockStore.url", "tachyon://10.18.19.33:19998
>> ")
>> conf.set("spark.externalBlockStore.baseDir","/spark")
>> val sc = new SparkContext(conf)
>> import org.apache.spark.storage.StorageLevel
>> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
>> rdd.persist(StorageLevel.OFF_HEAP)
>> val count = rdd.count()
>>val sum = rdd.reduce(_ + _)
>> println(s"The count: $count, The sum is: $sum")
>>
>>
>> 15/08/26 14:52:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose
>> tasks have all completed, from pool
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 5
>> in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage
>> 0.0 (TID 5, localhost): java.lang.RuntimeException:
>> org.apache.spark.storage.BlockNotFoundException: Block rdd_0_5 not found
>> at
>> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:308)
>> at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> at
>> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>> at
>> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>> at
>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>> at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>> at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>> at
>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at
>> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at
>

Re: BlockNotFoundException when running spark word count on Tachyon

2015-08-26 Thread Dibyendu Bhattacharya
Some time back I was playing with Spark and Tachyon and I also found this
issue. The issue here is that TachyonBlockManager puts the blocks in the
WriteType.TRY_CACHE configuration. Because of this, blocks are evicted
from the Tachyon cache when memory is full, and when Spark tries to find the
block it throws a BlockNotFoundException.

To solve this I tried Hierarchical Storage on Tachyon
( http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html ), and that
seems to have worked and I did not see any Spark job fail due to
BlockNotFoundException.
Below are the Hierarchical Storage settings I used:

  -Dtachyon.worker.hierarchystore.level.max=2
  -Dtachyon.worker.hierarchystore.level0.alias=MEM
  -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER

-Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
  -Dtachyon.worker.hierarchystore.level1.alias=HDD
  -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
  -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
  -Dtachyon.worker.allocate.strategy=MAX_FREE
  -Dtachyon.worker.evict.strategy=LRU

Regards,
Dibyendu

On Wed, Aug 26, 2015 at 12:25 PM, Todd  wrote:

>
> I am using tachyon in the spark program below,but I encounter a
> BlockNotFoundxception.
> Does someone know what's wrong and also is there guide on how to configure
> spark to work with Tackyon?Thanks!
>
> conf.set("spark.externalBlockStore.url", "tachyon://10.18.19.33:19998
> ")
> conf.set("spark.externalBlockStore.baseDir","/spark")
> val sc = new SparkContext(conf)
> import org.apache.spark.storage.StorageLevel
> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
> rdd.persist(StorageLevel.OFF_HEAP)
> val count = rdd.count()
>val sum = rdd.reduce(_ + _)
> println(s"The count: $count, The sum is: $sum")
>
>
> 15/08/26 14:52:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
> have all completed, from pool
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 5
> in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage
> 0.0 (TID 5, localhost): java.lang.RuntimeException:
> org.apache.spark.storage.BlockNotFoundException: Block rdd_0_5 not found
> at
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:308)
> at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
> at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> at
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
> at
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
> at
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
> at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> at
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipelin

Re: spark streaming 1.3 kafka error

2015-08-22 Thread Dibyendu Bhattacharya
I think you can also give this consumer a try in your environment:
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer . It has been
running fine for topics with a large number of Kafka partitions (> 200) like
yours without any issue: there are no connection problems, as this consumer
re-uses Kafka connections, and it can also recover from failures (network
loss, a Kafka leader going down, ZK going down, etc.).


Regards,
Dibyendu

On Sat, Aug 22, 2015 at 7:35 PM, Shushant Arora 
wrote:

> On trying the consumer without external connections  or with low number of
> external conections its working fine -
>
> so doubt is how  socket got closed -
>
> java.io.EOFException: Received -1 when reading from channel, socket has 
> likely been closed.
>
>
>
> On Sat, Aug 22, 2015 at 7:24 PM, Akhil Das 
> wrote:
>
>> Can you try some other consumer and see if the issue still exists?
>> On Aug 22, 2015 12:47 AM, "Shushant Arora" 
>> wrote:
>>
>>> Exception comes when client has so many connections to some another
>>> external server also.
>>> So I think Exception is coming because of client side issue only- server
>>> side there is no issue.
>>>
>>>
>>> Want to understand is executor(simple consumer) not making new
>>> connection to kafka broker at start of each task ? Or is it created once
>>> only and that is getting closed somehow ?
>>>
>>> On Sat, Aug 22, 2015 at 9:41 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 it comes at start of each tasks when there is new data inserted in
 kafka.( data inserted is very few)
 kafka topic has 300 partitions - data inserted is ~10 MB.

 Tasks gets failed and it retries which succeed and after certain no of
 fail tasks it kills the job.




 On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das 
 wrote:

> That looks like you are choking your kafka machine. Do a top on the
> kafka machines and see the workload, it may happen that you are spending
> too much time on disk io etc.
> On Aug 21, 2015 7:32 AM, "Cody Koeninger"  wrote:
>
>> Sounds like that's happening consistently, not an occasional network
>> problem?
>>
>> Look at the Kafka broker logs
>>
>> Make sure you've configured the correct kafka broker hosts / ports
>> (note that direct stream does not use zookeeper host / port).
>>
>> Make sure that host / port is reachable from your driver and worker
>> nodes, ie telnet or netcat to it.  It looks like your driver can reach it
>> (since there's partition info in the logs), but that doesn't mean the
>> worker can.
>>
>> Use lsof / netstat to see what's going on with those ports while the
>> job is running, or tcpdump if you need to.
>>
>> If you can't figure out what's going on from a networking point of
>> view, post a minimal reproducible code sample that demonstrates the 
>> issue,
>> so it can be tested in a different environment.
>>
>>
>>
>>
>>
>> On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Hi
>>>
>>>
>>> Getting below error in spark streaming 1.3 while consuming from kafka 
>>> using directkafka stream. Few of tasks are getting failed in each run.
>>>
>>>
>>> What is the reason /solution of this error?
>>>
>>>
>>> 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in 
>>> stage 130.0 (TID 16332)
>>> java.io.EOFException: Received -1 when reading from channel, socket has 
>>> likely been closed.
>>> at kafka.utils.Utils$.read(Utils.scala:376)
>>> at 
>>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>> at 
>>> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>> at 
>>> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>> at 
>>> kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>>> at 
>>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>>> at 
>>> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>>> at 
>>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>>> at 
>>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>> at 
>>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>> at 
>>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>>> at 
>>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)

Re: Reliable Streaming Receiver

2015-08-05 Thread Dibyendu Bhattacharya
Hi,

You can try this Kafka consumer for Spark, which is also part of Spark
Packages: https://github.com/dibbhatt/kafka-spark-consumer

Regards,
Dibyendu

On Thu, Aug 6, 2015 at 6:48 AM, Sourabh Chandak 
wrote:

> Thanks Tathagata. I tried that but BlockGenerator internally uses
> SystemClock which is again private.
>
> We are using DSE so stuck with Spark 1.2 hence can't use the receiver-less
> version. Is it possible to use the same code as a separate API with 1.2?
>
> Thanks,
> Sourabh
>
> On Wed, Aug 5, 2015 at 6:13 PM, Tathagata Das  wrote:
>
>>  You could very easily strip out the BlockGenerator code from the Spark
>> source code and use it directly in the same way the Reliable Kafka Receiver
>> uses it. BTW, you should know that we will be deprecating the receiver
>> based approach for the Direct Kafka approach. That is quite flexible, can
>> give exactly-once guarantee without WAL, and is more robust and performant.
>> Consider using it.
>>
>>
>> On Wed, Aug 5, 2015 at 5:48 PM, Sourabh Chandak 
>> wrote:
>>
>>> Hi,
>>>
>>> I am trying to replicate the Kafka Streaming Receiver for a custom
>>> version of Kafka and want to create a Reliable receiver. The current
>>> implementation uses BlockGenerator which is a private class inside Spark
>>> streaming hence I can't use that in my code. Can someone help me with some
>>> resources to tackle this issue?
>>>
>>>
>>>
>>> Thanks,
>>> Sourabh
>>>
>>
>>
>


Re: spark streaming get kafka individual message's offset and partition no

2015-07-28 Thread Dibyendu Bhattacharya
If you want the offset of individual Kafka messages, you can use this
consumer from Spark Packages:
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
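If you would rather stay on the direct stream shown in your snippet below,
Spark 1.3's KafkaUtils.createDirectStream also has an overload that takes a
messageHandler, which hands you Kafka's MessageAndMetadata for every record,
so the partition and offset travel with each message. A rough Scala sketch
(the broker list, topic and starting offsets are placeholders, and the Java
API has an equivalent overload):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.DefaultDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("per-message-offsets"), Seconds(10))

// placeholders: your brokers and the starting offset for each partition
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val fromOffsets = Map(TopicAndPartition("mytopic", 0) -> 0L,
                      TopicAndPartition("mytopic", 1) -> 0L)

// the message handler runs per record and can expose partition and offset
val stream = KafkaUtils.createDirectStream[
    Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (Int, Long, Array[Byte])](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
    (mmd.partition, mmd.offset, mmd.message()))

stream.mapPartitions { iter =>
  iter.map { case (partition, offset, payload) =>
    // partition and offset of each individual message are available here
    s"partition=$partition offset=$offset bytes=${payload.length}"
  }
}.print()

ssc.start()
ssc.awaitTermination()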

Regards,
Dibyendu

On Tue, Jul 28, 2015 at 6:18 PM, Shushant Arora 
wrote:

> Hi
>
> I am processing kafka messages using spark streaming 1.3.
>
> I am using mapPartitions function to process kafka message.
>  How can I access offset no of individual message getting being processed.
>
>
> JavaPairInputDStream directKafkaStream
> =KafkaUtils.createDirectStream(..);
>
> directKafkaStream.mapPartitions(new
> FlatMapFunction>, String>() {
> public Iterable call(Iterator> t)
> throws Exception {
>
> while(t.hasNext()){
> Tuple2 tuple = t.next();
> byte[] key = tuple._1();
> byte[] msg = tuple._2();
>  ///how to get kafka partition no and offset of this message
>  }
> }
> });
>
>
>
>
>


Some BlockManager Doubts

2015-07-09 Thread Dibyendu Bhattacharya
Hi ,

I would just like to clarify a few doubts I have about how the BlockManager
behaves. This is mostly in regard to the Spark Streaming context.

There are two possible cases in which blocks may get dropped / not stored in
memory:

Case 1. While writing a block with the MEMORY_ONLY setting, if the node's
BlockManager does not have enough memory to unroll the block, the block won't
be stored to memory and the Receiver will throw an error while writing it.
If the StorageLevel uses disk (as in MEMORY_AND_DISK), blocks will be stored
to disk ONLY IF the BlockManager is not able to unroll them to memory. This
is fine while receiving the blocks, but this logic has an issue when old
blocks are chosen to be dropped from memory, as in Case 2.

Case 2: Now let's say that, for either the MEMORY_ONLY or MEMORY_AND_DISK
setting, blocks were successfully stored to memory in Case 1. If memory usage
then goes beyond a certain threshold, the BlockManager starts dropping LRU
blocks from memory which were successfully stored while receiving.

The primary issue I see here is that, while dropping the blocks in Case 2,
Spark does not check whether the storage level uses disk (MEMORY_AND_DISK),
and even with disk storage levels blocks are dropped from memory without
being written to disk.
Or perhaps the issue is, in the first place, that blocks are NOT written to
disk simultaneously in Case 1. I understand this would impact throughput,
but this design may throw a BlockNotFound error if blocks are chosen to be
dropped even when the StorageLevel uses disk.

Any thoughts ?

Regards,
Dibyendu


Re: spark streaming with kafka reset offset

2015-06-27 Thread Dibyendu Bhattacharya
Hi,

There is another option to try: the receiver-based low-level Kafka consumer
which is part of Spark Packages
(http://spark-packages.org/package/dibbhatt/kafka-spark-consumer). This can
be used with the WAL as well for end-to-end zero data loss.

This is also a reliable receiver and commits offsets to ZK. Given the number
of Kafka partitions you have (> 100), using the high-level Kafka API for the
receiver-based approach may lead to issues related to consumer re-balancing,
which is a major issue of the Kafka high-level API.

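For completeness, wiring the WAL into any receiver-based setup is mostly
configuration; a minimal Scala sketch (the application name and paths are
placeholders) looks roughly like this:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-receiver-with-wal")                        // placeholder name
  .set("spark.streaming.receiver.writeAheadLog.enable", "true") // turn the WAL on

val ssc = new StreamingContext(conf, Seconds(10))
// received blocks and metadata are logged under this fault-tolerant directory
ssc.checkpoint("hdfs:///user/hadoop/spark/checkpoint")          // placeholder path

// with the WAL enabled, in-memory replication is redundant, so a serialized
// memory-and-disk level is a common choice for the receiver's storage level
val receiverStorageLevel = StorageLevel.MEMORY_AND_DISK_SER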
Regards,
Dibyendu



On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das  wrote:

> In the receiver based approach, If the receiver crashes for any reason
> (receiver crashed or executor crashed) the receiver should get restarted on
> another executor and should start reading data from the offset present in
> the zookeeper. There is some chance of data loss which can alleviated using
> Write Ahead Logs (see streaming programming guide for more details, or see
> my talk [Slides PDF
> 
> , Video
> 
> ] from last Spark Summit 2015). But that approach can give duplicate
> records. The direct approach gives exactly-once guarantees, so you should
> try it out.
>
> TD
>
> On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger 
> wrote:
>
>> Read the spark streaming guide ad the kafka integration guide for a
>> better understanding of how the receiver based stream works.
>>
>> Capacity planning is specific to your environment and what the job is
>> actually doing, youll need to determine it empirically.
>>
>>
>> On Friday, June 26, 2015, Shushant Arora 
>> wrote:
>>
>>> In 1.2 how to handle offset management after stream application starts
>>> in each job . I should commit offset after job completion manually?
>>>
>>> And what is recommended no of consumer threads. Say I have 300
>>> partitions in kafka cluster . Load is ~ 1 million events per second.Each
>>> event is of ~500bytes. Having 5 receivers with 60 partitions each receiver
>>> is sufficient for spark streaming to consume ?
>>>
>>> On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger 
>>> wrote:
>>>
 The receiver-based kafka createStream in spark 1.2 uses zookeeper to
 store offsets.  If you want finer-grained control over offsets, you can
 update the values in zookeeper yourself before starting the job.

 createDirectStream in spark 1.3 is still marked as experimental, and
 subject to change.  That being said, it works better for me in production
 than the receiver based api.

 On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> I am using spark streaming 1.2.
>
> If processing executors get crashed will receiver rest the offset back
> to last processed offset?
>
> If receiver itself got crashed is there a way to reset the offset
> without restarting streaming application other than smallest or largest.
>
>
> Is spark streaming 1.3  which uses low level consumer api, stabe? And
> which is recommended for handling data  loss 1.2 or 1.3 .
>
>
>
>
>
>
>

>>>
>


Re: Kafka Spark Streaming: ERROR EndpointWriter: dropping message

2015-06-09 Thread Dibyendu Bhattacharya
Hi,

Can you please share a more detailed stack trace from your receiver logs, and
also the consumer settings you used? I have never tested the consumer with
Kafka 0.7.3, so I am not sure if the Kafka version is the issue. Have you
tried building the consumer against Kafka 0.7.3?

Regards,
Dibyendu

On Wed, Jun 10, 2015 at 11:52 AM, karma243  wrote:

> Thank you for responding @nsalian.
>
> 1. I am trying to replicate  this
>    project on my local
> system.
>
> 2. Yes, kafka and brokers on the same host.
>
> 3. I am working with kafka 0.7.3 and spark 1.3.1. Kafka 0.7.3 does not has
> "--describe" command. Though I've worked on three cases (Kafka and
> Zookeeper
> were on my machine all the time):
>   (i) Producer-Consumer on my machine.
>   (ii) Producer on my machine and Consumer on different machine.
>   (iii) Consumer on my machine and producer on different machine.
>
> All the three cases were working properly.
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Spark-Streaming-ERROR-EndpointWriter-dropping-message-tp23228p23240.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: [Kafka-Spark-Consumer] Spark-Streaming Job Fails due to Futures timed out

2015-06-08 Thread Dibyendu Bhattacharya
Seems to be related to this JIRA :
https://issues.apache.org/jira/browse/SPARK-3612 ?



On Tue, Jun 9, 2015 at 7:39 AM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Hi Snehal
>
> Are you running the latest kafka consumer from github/spark-packages? If
> not, can you take the latest changes? This low-level receiver will keep
> retrying if the underlying BlockManager gives an error. Do you see those
> retry cycles in the log? If yes, then there is an issue writing blocks to
> the BlockManager and Spark is not able to recover from this failure, but
> the Receiver keeps trying.
>
> Which version of Spark you are using ?
>
> Dibyendu
> On Jun 9, 2015 5:14 AM, "Snehal Nagmote"  wrote:
>
>> All,
>>
>> I am using Kafka Spark Consumer
>> https://github.com/dibbhatt/kafka-spark-consumer  in  spark streaming
>> job .
>>
>> After spark streaming job runs for few hours , all executors exit and I
>> still see status of application on SPARK UI as running
>>
>> Does anyone know cause of this exception and how to fix this ?
>>
>>
>>  WARN  [sparkDriver-akka.actor.default-dispatcher-17:Logging$class@71] - 
>> Error reported by receiver for stream 7: Error While Store for Partition 
>> Partition{host=dal-kafka-broker01.bfd.walmart.com:9092, partition=27} - 
>> org.apache.spark.SparkException: Error sending message [message = 
>> UpdateBlockInfo(BlockManagerId(2, dfw-searcher.com, 
>> 33621),input-7-1433793457165,StorageLevel(false, true, false, false, 
>> 1),10492,0,0)]
>>  at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
>>  at 
>> org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
>>  at 
>> org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
>>  at 
>> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:384)
>>  at 
>> org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:360)
>>  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:812)
>>  at 
>> org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
>>  at 
>> org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:71)
>>  at 
>> org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:161)
>>  at 
>> org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushIterator(ReceiverSupervisorImpl.scala:136)
>>  at 
>> org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:152)
>>  at consumer.kafka.PartitionManager.next(PartitionManager.java:215)
>>  at consumer.kafka.KafkaConsumer.createStream(KafkaConsumer.java:75)
>>  at consumer.kafka.KafkaConsumer.run(KafkaConsumer.java:108)
>>  at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
>> [30 seconds]
>>  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>  at 
>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>  at 
>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>  at scala.concurrent.Await$.result(package.scala:107)
>>  at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
>>
>>  ... 14 more WARN  
>> [sparkDriver-akka.actor.default-dispatcher-30:Logging$class@92] - Error 
>> sending message [message = UpdateBlockInfo(BlockManagerId(, 
>> dfw-searcher.com, 57286),broadcast_10665_piece0,StorageLevel(false, false, 
>> false, false, 1),0,0,0)] in 2 attempts
>> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>>  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>  at 
>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>  at 
>> akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
>>  at 
>> scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
>>  at 
>> akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
>>  at scala.concurrent.Await$.result(package.scala:107)
>>  a

Re: [Kafka-Spark-Consumer] Spark-Streaming Job Fails due to Futures timed out

2015-06-08 Thread Dibyendu Bhattacharya
Hi Snehal

Are you running the latest kafka consumer from github/spark-packages? If
not, can you take the latest changes? This low-level receiver will keep
retrying if the underlying BlockManager gives an error. Do you see those
retry cycles in the log? If yes, then there is an issue writing blocks to
the BlockManager and Spark is not able to recover from this failure, but
the Receiver keeps trying.

Which version of Spark you are using ?

Dibyendu
On Jun 9, 2015 5:14 AM, "Snehal Nagmote"  wrote:

> All,
>
> I am using Kafka Spark Consumer
> https://github.com/dibbhatt/kafka-spark-consumer  in  spark streaming job
> .
>
> After spark streaming job runs for few hours , all executors exit and I
> still see status of application on SPARK UI as running
>
> Does anyone know cause of this exception and how to fix this ?
>
>
>  WARN  [sparkDriver-akka.actor.default-dispatcher-17:Logging$class@71] - 
> Error reported by receiver for stream 7: Error While Store for Partition 
> Partition{host=dal-kafka-broker01.bfd.walmart.com:9092, partition=27} - 
> org.apache.spark.SparkException: Error sending message [message = 
> UpdateBlockInfo(BlockManagerId(2, dfw-searcher.com, 
> 33621),input-7-1433793457165,StorageLevel(false, true, false, false, 
> 1),10492,0,0)]
>   at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
>   at 
> org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
>   at 
> org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
>   at 
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:384)
>   at 
> org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:360)
>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:812)
>   at 
> org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
>   at 
> org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:71)
>   at 
> org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:161)
>   at 
> org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushIterator(ReceiverSupervisorImpl.scala:136)
>   at 
> org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:152)
>   at consumer.kafka.PartitionManager.next(PartitionManager.java:215)
>   at consumer.kafka.KafkaConsumer.createStream(KafkaConsumer.java:75)
>   at consumer.kafka.KafkaConsumer.run(KafkaConsumer.java:108)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 
> seconds]
>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>   at 
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>   at scala.concurrent.Await$.result(package.scala:107)
>   at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
>
>   ... 14 more WARN  
> [sparkDriver-akka.actor.default-dispatcher-30:Logging$class@92] - Error 
> sending message [message = UpdateBlockInfo(BlockManagerId(, 
> dfw-searcher.com, 57286),broadcast_10665_piece0,StorageLevel(false, false, 
> false, false, 1),0,0,0)] in 2 attempts
> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>   at 
> akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
>   at 
> akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
>   at scala.concurrent.Await$.result(package.scala:107)
>   at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
>   at 
> org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
>   at 
> org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
>   at 
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:384)
>   at 
> org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:360)
>   at 
> org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1104)
>   at 
> org.apache.spark.storage.BlockManager$$anonfun$removeBroadcast$2.apply(BlockManager.scala:1081)
>   at 
> org.apache.spark.storage.BlockManag

Re: Spark Streaming and Drools

2015-05-22 Thread Dibyendu Bhattacharya
Hi,

Some time back I played with distributed rule processing by integrating
Drools with HBase co-processors and invoking rules on any incoming data:

https://github.com/dibbhatt/hbase-rule-engine

You can get some idea of how to use Drools rules if you look at this
RegionObserver coprocessor:

https://github.com/dibbhatt/hbase-rule-engine/blob/master/src/main/java/hbase/rule/HBaseDroolObserver.java


The idea is basically to create a stateless rule engine from the "drl" file
and fire the rules on incoming data.

Even though that code invokes the rules on HBase Put objects, you can get the
idea from it and modify it for Spark.

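To make the suggestion below (Drools as a singleton on every worker, invoked
from map/filter) concrete, here is a rough Scala sketch of the same pattern
for Spark Streaming. It assumes Drools 6-style APIs and that the .drl rules
are packaged on the executor classpath together with a kmodule.xml; treat it
as a starting point rather than tested code:

import org.kie.api.KieServices
import org.kie.api.runtime.StatelessKieSession

// one rule engine per executor JVM, created lazily on first use
object RuleEngine {
  lazy val session: StatelessKieSession =
    KieServices.Factory.get().getKieClasspathContainer.newStatelessKieSession()
}

// inside the streaming job: fire the rules on every incoming event;
// `events` is assumed to be a DStream of your fact / event objects
val checked = events.map { event =>
  RuleEngine.session.execute(event) // rules may mutate or flag the fact
  event
}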
Dibyendu



On Fri, May 22, 2015 at 3:49 PM, Evo Eftimov  wrote:

> I am not aware of existing examples but you can always “ask” Google
>
>
>
> Basically from Spark Streaming perspective, Drools is a third-party
> Software Library, you would invoke it in the same way as any other
> third-party software library from the Tasks (maps, filters etc) within your
> DAG job
>
>
>
> *From:* Antonio Giambanco [mailto:antogia...@gmail.com]
> *Sent:* Friday, May 22, 2015 11:07 AM
> *To:* Evo Eftimov
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark Streaming and Drools
>
>
>
> Thanks a lot Evo,
>
> do you know where I can find some examples?
>
> Have a great one
>
>
> A G
>
>
>
> 2015-05-22 12:00 GMT+02:00 Evo Eftimov :
>
> You can deploy and invoke Drools as a Singleton on every Spark Worker Node
> / Executor / Worker JVM
>
>
>
> You can invoke it from e.g. map, filter etc and use the result from the
> Rule to make decision how to transform/filter an event/message
>
>
>
> *From:* Antonio Giambanco [mailto:antogia...@gmail.com]
> *Sent:* Friday, May 22, 2015 9:43 AM
> *To:* user@spark.apache.org
> *Subject:* Spark Streaming and Drools
>
>
>
> Hi All,
>
> I'm deploying and architecture that uses flume for sending log information
> in a sink.
>
> Spark streaming read from this sink (pull strategy) e process al this
> information, during this process I would like to make some event
> processing. . . for example:
>
> Log appender writes information about all transactions in my trading
> platforms,
>
> if a platform user sells more than buy during a week I need to receive an
> alert on an event dashboard.
>
> How can I realize it? Is it possible with drools?
>
> Thanks so much
>
>
>


Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-05-21 Thread Dibyendu Bhattacharya
Hi Tathagata,

Thanks for looking into this. Investigating further, I found that the issue
is that Tachyon does not support file append. When the streaming receiver
that writes to the WAL fails and is restarted, it is not able to append to
the same WAL file after the restart.

I raised this with the Tachyon user group, and Haoyuan said that Tachyon
file append should be ready within about three months. I will revisit this
issue then.

Regards,
Dibyendu


On Fri, May 22, 2015 at 12:24 AM, Tathagata Das  wrote:

> Looks like somehow the file size reported by the FSInputDStream of
> Tachyon's FileSystem interface, is returning zero.
>
> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Just to follow up this thread further .
>>
>> I was doing some fault-tolerance testing of Spark Streaming with Tachyon
>> as the OFF_HEAP block store. As I said in an earlier email, I was able to
>> solve the BlockNotFound exception when I used Hierarchical Storage of
>> Tachyon, which is good.
>>
>> I continued testing around storing the Spark Streaming WAL and
>> checkpoint files also in Tachyon. Here are a few findings.
>>
>>
>> When I store the Spark Streaming checkpoint location in Tachyon, the
>> throughput is much higher. I tested the Driver and Receiver failure cases,
>> and Spark Streaming is able to recover without any data loss on Driver
>> failure.
>>
>> *But on Receiver failure, Spark Streaming loses data*, as I see an
>> Exception while reading the WAL file from the Tachyon "receivedData"
>> location for the same Receiver id which just failed.
>>
>> If I change the checkpoint location back to HDFS, Spark Streaming can
>> recover from both Driver and Receiver failure.
>>
>> Here are the log details from when the Spark Streaming receiver failed. I
>> raised a JIRA for the same issue:
>> https://issues.apache.org/jira/browse/SPARK-7525
>>
>>
>>
>> INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2
>> (epoch 1)*
>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
>> remove executor 2 from BlockManagerMaster.
>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
>> block manager BlockManagerId(2, 10.252.5.54, 45789)
>> INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
>> successfully in removeExecutor
>> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered
>> receiver for stream 2 from 10.252.5.62*:47255
>> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage
>> 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could
>> not read data from write ahead log record
>> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
>> <http://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919>)*
>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>> at
>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>> at
>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>> at scala.Option.getOrElse(Option.scala:120)
>> at
>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:744)
>> Caused by: java.lang.IllegalArgumentException:* Seek position is past
>> EOF: 645603894, fileSize = 0*
>> at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>> at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputS

Re: Spark Streaming graceful shutdown in Spark 1.4

2015-05-20 Thread Dibyendu Bhattacharya
Thanks Tathagata for making this change..

Dibyendu

On Thu, May 21, 2015 at 8:24 AM, Tathagata Das  wrote:

> If you are talking about handling driver crash failures, then all bets are
> off anyways! Adding a shutdown hook in the hope of handling driver process
> failure, handles only a some cases (Ctrl-C), but does not handle cases like
> SIGKILL (does not run JVM shutdown hooks) or driver machine crash. So its
> not a good idea to rely on that.
>
> Nonetheless I have opened a PR to handle the shutdown of the
> StreamigntContext in the same way as SparkContext.
> https://github.com/apache/spark/pull/6307
>
>
> On Tue, May 19, 2015 at 12:51 AM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Thanks Sean, you are right. If the driver program is running, then I can
>> handle shutdown in the main exit path. But if the driver machine crashes
>> (or if you just stop the application, for example by killing the driver
>> process), then a shutdown hook is the only option, isn't it? What I am
>> trying to say is that just doing ssc.stop in sys.ShutdownHookThread or
>> Runtime.getRuntime().addShutdownHook (in Java) won't work anymore. I need
>> to use Utils.addShutdownHook with a priority. So I am just checking whether
>> Spark Streaming can make graceful shutdown the default shutdown mechanism.
>>
>> Dibyendu
>>
>> On Tue, May 19, 2015 at 1:03 PM, Sean Owen  wrote:
>>
>>> I don't think you should rely on a shutdown hook. Ideally you try to
>>> stop it in the main exit path of your program, even in case of an
>>> exception.
>>>
>>> On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya
>>>  wrote:
>>> > You mean to say within Runtime.getRuntime().addShutdownHook I call
>>> > ssc.stop(stopSparkContext  = true, stopGracefully  = true) ?
>>> >
>>> > This won't work anymore in 1.4.
>>> >
>>> > The SparkContext got stopped before Receiver processed all received
>>> blocks
>>> > and I see below exception in logs. But if I add the
>>> Utils.addShutdownHook
>>> > with the priority as I mentioned , then only graceful shutdown works .
>>> In
>>> > that case shutdown-hook run in priority order.
>>> >
>>>
>>
>>
>


Re: spark streaming doubt

2015-05-19 Thread Dibyendu Bhattacharya
Just to add, there is a Receiver based Kafka consumer which uses Kafka Low
Level Consumer API.

http://spark-packages.org/package/dibbhatt/kafka-spark-consumer


Regards,
Dibyendu

On Tue, May 19, 2015 at 9:00 PM, Akhil Das 
wrote:

>
> On Tue, May 19, 2015 at 8:10 PM, Shushant Arora  > wrote:
>
>> So for Kafka+spark streaming, Receiver based streaming used highlevel api
>> and non receiver based streaming used low level api.
>>
>> 1.In high level receiver based streaming does it registers consumers at
>> each job start(whenever a new job is launched by streaming application say
>> at each second)?
>>
>
> ​-> Receiver based streaming will always have the receiver running
> parallel while your job is running, So by default for every 200ms
> (spark.streaming.blockInterval) the receiver will generate a block of data
> which is read from Kafka.
> ​
>
>
>> 2.No of executors in highlevel receiver based jobs will always equal to
>> no of partitions in topic ?
>>
>
> ​-> Not sure from where did you came up with this. For the non stream
> based one, i think the number of partitions in spark will be equal to the
> number of kafka partitions for the given topic.
> ​
>
>
>> 3.Will data from a single topic be consumed by executors in parllel or
>> only one receiver consumes in multiple threads and assign to executors in
>> high level receiver based approach ?
>>
>> ​-> They will consume the data parallel.​ For the receiver based
> approach, you can actually specify the number of receiver that you want to
> spawn for consuming the messages.
>
>>
>>
>>
>> On Tue, May 19, 2015 at 2:38 PM, Akhil Das 
>> wrote:
>>
>>> spark.streaming.concurrentJobs takes an integer value, not boolean. If
>>> you set it as 2 then 2 jobs will run parallel. Default value is 1 and the
>>> next job will start once it completes the current one.
>>>
>>>
 Actually, in the current implementation of Spark Streaming and under
 default configuration, only job is active (i.e. under execution) at any
 point of time. So if one batch's processing takes longer than 10 seconds,
 then then next batch's jobs will stay queued.
 This can be changed with an experimental Spark property
 "spark.streaming.concurrentJobs" which is by default set to 1. Its not
 currently documented (maybe I should add it).
 The reason it is set to 1 is that concurrent jobs can potentially lead
 to weird sharing of resources and which can make it hard to debug the
 whether there is sufficient resources in the system to process the ingested
 data fast enough. With only 1 job running at a time, it is easy to see that
 if batch processing time < batch interval, then the system will be stable.
 Granted that this may not be the most efficient use of resources under
 certain conditions. We definitely hope to improve this in the future.
>>>
>>>
>>> Copied from TD's answer written in SO
>>> 
>>> .
>>>
>>> Non-receiver based streaming for example you can say are the fileStream,
>>> directStream ones. You can read a bit of information from here
>>> https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Tue, May 19, 2015 at 2:13 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 Thanks Akhil.
 When I don't  set spark.streaming.concurrentJobs to true. Will the all
 pending jobs starts one by one after 1 jobs completes,or it does not
 creates jobs which could not be started at its desired interval.

 And Whats the difference and usage of Receiver vs non-receiver based
 streaming. Is there any documentation for that?

 On Tue, May 19, 2015 at 1:35 PM, Akhil Das 
 wrote:

> It will be a single job running at a time by default (you can also
> configure spark.streaming.concurrentJobs to run jobs in parallel, which is
> not recommended in production).
>
> Now, your batch duration being 1 sec and processing time being 2
> minutes, if you are using a receiver based streaming then ideally those
> receivers will keep on receiving data while the job is running (which will
> accumulate in memory if you set StorageLevel as MEMORY_ONLY and end up in
> block not found exceptions as Spark drops some blocks which are yet to be
> processed in order to accommodate new blocks). If you are using a non-receiver
> based approach, you will not have this problem of dropping blocks.
>
> Ideally, if your data is small and you have enough memory to hold your
> data then it will run smoothly without any issues.
>
> Thanks
> Best Regards
>
> On Tue, May 19, 2015 at 1:23 PM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> What happens if, in a streaming application, one job is not yet
>> finished when the stream interval is reached? Does it start the next job or wait for
>> the first 

Re: Spark Streaming graceful shutdown in Spark 1.4

2015-05-19 Thread Dibyendu Bhattacharya
Thanks Sean, you are right. If the driver program is running then I can handle
shutdown in the main exit path. But if the driver machine crashes (or if you just
stop the application, for example by killing the driver process), then a
shutdown hook is the only option, isn't it? What I am trying to say is, just
doing ssc.stop in sys.ShutdownHookThread or
Runtime.getRuntime().addShutdownHook (in Java) won't work anymore. I need
to use Utils.addShutdownHook with a priority. So I am just checking if
Spark Streaming can make graceful shutdown the default shutdown mechanism.

Dibyendu
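A minimal sketch of that main-exit-path style of graceful stop, using an external marker file as the stop signal (the file path and polling interval are placeholders):

import java.nio.file.{Files, Paths}
import org.apache.spark.streaming.StreamingContext

// Sketch only: instead of relying on a JVM shutdown hook, poll for an external
// stop signal from the driver's main path and stop gracefully when it appears.
def runUntilStopSignal(ssc: StreamingContext): Unit = {
  ssc.start()
  val marker = Paths.get("/tmp/stop-streaming-app")        // placeholder marker file
  var finished = false
  while (!finished) {
    finished = ssc.awaitTerminationOrTimeout(10 * 1000L)   // returns true once stopped
    if (!finished && Files.exists(marker)) {
      ssc.stop(stopSparkContext = true, stopGracefully = true)
      finished = true
    }
  }
}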

On Tue, May 19, 2015 at 1:03 PM, Sean Owen  wrote:

> I don't think you should rely on a shutdown hook. Ideally you try to
> stop it in the main exit path of your program, even in case of an
> exception.
>
> On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya
>  wrote:
> > You mean to say within Runtime.getRuntime().addShutdownHook I call
> > ssc.stop(stopSparkContext  = true, stopGracefully  = true) ?
> >
> > This won't work anymore in 1.4.
> >
> > The SparkContext got stopped before Receiver processed all received
> blocks
> > and I see below exception in logs. But if I add the Utils.addShutdownHook
> > with the priority as I mentioned , then only graceful shutdown works . In
> > that case shutdown-hook run in priority order.
> >
>


Re: Spark Streaming graceful shutdown in Spark 1.4

2015-05-19 Thread Dibyendu Bhattacharya
By the way, this happens when I stopped the Driver process ...

On Tue, May 19, 2015 at 12:29 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> You mean to say within Runtime.getRuntime().addShutdownHook I call
> ssc.stop(stopSparkContext  = true, stopGracefully  = true) ?
>
> This won't work anymore in 1.4.
>
> The SparkContext got stopped before Receiver processed all received blocks
> and I see below exception in logs. But if I add the Utils.addShutdownHook
> with the priority as I mentioned , then only graceful shutdown works . In
> that case shutdown-hook run in priority order.
>
>
>
> *INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Sent stop
> signal to all 3 receivers*
> ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered
> receiver for stream 0: Stopped by driver
> ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered
> receiver for stream 1: Stopped by driver
> ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered
> receiver for stream 2: Stopped by driver
> *INFO : org.apache.spark.SparkContext - Invoking stop() from shutdown hook*
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/streaming/batch/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/streaming/batch,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/streaming/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/streaming,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/metrics/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/static,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/executors/threadDump,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/executors/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/executors,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/environment/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/environment,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/storage/rdd,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/storage/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/storage,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/stages/pool/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/stages/pool,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/stages/stage/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/stages/stage,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/stages/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/stages,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/jobs/job/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/jobs/job,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/jobs/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/jobs,null}
> INFO : org.apache.spark.ui.SparkUI - Stopped Spark web UI at

Re: Spark Streaming graceful shutdown in Spark 1.4

2015-05-19 Thread Dibyendu Bhattacharya
teBlocksToBatch(ReceiverTracker.scala:105)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:242)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
at scala.util.Try$.apply(Try.scala:161)
at
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
at org.apache.spark.streaming.scheduler.JobGenerator.org
$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "main" java.lang.IllegalStateException: Shutdown in
progress






On Tue, May 19, 2015 at 11:58 AM, Tathagata Das  wrote:

> If you wanted to stop it gracefully, then why are you not calling
> ssc.stop(stopGracefully = true, stopSparkContext = true)? Then it doesnt
> matter whether the shutdown hook was called or not.
>
> TD
>
> On Mon, May 18, 2015 at 9:43 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi,
>>
>> Just figured out that if I want to perform a graceful shutdown of Spark
>> Streaming 1.4 (from master), Runtime.getRuntime().addShutdownHook no
>> longer works. In Spark 1.4 there is Utils.addShutdownHook defined for
>> Spark Core, which gets called anyway, so a graceful shutdown from
>> Spark Streaming fails with errors like the "SparkContext already closed" issue.
>>
>> To solve this, I need to explicitly add Utils.addShutdownHook in my
>> driver with a higher priority (say 150) than Spark's shutdown priority of
>> 50, and there I call the StreamingContext stop method with (false, true)
>> parameters.
>>
>> Just curious to know, is this how we need to handle the shutdown hook
>> going forward?
>>
>> Can't we make the streaming shutdown default to graceful shutdown?
>>
>> Also the Java API for adding a shutdown hook in Utils looks very dirty, with
>> methods like this:
>>
>>
>>
>> Utils.addShutdownHook(150, new Function0() {
>>  @Override
>> public BoxedUnit apply() {
>> return null;
>> }
>>
>> @Override
>> public byte apply$mcB$sp() {
>> return 0;
>> }
>>
>> @Override
>> public char apply$mcC$sp() {
>> return 0;
>> }
>>
>> @Override
>> public double apply$mcD$sp() {
>> return 0;
>> }
>>
>> @Override
>> public float apply$mcF$sp() {
>> return 0;
>> }
>>
>> @Override
>> public int apply$mcI$sp() {
>> // TODO Auto-generated method stub
>> return 0;
>> }
>>
>> @Override
>> public long apply$mcJ$sp() {
>> return 0;
>> }
>>
>> @Override
>> public short apply$mcS$sp() {
>> return 0;
>> }
>>
>> @Override
>> public void apply$mcV$sp() {
>>  *jsc.stop(false, true);*
>>  }
>>
>> @Override
>> public boolean apply$mcZ$sp() {
>> // TODO Auto-generated method stub
>> return false;
>> }
>> });
>>
>
>


Spark Streaming graceful shutdown in Spark 1.4

2015-05-18 Thread Dibyendu Bhattacharya
Hi,

Just figured out that if I want to perform a graceful shutdown of Spark
Streaming 1.4 (from master), Runtime.getRuntime().addShutdownHook no
longer works. In Spark 1.4 there is Utils.addShutdownHook defined for
Spark Core, which gets called anyway, so a graceful shutdown from
Spark Streaming fails with errors like the "SparkContext already closed" issue.

To solve this, I need to explicitly add Utils.addShutdownHook in my driver
with a higher priority (say 150) than Spark's shutdown priority of 50, and
there I call the StreamingContext stop method with (false, true)
parameters.

Just curious to know, is this how we need to handle the shutdown hook going
forward?

Can't we make the streaming shutdown default to graceful shutdown?

Also the Java API for adding a shutdown hook in Utils looks very dirty, with
methods like this:



Utils.addShutdownHook(150, new Function0() {
 @Override
public BoxedUnit apply() {
return null;
}

@Override
public byte apply$mcB$sp() {
return 0;
}

@Override
public char apply$mcC$sp() {
return 0;
}

@Override
public double apply$mcD$sp() {
return 0;
}

@Override
public float apply$mcF$sp() {
return 0;
}

@Override
public int apply$mcI$sp() {
// TODO Auto-generated method stub
return 0;
}

@Override
public long apply$mcJ$sp() {
return 0;
}

@Override
public short apply$mcS$sp() {
return 0;
}

@Override
public void apply$mcV$sp() {
 *jsc.stop(false, true);*
 }

@Override
public boolean apply$mcZ$sp() {
// TODO Auto-generated method stub
return false;
}
});


Re: force the kafka consumer process to different machines

2015-05-13 Thread Dibyendu Bhattacharya
or you can use this Receiver as well :
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

There you can specify how many Receivers you need for your topic, and it
will divide the partitions among the Receivers and return the joined stream
to you.

Say you specify 20 receivers; in that case each Receiver will handle 4
partitions and you get a consumer parallelism of 20 receivers.

Dibyendu

On Wed, May 13, 2015 at 9:28 PM, 李森栋  wrote:

> thank you very much
>
>
> Sent from Meizu MX4 Pro
>
>  Original message 
> From: Cody Koeninger 
> Date: Wed, May 13, 23:52
> To: hotdog 
> Cc: user@spark.apache.org
> Subject: Re: force the kafka consumer process to different machines
>
> >I assume you're using the receiver based approach?  Have you tried the
> >createDirectStream api?
> >
> >https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html
> >
> >If you're sticking with the receiver based approach I think your only
> >option would be to create more consumer streams and union them.  That
> >doesn't give you control over where they're run, but should increase the
> >consumer parallelism.
> >
> >On Wed, May 13, 2015 at 10:33 AM, hotdog  wrote:
> >
> >> I'm using streaming integrated with streaming-kafka.
> >>
> >> My kafka topic has 80 partitions, while my machines have 40 cores. I
> >> found that when the job is running, the kafka consumer processes are only
> >> deployed to 2 machines, and the bandwidth of those 2 machines becomes very high.
> >>
> >> I wonder, is there any way to control the kafka consumers' placement?
> >>
> >>
> >>
> >> --
> >> View this message in context:
> >>
> http://apache-spark-user-list.1001560.n3.nabble.com/force-the-kafka-consumer-process-to-different-machines-tp22872.html
> >> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >>
>


Re: Reading Real Time Data only from Kafka

2015-05-13 Thread Dibyendu Bhattacharya
Thanks Cody for your email. I think my concern was not about getting the ordering
of messages within a partition, which as you said is possible if one knows
how Spark works. The issue is how Spark schedules jobs on every batch, which
is not in the same order they were generated. So if that is not guaranteed, it
does not matter if you manage order within your partition. So depending on
per-partition ordering to commit offsets may lead to offsets being committed in
the wrong order.

In this thread you have discussed this as well, along with some workarounds:

https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15

So again, one needs to understand every detail of a Consumer to decide
if it solves their use case.

Regards,
Dibyendu

On Wed, May 13, 2015 at 7:35 PM, Cody Koeninger  wrote:

> As far as I can tell, Dibyendu's "cons" boil down to:
>
> 1. Spark checkpoints can't be recovered if you upgrade code
> 2. Some Spark transformations involve a shuffle, which can repartition data
>
> It's not accurate to imply that either one of those things are inherently
> "cons" of the direct stream api.
>
> Regarding checkpoints, nothing about the direct stream requires you to use
> checkpoints.  You can save offsets in a checkpoint, your own database, or
> not save offsets at all (as James wants).  One might even say that the
> direct stream api is . . . flexible . . . in that regard.
>
> Regarding partitions, the direct stream api gives you the same ordering
> guarantee as Kafka, namely that within a given partition messages will be
> in increasing offset order.   Clearly if you do a transformation that
> repartitions the stream, that no longer holds.  Thing is, that doesn't
> matter if you're saving offsets and results for each rdd in the driver.
> The offset ranges for the original rdd don't change as a result of the
> transformation you executed, they're immutable.
>
> Sure, you can get into trouble if you're trying to save offsets / results
> per partition on the executors, after a shuffle of some kind. You can avoid
> this pretty easily by just using normal scala code to do your
> transformation on the iterator inside a foreachPartition.  Again, this
> isn't a "con" of the direct stream api, this is just a need to understand
> how Spark works.
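For reference, a small sketch of "saving offsets and results for each rdd in the driver" with the direct stream (assuming an existing StreamingContext ssc; the broker address and topic are placeholders, and the offset store itself is left out):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// Sketch only: each RDD from the direct stream carries immutable offset ranges.
// Read them in the driver before any repartitioning transformation and persist
// them together with your results.
val kafkaParams = Map("metadata.broker.list" -> "broker:9092")   // placeholder
val direct = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("my-topic"))

direct.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd and save results, then save offsetRanges atomically with them ...
  offsetRanges.foreach(o => println(s"${o.topic}/${o.partition}: ${o.fromOffset} -> ${o.untilOffset}"))
}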
>
>
>
> On Tue, May 12, 2015 at 10:30 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> The low level consumer which Akhil mentioned has been running in
>> Pearson for the last 4-5 months without any downtime. I think this one is the
>> reliable "Receiver Based" Kafka consumer as of today for Spark, if you want
>> to put it that way.
>>
>> Prior to Spark 1.3, other Receiver based consumers used the Kafka High
>> Level APIs, which have serious issues with re-balancing, weaker fault
>> tolerance and data loss.
>>
>> Cody's implementation is definitely a good approach using the direct stream,
>> but both the direct stream based approach and the receiver based low level consumer
>> approach have pros and cons. For example, the Receiver based approach needs to use the
>> WAL for recovery from Driver failure, which is an overhead for a system like Kafka.
>> For the direct stream, the offsets stored in the checkpoint directory get lost
>> if the driver code is modified. You can manage offsets from your driver, but for
>> a derived stream generated from this direct stream there is no guarantee
>> that batches are processed in order (and offsets committed in order), etc.
>>
>> So whoever uses whichever consumer needs to study the pros and cons of both
>> approaches before taking a call.
>>
>> Regards,
>> Dibyendu
>>
>>
>>
>>
>>
>>
>>
>> On Tue, May 12, 2015 at 8:10 PM, Akhil Das 
>> wrote:
>>
>>> Hi Cody,
>>> I was just saying that I found more success and higher throughput with the
>>> low level kafka api prior to KafkaRDDs, which is the future it seems. My
>>> apologies if you felt it that way. :)
>>> On 12 May 2015 19:47, "Cody Koeninger"  wrote:
>>>
>>>> Akhil, I hope I'm misreading the tone of this. If you have personal
>>>> issues at stake, please take them up outside of the public list.  If you
>>>> have actual factual concerns about the kafka integration, please share them
>>>> in a jira.
>>>>
>>>> Regarding reliability, here's a screenshot of a current production job
>>>> with a 3 week uptime  Was a month before that, only took it down to change
>>>> code.
>>>>
>>>> http://tinypic.com/r/2e4vkht/

Re: Reading Real Time Data only from Kafka

2015-05-12 Thread Dibyendu Bhattacharya
The low level consumer which Akhil mentioned has been running in Pearson
for the last 4-5 months without any downtime. I think this one is the reliable
"Receiver Based" Kafka consumer as of today for Spark, if you want to put it
that way.

Prior to Spark 1.3, other Receiver based consumers used the Kafka High
Level APIs, which have serious issues with re-balancing, weaker fault
tolerance and data loss.

Cody's implementation is definitely a good approach using the direct stream,
but both the direct stream based approach and the receiver based low level consumer
approach have pros and cons. For example, the Receiver based approach needs to use the
WAL for recovery from Driver failure, which is an overhead for a system like Kafka.
For the direct stream, the offsets stored in the checkpoint directory get lost
if the driver code is modified. You can manage offsets from your driver, but for
a derived stream generated from this direct stream there is no guarantee
that batches are processed in order (and offsets committed in order), etc.

So whoever uses whichever consumer needs to study the pros and cons of both
approaches before taking a call.

Regards,
Dibyendu







On Tue, May 12, 2015 at 8:10 PM, Akhil Das 
wrote:

> Hi Cody,
> I was just saying that I found more success and higher throughput with the
> low level kafka api prior to KafkaRDDs, which is the future it seems. My
> apologies if you felt it that way. :)
> On 12 May 2015 19:47, "Cody Koeninger"  wrote:
>
>> Akhil, I hope I'm misreading the tone of this. If you have personal
>> issues at stake, please take them up outside of the public list.  If you
>> have actual factual concerns about the kafka integration, please share them
>> in a jira.
>>
>> Regarding reliability, here's a screenshot of a current production job
>> with a 3 week uptime  Was a month before that, only took it down to change
>> code.
>>
>> http://tinypic.com/r/2e4vkht/8
>>
>> Regarding flexibility, both of the apis available in spark will do what
>> James needs, as I described.
>>
>>
>>
>> On Tue, May 12, 2015 at 8:55 AM, Akhil Das 
>> wrote:
>>
>>> Hi Cody,
>>>
>>> If you are so sure, can you share a bench-marking (which you ran for
>>> days maybe?) that you have done with Kafka APIs provided by Spark?
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Tue, May 12, 2015 at 7:22 PM, Cody Koeninger 
>>> wrote:
>>>
 I don't think it's accurate for Akhil to claim that the linked library
 is "much more flexible/reliable" than what's available in Spark at this
 point.

 James, what you're describing is the default behavior for the
 createDirectStream api available as part of spark since 1.3.  The kafka
 parameter auto.offset.reset defaults to largest, ie start at the most
 recent available message.

 This is described at
 http://spark.apache.org/docs/latest/streaming-kafka-integration.html
  The createDirectStream api implementation is described in detail at
 https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

 If for some reason you're stuck using an earlier version of spark, you
 can accomplish what you want simply by starting the job using a new
 consumer group (there will be no prior state in zookeeper, so it will start
 consuming according to auto.offset.reset)
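A sketch of that fallback with the receiver-based API (assuming an existing StreamingContext ssc; the ZooKeeper address and topic are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch only: a brand new consumer group has no offsets in ZooKeeper, so with
// auto.offset.reset=largest the receiver starts at the most recent messages.
val kafkaParams = Map(
  "zookeeper.connect" -> "zkhost:2181",                          // placeholder
  "group.id"          -> s"my-app-${System.currentTimeMillis}",  // fresh group per run
  "auto.offset.reset" -> "largest")
val onlyNew = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)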

 On Tue, May 12, 2015 at 7:26 AM, James King 
 wrote:

> Very nice! will try and let you know, thanks.
>
> On Tue, May 12, 2015 at 2:25 PM, Akhil Das  > wrote:
>
>> Yep, you can try this lowlevel Kafka receiver
>> https://github.com/dibbhatt/kafka-spark-consumer. Its much more
>> flexible/reliable than the one comes with Spark.
>>
>> Thanks
>> Best Regards
>>
>> On Tue, May 12, 2015 at 5:15 PM, James King 
>> wrote:
>>
>>> What I want is if the driver dies for some reason and it is
>>> restarted I want to read only messages that arrived into Kafka following
>>> the restart of the driver program and re-connection to Kafka.
>>>
>>> Has anyone done this? any links or resources that can help explain
>>> this?
>>>
>>> Regards
>>> jk
>>>
>>>
>>>
>>
>

>>>
>>


Re: Some questions on Multiple Streams

2015-04-24 Thread Dibyendu Bhattacharya
You can probably try the Low Level Consumer from spark-packages (
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) .

How many partitions are there for your topics? Let's say you have 10 topics,
each having 3 partitions; ideally you can create a maximum of 30 parallel
Receivers and 30 streams. What I understand from your requirement is that for
any given topic you want to choose the number of Receivers, e.g. for Topic
A you may choose 1 Receiver, for Topic B you choose 2, for Topic C you
choose 3, etc.

Now if you can distribute the topics to Receivers like this, you can very
well use the above consumer, which has this facility. Each Receiver task
takes one executor core, so you can calculate accordingly (see the sketch below).
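An illustration with the stock receiver API (not the spark-packages consumer discussed here), assuming an existing StreamingContext ssc and placeholder names:

import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch only: a different receiver count per topic, with a per-topic union so
// statistics can still be computed per topic.
val receiversPerTopic = Map("topicA" -> 1, "topicB" -> 2, "topicC" -> 3)
val perTopicStreams = receiversPerTopic.map { case (topic, n) =>
  val parts = (1 to n).map(_ =>
    KafkaUtils.createStream(ssc, "zkhost:2181", "my-group", Map(topic -> 1)))
  topic -> ssc.union(parts)
}
// Core budget: 1 + 2 + 3 = 6 receivers, each pinned to one executor core, so the
// cluster needs 6 cores for receiving plus whatever is left for processing.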

The implementation has a code example and read-me file; if you wish to try
this, you can always email me.

Regards,
Dibyendu






On Fri, Apr 17, 2015 at 3:06 PM, Laeeq Ahmed 
wrote:

> Hi,
>
> I am working with multiple Kafka streams (23 streams) and currently I am
> processing them separately. I receive one stream from each topic. I have
> the following questions.
>
> 1.Spark streaming guide suggests to union these streams. *Is it
> possible to get statistics of each stream even after they are unioned?*
>
> 2.My calculations are not complex. I use 2 second batch interval and
> if I use 2 streams they get easily processed under 2 seconds by a single
> core. There is some shuffling involved in my application. As I increase the
> number of streams and the number of executors accordingly, the applications
> scheduling delay increases and become unmanageable in 2 seconds. As I
> believe this happens because with that many streams, the number of tasks
> increases thus the shuffling magnifies and also that all streams using the
> same executors. *Is it possible to provide part of executors to
> particular stream while processing streams simultaneously?* E.g. if I
> have 15 cores on cluster and 5 streams, 5  cores will be taken by 5
> receivers and of the rest 10, can I provide 2 cores each to one of the 5
> streams. Just to add, increasing the batch interval does help but I don't
> want to increase the batch size due to application restrictions and delayed
> results (The blockInterval and defaultParallelism does help to a limited
> extent).
>
> *Please see attach file for CODE SNIPPET*
>
> Regards,
> Laeeq
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Latest enhancement in Low Level Receiver based Kafka Consumer

2015-04-01 Thread Dibyendu Bhattacharya
Thanks Neelesh.

I too have a plan to migrate the offset management to topic based (Kafka
0.8.2) rather than ZK based. That will make the consumer much faster. If you
have a plan to contribute your work to this consumer, that would be great
too.

Dibyendu



On Wed, Apr 1, 2015 at 11:07 PM, Neelesh  wrote:

> Hi Dibyendu,
>Thanks for your work on this project. Spark 1.3 now has direct kafka
> streams, but still does not provide enough control over partitions and
> topics. For example, the streams are fairly statically configured -
> RDD.getPartitions() is computed only once, thus making it difficult to use
> in a SaaS environment where topics are created and deactivated on the fly
> (one topic per customer, for example). But its easy to build a wrapper
> around your receivers.
> May be there is a play where one can club direct streams with your
> receivers, but I don't quite fully understand how the 1.3 direct streams
> work yet
>
> Another thread -  Kafka 0.8.2 supports non ZK offset management , which I
> think is more scalable than bombarding ZK. I'm working on supporting the
> new offset management strategy for Kafka with kafka-spark-consumer.
>
> Thanks!
> -neelesh
>
> On Wed, Apr 1, 2015 at 9:49 AM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi,
>>
>> Just to let you know, I have made some enhancements in the Low Level Reliable
>> Receiver based Kafka Consumer (
>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer).
>>
>> The earlier version used as many Receiver tasks as the number of partitions of
>> your Kafka topic. Now you can configure the desired number of Receiver tasks,
>> and every Receiver can handle a subset of the topic partitions.
>>
>> There were some use cases where the consumer needed to handle gigantic topics
>> (having 100+ partitions), and using my receiver created that many Receiver
>> tasks and hence that many CPU cores were needed just for the Receivers. It was
>> an issue.
>>
>>
>> In the latest code, I have changed that behavior. The max limit for the number
>> of Receivers is still your number of partitions, but if you specify a smaller
>> number of Receiver tasks, every receiver will handle a subset of partitions and
>> consume using the Kafka Low Level consumer API.
>>
>> Every receiver will manage its partition(s) offsets in ZK in the usual way.
>>
>>
>> You can see the latest consumer here :
>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
>>
>>
>>
>> Regards,
>> Dibyendu
>>
>>
>


Latest enhancement in Low Level Receiver based Kafka Consumer

2015-04-01 Thread Dibyendu Bhattacharya
Hi,

Just to let you know, I have made some enhancements in the Low Level Reliable
Receiver based Kafka Consumer (
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer).

The earlier version used as many Receiver tasks as the number of partitions of
your Kafka topic. Now you can configure the desired number of Receiver tasks,
and every Receiver can handle a subset of the topic partitions.

There were some use cases where the consumer needed to handle gigantic topics
(having 100+ partitions), and using my receiver created that many Receiver
tasks and hence that many CPU cores were needed just for the Receivers. It was
an issue.


In the latest code, I have changed that behavior. The max limit for the number
of Receivers is still your number of partitions, but if you specify a smaller
number of Receiver tasks, every receiver will handle a subset of partitions and
consume using the Kafka Low Level consumer API.

Every receiver will manage its partition(s) offsets in ZK in the usual way.


You can see the latest consumer here :
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer



Regards,
Dibyendu


Re: Question about Spark Streaming Receiver Failure

2015-03-16 Thread Dibyendu Bhattacharya
Yes, auto restart is enabled in my low level consumer when some unhandled
exception occurs.

If you look at KafkaConsumer.java, for some cases (like broker failure,
Kafka leader changes etc.) it can even refresh the Consumer (the
Coordinator which talks to a Leader), which will recover from those
failures.

Dib

On Mon, Mar 16, 2015 at 1:40 PM, Jun Yang  wrote:

> I have checked Dibyendu's code, and it looks like his implementation has an
> auto-restart mechanism:
>
>
> 
> src/main/java/consumer/kafka/client/KafkaReceiver.java:
>
> private void start() {
>
> // Start the thread that receives data over a connection
> KafkaConfig kafkaConfig = new KafkaConfig(_props);
> ZkState zkState = new ZkState(kafkaConfig);
> _kConsumer = new KafkaConsumer(kafkaConfig, zkState, this);
> _kConsumer.open(_partitionId);
>
> Thread.UncaughtExceptionHandler eh = new
> Thread.UncaughtExceptionHandler() {
> public void uncaughtException(Thread th, Throwable ex) {
>   restart("Restarting Receiver for Partition " + _partitionId ,
> ex, 5000);
> }
> };
>
> _consumerThread = new Thread(_kConsumer);
> _consumerThread.setDaemon(true);
> _consumerThread.setUncaughtExceptionHandler(eh);
> _consumerThread.start();
>   }
>
> 
> I also checked Spark's native Kafka Receiver implementation, and it does not
> look like it has any auto-restart support.
>
> Any comments from Dibyendu?
>
> On Mon, Mar 16, 2015 at 3:39 PM, Akhil Das 
> wrote:
>
>> As I have seen, once I kill my receiver on one machine, it will automatically
>> spawn another receiver on another machine or on the same machine.
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Mar 16, 2015 at 1:08 PM, Jun Yang  wrote:
>>
>>> Dibyendu,
>>>
>>> Thanks for the reply.
>>>
>>> I am reading your project homepage now.
>>>
>>> One quick question I care about is:
>>>
>>> If the receivers fail for some reason (for example, killed brutally by
>>> someone else), is there any mechanism for the receiver to fail over
>>> automatically?
>>>
>>> On Mon, Mar 16, 2015 at 3:25 PM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
>>>> Which version of Spark are you running?
>>>>
>>>> You can try this Low Level Consumer:
>>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
>>>>
>>>> This is designed to recover from various failures and has a very good
>>>> fault recovery mechanism built in. This is being used by many users, and at
>>>> present we at Pearson have been running this Receiver in Production for almost 3
>>>> months without any issue.
>>>>
>>>> You can give this a try.
>>>>
>>>> Regards,
>>>> Dibyendu
>>>>
>>>> On Mon, Mar 16, 2015 at 12:47 PM, Akhil Das >>> > wrote:
>>>>
>>>>> You need to figure out why the receivers failed in the first place.
>>>>> Look in your worker logs and see what really happened. When you run a
>>>>> streaming job continuously for longer period mostly there'll be a lot of
>>>>> logs (you can enable log rotation etc.) and if you are doing a groupBy,
>>>>> join, etc type of operations, then there will be a lot of shuffle data. So
>>>>> You need to check in the worker logs and see what happened (whether DISK
>>>>> full etc.), We have streaming pipelines running for weeks without having
>>>>> any issues.
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Mon, Mar 16, 2015 at 12:40 PM, Jun Yang 
>>>>> wrote:
>>>>>
>>>>>> Guys,
>>>>>>
>>>>>> We have a project which builds upon Spark streaming.
>>>>>>
>>>>>> We use Kafka as the input stream, and create 5 receivers.
>>>>>>
>>>>>> When this application runs for around 90 hour, all the 5 receivers
>>>>>> failed for some unknown reasons.
>>>>>>
>>>>>> In

Re: Question about Spark Streaming Receiver Failure

2015-03-16 Thread Dibyendu Bhattacharya
Which version of Spark are you running?

You can try this Low Level Consumer:
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

This is designed to recover from various failures and has a very good fault
recovery mechanism built in. This is being used by many users, and at
present we at Pearson have been running this Receiver in Production for almost 3
months without any issue.

You can give this a try.

Regards,
Dibyendu

On Mon, Mar 16, 2015 at 12:47 PM, Akhil Das 
wrote:

> You need to figure out why the receivers failed in the first place. Look
> in your worker logs and see what really happened. When you run a streaming
> job continuously for longer period mostly there'll be a lot of logs (you
> can enable log rotation etc.) and if you are doing a groupBy, join, etc
> type of operations, then there will be a lot of shuffle data. So You need
> to check in the worker logs and see what happened (whether DISK full etc.),
> We have streaming pipelines running for weeks without having any issues.
>
> Thanks
> Best Regards
>
> On Mon, Mar 16, 2015 at 12:40 PM, Jun Yang  wrote:
>
>> Guys,
>>
>> We have a project which builds upon Spark streaming.
>>
>> We use Kafka as the input stream, and create 5 receivers.
>>
>> When this application had run for around 90 hours, all 5 receivers failed
>> for some unknown reason.
>>
>> In my understanding, it is not guaranteed that Spark streaming receiver
>> will do fault recovery automatically.
>>
>> So I just want to figure out a way for doing fault-recovery to deal with
>> receiver failure.
>>
>> There is a JIRA post that mentions using StreamingListener for monitoring the
>> status of receivers:
>>
>>
>> https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836
>>
>> However I haven't found any open doc about how to do this stuff.
>>
>> Has anyone met the same issue and dealt with it?
>>
>> Our environment:
>>Spark 1.3.0
>>Dual Master Configuration
>>Kafka 0.8.2
>>
>> Thanks
>>
>> --
>> yangjun...@gmail.com
>> http://hi.baidu.com/yjpro
>>
>
>


Re: Spark streaming app shutting down

2015-02-04 Thread Dibyendu Bhattacharya
Thanks Akhil for mentioning this Low Level Consumer (
https://github.com/dibbhatt/kafka-spark-consumer ). Yes, it has a better
fault tolerance mechanism than any existing Kafka consumer available. It has
no data loss on receiver failure and has the ability to replay or restart
itself in case of failure. You can definitely give it a try.

Dibyendu

On Thu, Feb 5, 2015 at 1:04 AM, Akhil Das 
wrote:

> AFAIK, From Spark 1.2.0 you can have WAL (Write Ahead Logs) for fault
> tolerance, which means it can handle the receiver/driver failures. You can
> also look at the lowlevel kafka consumer
>  which has a better
> fault tolerance mechanism for receiver failures. This low level consumer
> will push the offset of the message being read into zookeeper for fault
> tolerance. In your case I think mostly the "in-flight data" would be lost if
> you aren't using any of the fault tolerance mechanisms.
>
> Thanks
> Best Regards
>
> On Wed, Feb 4, 2015 at 5:24 PM, Mukesh Jha 
> wrote:
>
>> Hello Sparkans,
>>
>> I'm running a spark streaming app which reads data from a kafka topic, does
>> some processing and then persists the results in HBase.
>>
>> I am using spark 1.2.0 running on a Yarn cluster with 3 executors (2gb, 8
>> cores each). I've enabled checkpointing & I am also rate limiting my
>> kafkaReceivers so that the number of items read is not more than 10 records
>> per sec.
>> The kafkaReceiver I'm using is *not* ReliableKafkaReceiver.
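The rate limiting described above is typically done with spark.streaming.receiver.maxRate; a minimal sketch of that setting (an assumption about this particular setup, shown only for reference):

import org.apache.spark.SparkConf

// Sketch only: cap each receiver at 10 records per second.
val conf = new SparkConf()
  .setAppName("rate-limit-sketch")
  .set("spark.streaming.receiver.maxRate", "10")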
>>
>> This app was running fine for ~3 days then there was an increased load on
>> the HBase server because of some other process querying HBase tables.
>> This led to increase in the batch processing time of the spark batches
>> (processed 1 min batch in 10 min) which previously was finishing in 20 sec
>> which in turn led to the shutdown of the spark application, PFA the
>> executor logs.
>>
>> From the logs I'm getting the exceptions *[1]* & *[2]* below; it looks like
>> there were some outstanding Jobs that didn't get processed or the Job
>> couldn't find the input data. From the logs it seems that the
>> shutdown hook gets invoked but it cannot process the in-flight block.
>>
>> I have a couple of queries on this
>>   1) Does this mean that these jobs failed and the *in-flight data *is
>> lost?
>>   2) Does the Spark job *buffer kafka* input data while the Job is
>> in the processing state for 10 mins, and on shutdown is that too lost? (I do
>> not see any OOM error in the logs).
>>   3) Can we have *explicit commits* enabled in the kafkaReceiver so that
>> the offsets gets committed only when the RDD(s) get successfully processed?
>>
>> Also I'd like to know if there is a *graceful way to shutdown a spark
>> app running on yarn*. Currently I'm killing the yarn app to stop it,
>> which leads to loss of that job's history, whereas in this case the
>> application stops and succeeds and thus preserves the logs & history.
>>
>> *[1]* 15/02/02 19:30:11 ERROR client.TransportResponseHandler: Still
>> have 1 requests outstanding when connection from
>> hbase28.usdc2.cloud.com/10.193.150.221:43189 is closed
>> *[2]* java.lang.Exception: Could not compute split, block
>> input-2-1422901498800 not found
>> *[3]* 
>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
>> No lease on /tmp/spark/realtime-failover/msg_2378481654720966.avro (inode
>> 879488): File does not exist. Holder DFSClient_NONMAPREDUCE_-148264920_63
>> does not have any open files.
>>
>> --
>> Thanks & Regards,
>>
>> *Mukesh Jha *
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>


Re: Error when Spark streaming consumes from Kafka

2015-02-02 Thread Dibyendu Bhattacharya
Thanks Neelesh . Glad to know this Low Level Consumer is working for you.

Dibyendu

On Tue, Feb 3, 2015 at 8:06 AM, Neelesh  wrote:

> We're planning to use this as well (Dibyendu's
> https://github.com/dibbhatt/kafka-spark-consumer ). Dibyendu, thanks for
> the efforts. So far it's working nicely. I think there is merit in making it
> the default Kafka Receiver for spark streaming.
>
> -neelesh
>
> On Mon, Feb 2, 2015 at 5:25 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Or you can use this Low Level Kafka Consumer for Spark :
>> https://github.com/dibbhatt/kafka-spark-consumer
>>
>> This is now part of http://spark-packages.org/ and has been running
>> successfully for the past few months in Pearson's production environment. Being
>> a Low Level consumer, it does not have this re-balancing issue which the High
>> Level consumer has.
>>
>> Also, I know there are a few who have shifted to this Low Level Consumer,
>> which gives them a more robust, fault tolerant Kafka Receiver for
>> Spark.
>>
>> Regards,
>> Dibyendu
>>
>> On Tue, Feb 3, 2015 at 3:57 AM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> This is an issue that is hard to resolve without rearchitecting the
>>> whole Kafka Receiver. There are some workarounds worth looking into.
>>>
>>>
>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3CCAFbh0Q38qQ0aAg_cj=jzk-kbi8xwf+1m6xlj+fzf6eetj9z...@mail.gmail.com%3E
>>>
>>> On Mon, Feb 2, 2015 at 1:07 PM, Greg Temchenko 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> This seems not fixed yet.
>>>> I filed an issue in jira:
>>>> https://issues.apache.org/jira/browse/SPARK-5505
>>>>
>>>> Greg
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-Spark-streaming-consumes-from-Kafka-tp19570p21471.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: Error when Spark streaming consumes from Kafka

2015-02-02 Thread Dibyendu Bhattacharya
Or you can use this Low Level Kafka Consumer for Spark :
https://github.com/dibbhatt/kafka-spark-consumer

This is now part of http://spark-packages.org/ and has been running successfully
for the past few months in Pearson's production environment. Being a Low Level
consumer, it does not have this re-balancing issue which the High Level
consumer has.

Also, I know there are a few who have shifted to this Low Level Consumer, which
gives them a more robust, fault tolerant Kafka Receiver for Spark.

Regards,
Dibyendu

On Tue, Feb 3, 2015 at 3:57 AM, Tathagata Das 
wrote:

> This is an issue that is hard to resolve without rearchitecting the whole
> Kafka Receiver. There are some workarounds worth looking into.
>
>
> http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3CCAFbh0Q38qQ0aAg_cj=jzk-kbi8xwf+1m6xlj+fzf6eetj9z...@mail.gmail.com%3E
>
> On Mon, Feb 2, 2015 at 1:07 PM, Greg Temchenko  wrote:
>
>> Hi,
>>
>> This seems not fixed yet.
>> I filed an issue in jira:
>> https://issues.apache.org/jira/browse/SPARK-5505
>>
>> Greg
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-Spark-streaming-consumes-from-Kafka-tp19570p21471.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Spark Streaming with Kafka

2015-01-21 Thread Dibyendu Bhattacharya
You can probably try the Low Level Consumer option with Spark 1.2

https://github.com/dibbhatt/kafka-spark-consumer

This Consumer can recover from any underlying failure of the Spark platform or
Kafka and either retry or restart the receiver. This has been working
nicely for us.

Regards,
Dibyendu


On Wed, Jan 21, 2015 at 7:46 AM, firemonk9 
wrote:

> Hi,
>
>I am having similar issues. Have you found any resolution ?
>
> Thank you
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Kafka-tp21222p21276.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: ReliableKafkaReceiver stopped receiving data after WriteAheadLogBasedBlockHandler throws TimeoutException

2015-01-18 Thread Dibyendu Bhattacharya
Hi Max,

Is it possible for you to try the Kafka Low Level Consumer which I have written,
which is also part of Spark Packages? This Consumer is also a Reliable
Consumer, with no data loss on Receiver failure. I have tested this with
Spark 1.2 with spark.streaming.receiver.writeAheadLog.enable set to "true",
and this consumer pulls messages from Kafka and stores them in the WAL.

I have tested this consumer with Spark 1.2 with the WAL feature enabled with a
large Kafka backlog (around 6 million messages), and it pulls without any
issue.

With the WAL feature enabled, the throughput will be impacted.
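For reference, a minimal sketch of the WAL configuration mentioned above; the write ahead log lives under the checkpoint directory, so the (placeholder) checkpoint path should be on a fault-tolerant filesystem such as HDFS:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: enable the receiver write ahead log and point the checkpoint
// at a reliable filesystem.
val conf = new SparkConf()
  .setAppName("wal-sketch")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")   // placeholder path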

You can find this consumer here :
https://github.com/dibbhatt/kafka-spark-consumer

Here is the reference of it in spark-package :
http://spark-packages.org/package/5

If you find some issue configuring this, you can reach me.

Regards,
Dibyendu





On Tue, Jan 13, 2015 at 1:40 AM, Max Xu  wrote:

>  Hi all,
>
>
>
> I am running a Spark streaming application with ReliableKafkaReceiver
> (Spark 1.2.0). Constantly I was getting the following exception:
>
>
>
> 15/01/12 19:07:06 ERROR receiver.BlockGenerator: Error in block pushing
> thread
>
> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>
> at
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>
> at
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>
> at
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>
> at scala.concurrent.Await$.result(package.scala:107)
>
> at
> org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:176)
>
> at
> org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:160)
>
> at
> org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:126)
>
> at
> org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:124)
>
> at org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org
> $apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeBlockAndCommitOffset(ReliableKafkaReceiver.scala:207)
>
> at
> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$GeneratedBlockHandler.onPushBlock(ReliableKafkaReceiver.scala:275)
>
> at
> org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:181)
>
> at org.apache.spark.streaming.receiver.BlockGenerator.org
> $apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:154)
>
> at
> org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:86)
>
>
>
> After the exception, ReliableKafkaReceiver stayed in ACTIVE status but
> stopped receiving data from Kafka. The Kafka message handler thread is in
> BLOCKED state:
>
>
>
> Thread 92: KafkaMessageHandler-0 (BLOCKED)
>
>
> org.apache.spark.streaming.receiver.BlockGenerator.addDataWithCallback(BlockGenerator.scala:123)
>
> org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org
> $apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeMessageAndMetadata(ReliableKafkaReceiver.scala:185)
>
>
> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:247)
>
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>
> java.util.concurrent.FutureTask.run(FutureTask.java:262)
>
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> java.lang.Thread.run(Thread.java:745)
>
>
>
> Sometimes when the exception was thrown, I also see warning messages like
> this:
>
> 15/01/12 01:08:07 WARN hdfs.DFSClient: Slow ReadProcessor read fields took
> 30533ms (threshold=3ms); ack: seqno: 113 status: SUCCESS status:
> SUCCESS downstreamAckTimeNanos: 30524893062, targets:
> [172.20.xxx.xxx:50010, 172.20.xxx.xxx:50010]
>
> 15/01/12 01:08:07 WARN hdfs.DFSClient: Slow waitForAckedSeqno took 30526ms
> (threshold=3ms)
>
>
>
> In the past, I never have such problem with KafkaReceiver. What causes
> this exception? How can I solve this problem?
>
>
>
> Thanks in advance,
>
> Max
>


Re: Low Level Kafka Consumer for Spark

2015-01-16 Thread Dibyendu Bhattacharya
My code handles the Kafka Consumer part. But writing to Kafka may not be a
big challenge; you can easily do that in your driver code.

dibyendu

On Sat, Jan 17, 2015 at 9:43 AM, Debasish Das 
wrote:

> Hi Dib,
>
> For our usecase I want my spark job1 to read from hdfs/cache and write to
> kafka queues. Similarly spark job2 should read from kafka queues and write
> to kafka queues.
>
> Is writing to kafka queues from spark job supported in your code ?
>
> Thanks
> Deb
>  On Jan 15, 2015 11:21 PM, "Akhil Das"  wrote:
>
>> There was a simple example
>> <https://github.com/dibbhatt/kafka-spark-consumer/blob/master/examples/scala/LowLevelKafkaConsumer.scala#L45>
>> which you can run after changing few lines of configurations.
>>
>> Thanks
>> Best Regards
>>
>> On Fri, Jan 16, 2015 at 12:23 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Hi Kidong,
>>>
>>> Just now I tested the Low Level Consumer with Spark 1.2 and I did not
>>> see any issue with the Receiver.store method. It is able to fetch messages
>>> from Kafka.
>>>
>>> Can you cross-check other configurations in your setup like Kafka broker
>>> IP, topic name, ZK host details, consumer id etc.?
>>>
>>> Dib
>>>
>>> On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
>>>> Hi Kidong,
>>>>
>>>> No , I have not tried yet with Spark 1.2 yet. I will try this out and
>>>> let you know how this goes.
>>>>
>>>> By the way, is there any change in Receiver Store method happened in
>>>> Spark 1.2 ?
>>>>
>>>>
>>>>
>>>> Regards,
>>>> Dibyendu
>>>>
>>>>
>>>>
>>>> On Fri, Jan 16, 2015 at 11:25 AM, mykidong  wrote:
>>>>
>>>>> Hi Dibyendu,
>>>>>
>>>>> I am using kafka 0.8.1.1 and spark 1.2.0.
>>>>> After modifying these version of your pom, I have rebuilt your codes.
>>>>> But I have not got any messages from ssc.receiverStream(new
>>>>> KafkaReceiver(_props, i)).
>>>>>
>>>>> I have found, in your codes, all the messages are retrieved correctly,
>>>>> but
>>>>> _receiver.store(_dataBuffer.iterator())  which is spark streaming
>>>>> abstract
>>>>> class's method does not seem to work correctly.
>>>>>
>>>>> Have you tried running your spark streaming kafka consumer with kafka
>>>>> 0.8.1.1 and spark 1.2.0 ?
>>>>>
>>>>> - Kidong.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>>
>>>>> -
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>
>>>>>
>>>>
>>>
>>


Re: Low Level Kafka Consumer for Spark

2015-01-15 Thread Dibyendu Bhattacharya
Hi Kidong,

Just now I tested the Low Level Consumer with Spark 1.2 and I did not see
any issue with the Receiver.store method. It is able to fetch messages from
Kafka.

Can you cross-check other configurations in your setup like Kafka broker IP,
topic name, ZK host details, consumer id etc.?

Dib

On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Hi Kidong,
>
> No , I have not tried yet with Spark 1.2 yet. I will try this out and let
> you know how this goes.
>
> By the way, is there any change in Receiver Store method happened in Spark
> 1.2 ?
>
>
>
> Regards,
> Dibyendu
>
>
>
> On Fri, Jan 16, 2015 at 11:25 AM, mykidong  wrote:
>
>> Hi Dibyendu,
>>
>> I am using kafka 0.8.1.1 and spark 1.2.0.
>> After modifying these version of your pom, I have rebuilt your codes.
>> But I have not got any messages from ssc.receiverStream(new
>> KafkaReceiver(_props, i)).
>>
>> I have found, in your codes, all the messages are retrieved correctly, but
>> _receiver.store(_dataBuffer.iterator())  which is spark streaming abstract
>> class's method does not seem to work correctly.
>>
>> Have you tried running your spark streaming kafka consumer with kafka
>> 0.8.1.1 and spark 1.2.0 ?
>>
>> - Kidong.
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Low Level Kafka Consumer for Spark

2015-01-15 Thread Dibyendu Bhattacharya
Hi Kidong,

No, I have not tried it with Spark 1.2 yet. I will try this out and let
you know how it goes.

By the way, did any change to the Receiver store method happen in Spark
1.2?



Regards,
Dibyendu



On Fri, Jan 16, 2015 at 11:25 AM, mykidong  wrote:

> Hi Dibyendu,
>
> I am using kafka 0.8.1.1 and spark 1.2.0.
> After modifying these version of your pom, I have rebuilt your codes.
> But I have not got any messages from ssc.receiverStream(new
> KafkaReceiver(_props, i)).
>
> I have found, in your codes, all the messages are retrieved correctly, but
> _receiver.store(_dataBuffer.iterator())  which is spark streaming abstract
> class's method does not seem to work correctly.
>
> Have you tried running your spark streaming kafka consumer with kafka
> 0.8.1.1 and spark 1.2.0 ?
>
> - Kidong.
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Low Level Kafka Consumer for Spark

2014-12-03 Thread Dibyendu Bhattacharya
Hi,

Yes, as Jerry mentioned, SPARK-3129 (
https://issues.apache.org/jira/browse/SPARK-3129) enabled the WAL feature
which solves the Driver failure problem. The way 3129 is designed, it
solves the driver failure problem agnostic of the source of the stream
(like Kafka or Flume etc.). But with just 3129 you cannot achieve a complete
solution for data loss. You also need a reliable receiver which
solves the data loss issue on receiver failure.

The Low Level Consumer (https://github.com/dibbhatt/kafka-spark-consumer)
for which this email thread was started has solved that problem with the Kafka
Low Level API.

And SPARK-4062, as Jerry mentioned, also recently solved the same problem
using the Kafka High Level API.

On the Kafka High Level Consumer API approach, I would like to mention
that Kafka 0.8 has some issues as mentioned in this wiki (
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design)
where consumer re-balance sometimes fails, and that is one of the key reasons
Kafka is re-writing the consumer API in Kafka 0.9.

I know there are a few folks who have already faced these re-balancing issues
while using the Kafka High Level API, and if you ask my opinion, we at Pearson
are still using the Low Level Consumer as it seems to be more robust and
performant, and we have been using it for a few months without any issue.
And also, I may be a little biased :)

Regards,
Dibyendu



On Wed, Dec 3, 2014 at 7:04 AM, Shao, Saisai  wrote:

> Hi Rod,
>
> The purpose of introducing the WAL mechanism in Spark Streaming as a general
> solution is to make all the receivers benefit from this mechanism.
>
> Though as you said, external sources like Kafka have their own checkpoint
> mechanism, instead of storing data in WAL, we can only store metadata to
> WAL, and recover from the last committed offsets. But this requires
> sophisticated design of Kafka receiver with low-level API involved, also we
> need to take care of rebalance and fault tolerance things by ourselves. So
> right now instead of implementing a whole new receiver, we choose to
> implement a simple one, though the performance is not so good, it's much
> easier to understand and maintain.
>
> The design purpose and implementation of reliable Kafka receiver can be
> found in (https://issues.apache.org/jira/browse/SPARK-4062). And in the
> future, improving the reliable Kafka receiver like what you mentioned is
> on our schedule.
>
> Thanks
> Jerry
>
>
> -Original Message-
> From: RodrigoB [mailto:rodrigo.boav...@aspect.com]
> Sent: Wednesday, December 3, 2014 5:44 AM
> To: u...@spark.incubator.apache.org
> Subject: Re: Low Level Kafka Consumer for Spark
>
> Dibyendu,
>
> Just to make sure I will not be misunderstood - My concerns are referring
> to the Spark upcoming solution and not yours. I would to gather the
> perspective of someone which implemented recovery with Kafka a different
> way.
>
> Tnks,
> Rod
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
> commands, e-mail: user-h...@spark.apache.org
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Error when Spark streaming consumes from Kafka

2014-11-22 Thread Dibyendu Bhattacharya
I believe this has something to do with how the Kafka High Level API manages
consumers within a Consumer group and how it re-balances during failure. You
can find some mention of this in the Kafka wiki:

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design

Due to various issues in the Kafka High Level APIs, Kafka is moving the High
Level Consumer API to a completely new set of APIs in Kafka 0.9.

Other than this co-ordination issue, the High Level consumer also has data loss
issues.

You can probably try this Spark-Kafka consumer, which uses the Low Level Simple
consumer API, is more performant and has no data loss scenarios.

https://github.com/dibbhatt/kafka-spark-consumer

Regards,
Dibyendu

On Sun, Nov 23, 2014 at 2:13 AM, Bill Jay 
wrote:

> Hi all,
>
> I am using Spark to consume from Kafka. However, after the job has run for
> several hours, I saw the following failure of an executor:
>
> kafka.common.ConsumerRebalanceFailedException: 
> group-1416624735998_ip-172-31-5-242.ec2.internal-1416648124230-547d2c31 can't 
> rebalance after 4 retries
> 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
> 
> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
> 
> kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212)
> 
> kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:138)
> 
> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:114)
> 
> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
> 
> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
> 
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
> 
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
> 
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
> 
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
> org.apache.spark.scheduler.Task.run(Task.scala:54)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
>
>
> Does anyone know the reason for this exception? Thanks!
>
> Bill
>
>


Spark - Apache Blur Connector : Index Kafka Messages into Blur using Spark Streaming

2014-09-22 Thread Dibyendu Bhattacharya
Hi,

For the last few days I have been working on a Spark - Apache Blur connector
to index Kafka messages into Apache Blur using Spark Streaming. We have been
working to build a distributed search platform for our NRT use cases and have
been experimenting with Spark Streaming and Apache Blur for this. We are
presently working with Apache Blur, and here is a Spark connector I would like
to share with the community to get feedback on it.

This connector uses the Low Level Kafka Consumer which I had written a few
weeks back (https://github.com/dibbhatt/kafka-spark-consumer). There was a
separate thread on this Kafka consumer in the Spark group.

Even though I was able to index Kafka messages using this low level consumer
via the Apache Blur Queuing API, I wanted to try out the Spark saveAsHadoop*
API, which can perform bulk loading of an RDD into Apache Blur.

For that I have written this Blur Connector for Spark (
https://github.com/dibbhatt/spark-blur-connector).

This connector uses the same Kafka low level consumer mentioned above, and
partitions the RDD into the same number of partitions as there are shards in
the target Blur table. For this I had to use custom Partitioner logic so that
the partitioning of keys in the RDD matches the partitioning of keys into the
target Blur shards.
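To illustrate the idea, here is a minimal sketch of such a partitioner. This
is illustrative only: the class name, the plain hashCode-based routing and the
shard count are assumptions, not the connector's actual code, which must use
the same hashing that Blur applies to its shards.

import org.apache.spark.Partitioner

// Route each row key to the shard it would hash to, so that one RDD
// partition maps onto exactly one Blur shard.
class ShardPartitioner(numShards: Int) extends Partitioner {
  override def numPartitions: Int = numShards
  override def getPartition(key: Any): Int = {
    val h = key.hashCode() % numShards
    if (h < 0) h + numShards else h
  }
}

// Usage sketch: keyedRdd is an RDD[(rowId, record)]
// val shardAligned = keyedRdd.partitionBy(new ShardPartitioner(numShards))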

I also implemented a custom BlurOutputFormat to return the
BlurOutputCommitter, which uses the new Hadoop API
(org.apache.hadoop.mapreduce).

There are a few minor changes I made to the existing GenericBlurRecordWriter
and BlurOutputCommitter, and I used the modified RecordWriter and
OutputCommitter for this Spark Blur connector. Once those minor issues are
fixed in Apache Blur, there will be no need for this custom code.


I have tested this connector by indexing activity streams coming into a Kafka
cluster, and it nicely indexes the Kafka messages into the target Apache Blur
tables.

Would love to hear what you think. I have copied both the Apache Blur and
Spark communities.


Regards,
Dibyendu


Re: Low Level Kafka Consumer for Spark

2014-09-15 Thread Dibyendu Bhattacharya
Hi Tim,

I have not tried persist the RDD.

There is some discussion on rate limiting in Spark Streaming in this thread:

http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-rate-limiting-from-kafka-td8590.html

There is a Pull Request https://github.com/apache/spark/pull/945/files to
fix this Rate Limiting issue at BlockGenerator level.

But while testing with heavy load, this fix did not solve my problem, so I had
to build rate limiting into the Kafka consumer. I will make it configurable
soon (a rough sketch of the idea is shown below).

If this is not done, I can see blocks getting dropped, which leads to job
failures.
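The throttling itself is conceptually simple. Here is a minimal sketch of the
idea, not the actual consumer code; the class name, the one-second window and
the fixed cap are assumptions:

// Cap how many messages the receiver pushes to Spark per second; when the
// cap for the current one-second window is hit, sleep until the next window.
class RateLimiter(maxMessagesPerSec: Int) {
  private var windowStart = System.currentTimeMillis()
  private var count = 0L

  def acquire(): Unit = synchronized {
    val now = System.currentTimeMillis()
    if (now - windowStart >= 1000) { windowStart = now; count = 0 }
    if (count >= maxMessagesPerSec) {
      Thread.sleep(1000 - (now - windowStart))   // back off till the window rolls
      windowStart = System.currentTimeMillis()
      count = 0
    }
    count += 1
  }
}

// Inside the fetch loop (sketch):
//   limiter.acquire()
//   receiver.store(message)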

I have raised this in another thread:

https://mail.google.com/mail/u/1/?tab=wm#search/Serious/148650fd829cd239.
But I have not got any answer yet on whether this is a bug (blocks getting
dropped and jobs failing).



Dib


On Mon, Sep 15, 2014 at 10:33 PM, Tim Smith  wrote:

> Hi Dibyendu,
>
> I am a little confused about the need for rate limiting input from
> kafka. If the stream coming in from kafka has higher message/second
> rate than what a Spark job can process then it should simply build a
> backlog in Spark if the RDDs are cached on disk using persist().
> Right?
>
> Thanks,
>
> Tim
>
>
> On Mon, Sep 15, 2014 at 4:33 AM, Dibyendu Bhattacharya
>  wrote:
> > Hi Alon,
> >
> > No this will not be guarantee that same set of messages will come in same
> > RDD. This fix just re-play the messages from last processed offset in
> same
> > order. Again this is just a interim fix we needed to solve our use case
> . If
> > you do not need this message re-play feature, just do not perform the
> ack (
> > Acknowledgement) call in the Driver code. Then the processed messages
> will
> > not be written to ZK and hence replay will not happen.
> >
> > Regards,
> > Dibyendu
> >
> > On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er 
> > wrote:
> >>
> >> Hi Dibyendu,
> >>
> >> Thanks for your great work!
> >>
> >> I'm new to Spark Streaming, so I just want to make sure I understand
> >> Driver
> >> failure issue correctly.
> >>
> >> In my use case, I want to make sure that messages coming in from Kafka
> are
> >> always broken into the same set of RDDs, meaning that if a set of
> messages
> >> are assigned to one RDD, and the Driver dies before this RDD is
> processed,
> >> then once the Driver recovers, the same set of messages are assigned to
> a
> >> single RDD, instead of arbitrarily repartitioning the messages across
> >> different RDDs.
> >>
> >> Does your Receiver guarantee this behavior, until the problem is fixed
> in
> >> Spark 1.2?
> >>
> >> Regards,
> >> Alon
> >>
> >>
> >>
> >> --
> >> View this message in context:
> >>
> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
> >> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Low Level Kafka Consumer for Spark

2014-09-15 Thread Dibyendu Bhattacharya
Hi Alon,

No, this does not guarantee that the same set of messages will come in the
same RDD. This fix just re-plays the messages from the last processed offset
in the same order. Again, this is just an interim fix we needed to solve our
use case. If you do not need this message re-play feature, just do not perform
the ack (acknowledgement) call in the driver code. Then the processed offsets
will not be written to ZK and hence replay will not happen.

Regards,
Dibyendu

On Mon, Sep 15, 2014 at 4:48 PM, Alon Pe'er 
wrote:

> Hi Dibyendu,
>
> Thanks for your great work!
>
> I'm new to Spark Streaming, so I just want to make sure I understand Driver
> failure issue correctly.
>
> In my use case, I want to make sure that messages coming in from Kafka are
> always broken into the same set of RDDs, meaning that if a set of messages
> are assigned to one RDD, and the Driver dies before this RDD is processed,
> then once the Driver recovers, the same set of messages are assigned to a
> single RDD, instead of arbitrarily repartitioning the messages across
> different RDDs.
>
> Does your Receiver guarantee this behavior, until the problem is fixed in
> Spark 1.2?
>
> Regards,
> Alon
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p14233.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Some Serious Issue with Spark Streaming ? Blocks Getting Removed and Jobs have Failed..

2014-09-12 Thread Dibyendu Bhattacharya
I agree,

Even the Low Level Kafka Consumer which I have written has tunable IO
throttling, which helped me solve this issue ... But the question remains:
even with a large backlog, why does Spark drop the unprocessed in-memory
blocks?
Dib

On Fri, Sep 12, 2014 at 5:47 PM, Jeoffrey Lim  wrote:

> Our issue could be related to this problem as described in:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-in-1-hour-batch-duration-RDD-files-gets-lost-td14027.html
>  which
> the DStream is processed for every 1 hour batch duration.
>
> I have implemented IO throttling in the Receiver as well in our Kafka
> consumer, and our backlog is not that large.
>
> NFO : org.apache.spark.storage.MemoryStore - 1 blocks selected for dropping
> INFO : org.apache.spark.storage.BlockManager - Dropping block
> *input-0-1410443074600* from memory
> INFO : org.apache.spark.storage.MemoryStore - Block input-0-1410443074600 of
> size 12651900 dropped from memory (free 21220667)
> INFO : org.apache.spark.storage.BlockManagerInfo - Removed
> input-0-1410443074600 on ip-10-252-5-113.asskickery.us:53752 in memory
> (size: 12.1 MB, free: 100.6 MB)
>
> The question that I have now is: how to prevent the
> MemoryStore/BlockManager of dropping the block inputs? And should they be
> logged in the level WARN/ERROR?
>
>
> Thanks.
>
>
> On Fri, Sep 12, 2014 at 4:45 PM, Dibyendu Bhattacharya [via Apache Spark
> User List] <[hidden email]
> <http://user/SendEmail.jtp?type=node&node=14081&i=0>> wrote:
>
>> Dear all,
>>
>> I am sorry. This was a false alarm
>>
>> There was some issue in the RDD processing logic which leads to large
>> backlog. Once I fixed the issues in my processing logic, I can see all
>> messages being pulled nicely without any Block Removed error. I need to
>> tune certain configurations in my Kafka Consumer to modify the data rate
>> and also the batch size.
>>
>> Sorry again.
>>
>>
>> Regards,
>> Dibyendu
>>
>> On Thu, Sep 11, 2014 at 8:13 PM, Nan Zhu <[hidden email]
>> <http://user/SendEmail.jtp?type=node&node=14075&i=0>> wrote:
>>
>>>  This is my case about broadcast variable:
>>>
>>> 14/07/21 19:49:13 INFO Executor: Running task ID 4
>>> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2)
>>> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on localhost 
>>> (progress: 3/106)
>>> 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for 
>>> hdfstest_customers
>>> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596
>>> 14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver
>>> 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
>>> 14/07/21 19:49:13 INFO Executor: Finished task ID 3
>>> 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on 
>>> executor localhost: localhost (PROCESS_LOCAL)
>>> 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885 bytes 
>>> in 0 ms
>>> 14/07/21 19:49:13 INFO Executor: Running task ID 5
>>> 14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0
>>> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3)*14/07/21 
>>> 19:49:13 INFO ContextCleaner: Cleaned broadcast 0*
>>> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on localhost 
>>> (progress: 4/106)
>>> 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
>>> 14/07/21 19:49:13 INFO BlockManager: Removing block broadcast_0*14/07/21 
>>> 19:49:13 INFO MemoryStore: Block broadcast_0 of size 202564 dropped from 
>>> memory (free 886623436)*
>>> 14/07/21 19:49:13 INFO ContextCleaner: Cleaned shuffle 0
>>> 14/07/21 19:49:13 INFO ShuffleBlockManager: Deleted all files for shuffle 0
>>> 14/07/21 19:49:13 INFO HadoopRDD: Input split: 
>>> hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5
>>> 14/07/21 
>>> <http://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+514/07/21> 
>>> 19:49:13 INFO HadoopRDD: Input split: 
>>> hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5
>>> 14/07/21 
>>> <http://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+514/07/21> 
>>> 19:49:13 INFO TableOutputFormat: Created table instance for 
>>> hdfstest_customers
>>> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 4 is 596
>>> 14/07/21 19:49:13 INFO Executor: Sending result for 4 directly to driver
>>> 14/07/21 19:49:13 INFO Exec

Re: Some Serious Issue with Spark Streaming ? Blocks Getting Removed and Jobs have Failed..

2014-09-12 Thread Dibyendu Bhattacharya
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>   at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>   at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at 
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>   at 
> org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
>   at 
> org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141)
>   at 
> java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at 
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>   at 
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:744)
>
>
>
>
> --
> Nan Zhu
>
> On Thursday, September 11, 2014 at 10:42 AM, Nan Zhu wrote:
>
>  Hi,
>
> Can you attach more logs to see if there is some entry from ContextCleaner?
>
>

Re: How to scale more consumer to Kafka stream

2014-09-11 Thread Dibyendu Bhattacharya
I agree, Gerard. Thanks for pointing this out.
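For reference, a minimal sketch of the two patterns described below (the
method and object names are illustrative; 'streams' stands for the
per-receiver DStreams created elsewhere):

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

object UnionOrPerStream {
  def wire(ssc: StreamingContext, streams: Seq[DStream[String]]): Unit = {
    // No global grouping needed: transform each stream independently, no union.
    streams.foreach(s => s.map(_.length).foreachRDD(rdd => rdd.foreach(println)))

    // A genuinely global aggregate (count, distinct, groupBy, ...) needs union.
    ssc.union(streams).count().print()
  }
}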

Dib

On Thu, Sep 11, 2014 at 5:28 PM, Gerard Maas  wrote:

> This pattern works.
>
> One note, thought: Use 'union' only if you need to group the data from all
> RDDs into one RDD for processing (like count distinct or need a groupby).
> If your process can be parallelized over every stream of incoming data, I
> suggest you just apply the required transformations on every dstream and
> avoid 'union' altogether.
>
> -kr, Gerard.
>
>
>
> On Wed, Sep 10, 2014 at 8:17 PM, Tim Smith  wrote:
>
>> How are you creating your kafka streams in Spark?
>>
>> If you have 10 partitions for a topic, you can call "createStream" ten
>> times to create 10 parallel receivers/executors and then use "union" to
>> combine all the dStreams.
>>
>>
>>
>> On Wed, Sep 10, 2014 at 7:16 AM, richiesgr  wrote:
>>
>>> Hi (my previous post as been used by someone else)
>>>
>>> I'm building a application the read from kafka stream event. In
>>> production
>>> we've 5 consumers that share 10 partitions.
>>> But on spark streaming kafka only 1 worker act as a consumer then
>>> distribute
>>> the tasks to workers so I can have only 1 machine acting as consumer but
>>> I
>>> need more because only 1 consumer means Lags.
>>>
>>> Do you've any idea what I can do ? Another point is interresting the
>>> master
>>> is not loaded at all I can get up more than 10 % CPU
>>>
>>> I've tried to increase the queued.max.message.chunks on the kafka client
>>> to
>>> read more records thinking it'll speed up the read but I only get
>>>
>>> ERROR consumer.ConsumerFetcherThread:
>>>
>>> [ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372],
>>> Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 73;
>>> ClientId:
>>>
>>> SparkEC2-ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372;
>>> ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [IA2,7]
>>> ->
>>> PartitionFetchInfo(929838589,1048576),[IA2,6] ->
>>> PartitionFetchInfo(929515796,1048576),[IA2,9] ->
>>> PartitionFetchInfo(929577946,1048576),[IA2,8] ->
>>> PartitionFetchInfo(930751599,1048576),[IA2,2] ->
>>> PartitionFetchInfo(926457704,1048576),[IA2,5] ->
>>> PartitionFetchInfo(930774385,1048576),[IA2,0] ->
>>> PartitionFetchInfo(929913213,1048576),[IA2,3] ->
>>> PartitionFetchInfo(929268891,1048576),[IA2,4] ->
>>> PartitionFetchInfo(929949877,1048576),[IA2,1] ->
>>> PartitionFetchInfo(930063114,1048576)
>>> java.lang.OutOfMemoryError: Java heap space
>>>
>>> Is someone have ideas ?
>>> Thanks
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-scale-more-consumer-to-Kafka-stream-tp13883.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: How to scale more consumer to Kafka stream

2014-09-10 Thread Dibyendu Bhattacharya
Hi,

You can use this Kafka Spark Consumer.
https://github.com/dibbhatt/kafka-spark-consumer

This does exactly that. It creates parallel receivers for every Kafka topic
partition. You can look at Consumer.java under the consumer.kafka.client
package for an example of how to use it.
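For comparison, the approach Tim describes below with the built-in KafkaUtils
looks roughly like this. This is only a sketch: the ZooKeeper quorum, group,
topic name and receiver count are made-up values.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ParallelKafkaStreams {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("parallel-kafka"), Seconds(2))
    // One createStream call per desired receiver (roughly one per partition).
    val streams = (1 to 10).map { _ =>
      KafkaUtils.createStream(ssc, "zk1:2181", "my-group", Map("my-topic" -> 1),
        StorageLevel.MEMORY_AND_DISK_SER)
    }
    // Union them into a single DStream of (key, message) pairs.
    val unioned = ssc.union(streams)
    unioned.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}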

There is some discussion on this consumer; you can find it here:
https://mail.google.com/mail/u/1/?tab=wm#search/kafka+spark+consumer/14797b2cbbaa8689

Regards,
Dib


On Wed, Sep 10, 2014 at 11:47 PM, Tim Smith  wrote:

> How are you creating your kafka streams in Spark?
>
> If you have 10 partitions for a topic, you can call "createStream" ten
> times to create 10 parallel receivers/executors and then use "union" to
> combine all the dStreams.
>
>
>
> On Wed, Sep 10, 2014 at 7:16 AM, richiesgr  wrote:
>
>> Hi (my previous post as been used by someone else)
>>
>> I'm building a application the read from kafka stream event. In production
>> we've 5 consumers that share 10 partitions.
>> But on spark streaming kafka only 1 worker act as a consumer then
>> distribute
>> the tasks to workers so I can have only 1 machine acting as consumer but I
>> need more because only 1 consumer means Lags.
>>
>> Do you've any idea what I can do ? Another point is interresting the
>> master
>> is not loaded at all I can get up more than 10 % CPU
>>
>> I've tried to increase the queued.max.message.chunks on the kafka client
>> to
>> read more records thinking it'll speed up the read but I only get
>>
>> ERROR consumer.ConsumerFetcherThread:
>>
>> [ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372],
>> Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 73;
>> ClientId:
>>
>> SparkEC2-ConsumerFetcherThread-SparkEC2_ip-10-138-59-194.ec2.internal-1410182950783-5c49c8e8-0-174167372;
>> ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [IA2,7] ->
>> PartitionFetchInfo(929838589,1048576),[IA2,6] ->
>> PartitionFetchInfo(929515796,1048576),[IA2,9] ->
>> PartitionFetchInfo(929577946,1048576),[IA2,8] ->
>> PartitionFetchInfo(930751599,1048576),[IA2,2] ->
>> PartitionFetchInfo(926457704,1048576),[IA2,5] ->
>> PartitionFetchInfo(930774385,1048576),[IA2,0] ->
>> PartitionFetchInfo(929913213,1048576),[IA2,3] ->
>> PartitionFetchInfo(929268891,1048576),[IA2,4] ->
>> PartitionFetchInfo(929949877,1048576),[IA2,1] ->
>> PartitionFetchInfo(930063114,1048576)
>> java.lang.OutOfMemoryError: Java heap space
>>
>> Is someone have ideas ?
>> Thanks
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-scale-more-consumer-to-Kafka-stream-tp13883.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Low Level Kafka Consumer for Spark

2014-09-10 Thread Dibyendu Bhattacharya
Hi ,

The latest changes for Kafka message re-play by manipulating the ZK offsets
seem to be working fine for us. This gives us some relief till the actual
issue is fixed in Spark 1.2.

I have a question on how Spark processes the received data. The logic I used
is basically to pull messages from individual partitions using dedicated
receivers and take a union of these streams. After that I process the unioned
stream.

Today I wanted to test this consumer against our internal Kafka cluster,
which has around 50 million records. With this huge backlog I found Spark only
running the receiver tasks and not running the processing tasks (or rather
running them very slowly). Is this an issue with the consumer, or an issue on
the Spark side? Ideally, once the receivers durably write data to the store,
the processing should start in parallel. Why does the processing need to wait
till the receivers consume all 50 million messages? ...Or maybe I am doing
something wrong? I can share the driver log if you want.

In the driver I can see only "storage.BlockManagerInfo: Added input..." type
messages, but I hardly see any "scheduler.TaskSetManager: Starting task..."
messages. I see data getting written to the target system at a very, very slow
pace.
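One thing I still need to rule out on my side (noting it as an assumption, not
a confirmed cause) is whether the application has enough cores left over for
processing: each receiver pins one core for the lifetime of the job, so with
one receiver per partition the receivers alone can occupy most of the slots.
A sketch of that check, with illustrative values:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CoreBudgetCheck {
  def main(args: Array[String]): Unit = {
    val numReceivers = 10   // e.g. one receiver per Kafka partition
    val conf = new SparkConf()
      .setAppName("kafka-consumer")
      // Give the app strictly more cores than receivers; otherwise data is
      // received but the batch processing jobs never get scheduled.
      .set("spark.cores.max", (numReceivers + 4).toString)
    val ssc = new StreamingContext(conf, Seconds(2))
    // ... create the receivers and wire the processing here, then ssc.start() ...
  }
}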


Regards,
Dibyendu






On Mon, Sep 8, 2014 at 12:08 AM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Hi Tathagata,
>
> I have managed to implement the logic into the Kafka-Spark consumer to
> recover from Driver failure. This is just a interim fix till actual fix is
> done from Spark side.
>
> The logic is something like this.
>
> 1. When the Individual Receivers starts for every Topic partition, it
> writes the Kafka messages along with certain meta data in Block Store. This
> meta data contains the details of message offset, partition id, topic name
> and consumer id. You can see this logic in PartitionManager.java  next()
> method.
>
> 2.  In the Driver code ( Consumer.java) , I am creating the union of all
> there individual D-Streams, and processing the data using forEachRDD call.
> In the driver code, I am receiving the RDD which contains the Kafka
> messages along with meta data details. In the driver code, periodically I
> am committing the "processed" offset of the Kafka message into ZK.
>
> 3. When driver stops, and restart again, the Receiver starts again, and
> this time in PartitionManager.java, I am checking what is the actual
> "committed" offset for the partition, and what is the actual "processed"
> offset of the same partition. This logic is in the PartitionManager
> constructor.
>
> If this is a Receiver restart, and "processed" offset of less than
> "Committed" offset, I am started fetching again from "Processed" offset.
> This may lead to duplicate records, but our system can handle duplicates.
>
> I have tested with multiple driver kill/stops and I found no data loss in
> Kafka consumer.
>
> In the Driver code, I have not done any "checkpointing" yet, will test
> that tomorrow.
>
>
> One interesting thing I found, if I do "repartition" of original stream ,
> I can still see the issue of data loss in this logic. What I believe,
> during re- partitioning Spark might be changing the order of RDDs the way
> it generated from Kafka stream. So during re-partition case, even when I am
> committing processed offset, but as this is not in order I still see issue.
> Not sure if this understanding is correct, but not able to find any other
> explanation.
>
> But if I do not use repartition this solution works fine.
>
> I can make this as configurable, so that when actual fix is available ,
> this feature in consumer can be turned off as this is an overhead for the
> consumer . Let me know what you think..
>
> Regards,
> Dibyendu
>
>
>
>
> On Fri, Sep 5, 2014 at 11:14 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Some thoughts on this thread to clarify the doubts.
>>
>> 1. Driver recovery: The current (1.1 to be released) does not recover the
>> raw data that has been received but not processes. This is because when the
>> driver dies, the executors die and so does the raw data that was stored in
>> it. Only for HDFS, the data is not lost by driver recovery as the data is
>> already present reliably in HDFS. This is something we want to fix by Spark
>> 1.2 (3 month from now). Regarding recovery by replaying the data from
>> Kafka, it is possible but tricky. Our goal is to provide strong guarantee,
>> exactly-once semantics in all transformations. To guarantee this for all
>> kinds of streaming computations stateful and not-stateful computations, it
>> is requires that the data be rep

Re: Low Level Kafka Consumer for Spark

2014-09-07 Thread Dibyendu Bhattacharya
Hi Tathagata,

I have managed to implement the logic in the Kafka-Spark consumer to recover
from driver failure. This is just an interim fix till the actual fix is done
on the Spark side.

The logic is something like this.

1. When the individual receiver starts for each topic partition, it writes the
Kafka messages along with certain metadata to the block store. This metadata
contains the message offset, partition id, topic name and consumer id. You can
see this logic in the PartitionManager.java next() method.

2. In the driver code (Consumer.java), I am creating the union of all these
individual DStreams and processing the data using a foreachRDD call. In the
driver code I receive the RDDs, which contain the Kafka messages along with
the metadata details, and periodically I commit the "processed" offset of the
Kafka messages to ZK (a rough sketch of this commit step is shown after these
steps).

3. When the driver stops and restarts, the receiver starts again, and this
time in PartitionManager.java I check what the actual "committed" offset for
the partition is and what the actual "processed" offset of the same partition
is. This logic is in the PartitionManager constructor.

If this is a receiver restart and the "processed" offset is less than the
"committed" offset, I start fetching again from the "processed" offset. This
may lead to duplicate records, but our system can handle duplicates.
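For reference, here is a rough sketch of that driver-side commit of the
"processed" offsets. It is illustrative only: the ZK path layout and the
helper name are assumptions, not the consumer's actual API; the real logic
lives in the consumer code itself.

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

// After a batch is fully processed inside foreachRDD, record the highest
// processed offset per (topic, partition) under a dedicated ZK node.
object ProcessedOffsetCommitter {
  private val zk = CuratorFrameworkFactory.newClient(
    "zk1:2181", new ExponentialBackoffRetry(1000, 3))
  zk.start()

  def commit(consumerId: String, topic: String, partition: Int, offset: Long): Unit = {
    val path = s"/consumers/$consumerId/processed/$topic/$partition"
    if (zk.checkExists().forPath(path) == null) {
      zk.create().creatingParentsIfNeeded().forPath(path)
    }
    zk.setData().forPath(path, offset.toString.getBytes("UTF-8"))
  }
}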

I have tested with multiple driver kills/stops and found no data loss in the
Kafka consumer.

In the driver code I have not done any "checkpointing" yet; I will test that
tomorrow.


One interesting thing I found: if I "repartition" the original stream, I can
still see the data loss issue with this logic. What I believe is that during
re-partitioning Spark might be changing the order of the RDDs from the way
they were generated from the Kafka stream. So in the repartition case, even
though I am committing the processed offsets, they are not in order, and I
still see the issue. I am not sure if this understanding is correct, but I am
not able to find any other explanation.

But if I do not use repartition, this solution works fine.

I can make this configurable, so that when the actual fix is available this
feature in the consumer can be turned off, as it is an overhead for the
consumer. Let me know what you think.

Regards,
Dibyendu




On Fri, Sep 5, 2014 at 11:14 PM, Tathagata Das 
wrote:

> Some thoughts on this thread to clarify the doubts.
>
> 1. Driver recovery: The current (1.1 to be released) does not recover the
> raw data that has been received but not processes. This is because when the
> driver dies, the executors die and so does the raw data that was stored in
> it. Only for HDFS, the data is not lost by driver recovery as the data is
> already present reliably in HDFS. This is something we want to fix by Spark
> 1.2 (3 month from now). Regarding recovery by replaying the data from
> Kafka, it is possible but tricky. Our goal is to provide strong guarantee,
> exactly-once semantics in all transformations. To guarantee this for all
> kinds of streaming computations stateful and not-stateful computations, it
> is requires that the data be replayed through Kafka in exactly same order,
> and the underlying blocks of data in Spark be regenerated in the exact way
> as it would have if there was no driver failure. This is quite tricky to
> implement, requires manipulation of zookeeper offsets, etc, that is hard to
> do with the high level consumer that KafkaUtil uses. Dibyendu's low level
> Kafka receiver may enable such approaches in the future. For now we
> definitely plan to solve the first problem very very soon.
>
> 3. Repartitioning: I am trying to understand the repartition issue. One
> common mistake I have seen is that developers repartition a stream but not
> use the repartitioned stream.
>
> WRONG:
> inputDstream.repartition(100)
> inputDstream.map(...).count().print()
>
> RIGHT:
> val repartitionedDStream = inputDStream.repartition(100)
> repartitionedDStream.map(...).count().print()
>
> Not sure if this helps solve the problem that you all the facing. I am
> going to add this to the stremaing programming guide to make sure this
> common mistake is avoided.
>
> TD
>
>
>
>
> On Wed, Sep 3, 2014 at 10:38 AM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi,
>>
>> Sorry for little delay . As discussed in this thread, I have modified the
>> Kafka-Spark-Consumer ( https://github.com/dibbhatt/kafka-spark-consumer)
>> code to have dedicated Receiver for every Topic Partition. You can see the
>> example howto create Union of these receivers
>> in consumer.kafka.client.Consumer.java .
>>
>> Thanks to Chris for suggesting this change.
>>
>> Regards,
>> Dibyendu
>

Re: Low Level Kafka Consumer for Spark

2014-09-03 Thread Dibyendu Bhattacharya
Hi,

Sorry for the little delay. As discussed in this thread, I have modified the
Kafka-Spark-Consumer (https://github.com/dibbhatt/kafka-spark-consumer) code
to have a dedicated receiver for every topic partition. You can see an example
of how to create the union of these receivers in
consumer.kafka.client.Consumer.java (a rough sketch of the pattern is below).
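The sketch below shows the shape of that union-of-receivers pattern only. The
PartitionReceiver class is a stand-in for this project's per-partition
KafkaReceiver, and all names and values are illustrative, not the actual code.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

// Stand-in receiver: one instance per topic partition.
class PartitionReceiver(topic: String, partition: Int)
  extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_SER_2) {
  def onStart(): Unit = { /* start a fetch thread for this partition, call store() */ }
  def onStop(): Unit = { /* stop the fetch thread */ }
}

object UnionOfReceivers {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("kafka-consumer"), Seconds(2))
    val numPartitions = 10
    val streams = (0 until numPartitions).map(p => ssc.receiverStream(new PartitionReceiver("my-topic", p)))
    val unified = ssc.union(streams)
    unified.foreachRDD(rdd => println("records in this batch: " + rdd.count()))
    ssc.start()
    ssc.awaitTermination()
  }
}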

Thanks to Chris for suggesting this change.

Regards,
Dibyendu


On Mon, Sep 1, 2014 at 2:55 AM, RodrigoB  wrote:

> Just a comment on the recovery part.
>
> Is it correct to say that currently Spark Streaming recovery design does
> not
> consider re-computations (upon metadata lineage recovery) that depend on
> blocks of data of the received stream?
> https://issues.apache.org/jira/browse/SPARK-1647
>
> Just to illustrate a real use case (mine):
> - We have object states which have a Duration field per state which is
> incremented on every batch interval. Also this object state is reset to 0
> upon incoming state changing events. Let's supposed there is at least one
> event since the last data checkpoint. This will lead to inconsistency upon
> driver recovery: The Duration field will get incremented from the data
> checkpoint version until the recovery moment, but the state change event
> will never be re-processed...so in the end we have the old state with the
> wrong Duration value.
> To make things worst, let's imagine we're dumping the Duration increases
> somewhere...which means we're spreading the problem across our system.
> Re-computation awareness is something I've commented on another thread and
> rather treat it separately.
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205
>
> Re-computations do occur, but the only RDD's that are recovered are the
> ones
> from the data checkpoint. This is what we've seen. Is not enough by itself
> to ensure recovery of computed data and this partial recovery leads to
> inconsistency in some cases.
>
> Roger - I share the same question with you - I'm just not sure if the
> replicated data really gets persisted on every batch. The execution lineage
> is checkpointed, but if we have big chunks of data being consumed to
> Receiver node on let's say a second bases then having it persisted to HDFS
> every second could be a big challenge for keeping JVM performance - maybe
> that could be reason why it's not really implemented...assuming it isn't.
>
> Dibyendu had a great effort with the offset controlling code but the
> general
> state consistent recovery feels to me like another big issue to address.
>
> I plan on having a dive into the Streaming code and try to at least
> contribute with some ideas. Some more insight from anyone on the dev team
> will be very appreciated.
>
> tnks,
> Rod
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13208.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Kafka stream receiver stops input

2014-08-27 Thread Dibyendu Bhattacharya
I think this is a known issue in the existing KafkaUtils. We had this issue
too. The problem is that in the existing KafkaUtils there is no way to control
the message flow.

You can refer to another mail thread on the Low Level Kafka Consumer which I
have written to solve this issue, along with many others.

Dib
On Aug 28, 2014 6:26 AM, "Tim Smith"  wrote:

> Hi,
>
> I have Spark (1.0.0 on CDH5) running with Kafka 0.8.1.1.
>
> I have a streaming jobs that reads from a kafka topic and writes
> output to another kafka topic. The job starts fine but after a while
> the input stream stops getting any data. I think these messages show
> no incoming data on the stream:
> 14/08/28 00:42:15 INFO ReceiverTracker: Stream 0 received 0 blocks
>
> I run the job as:
> spark-submit --class logStreamNormalizer --master yarn
> log-stream-normalizer_2.10-1.0.jar --jars
>
> spark-streaming-kafka_2.10-1.0.2.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
> --executor-memory 6G --spark.cleaner.ttl 60 --executor-cores 4
>
> As soon as I start the job, I see an error like:
>
> 14/08/28 00:50:59 INFO BlockManagerInfo: Added input-0-1409187056800
> in memory on node6-acme.com:39418 (size: 83.3 MB, free: 3.1 GB)
> Exception in thread "pool-1-thread-7" java.lang.OutOfMemoryError: Java
> heap space
> at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
> at org.apache.spark.storage.BlockMessage.set(BlockMessage.scala:85)
> at
> org.apache.spark.storage.BlockMessage$.fromByteBuffer(BlockMessage.scala:176)
> at
> org.apache.spark.storage.BlockMessageArray.set(BlockMessageArray.scala:63)
> at
> org.apache.spark.storage.BlockMessageArray$.fromBufferMessage(BlockMessageArray.scala:109)
> at
> org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:42)
> at
> org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
> at
> org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
> at org.apache.spark.network.ConnectionManager.org
> $apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:662)
> at
> org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:504)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> But not sure if that is the cause because even after that OOM message,
> I see data coming in:
> 14/08/28 00:51:00 INFO ReceiverTracker: Stream 0 received 6 blocks
>
> Appreciate any pointers or suggestions to troubleshoot the issue.
>
> Thanks
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Low Level Kafka Consumer for Spark

2014-08-27 Thread Dibyendu Bhattacharya
I agree. This issue should be fixed in Spark rather than relying on replay of
Kafka messages.

Dib
On Aug 28, 2014 6:45 AM, "RodrigoB"  wrote:

> Dibyendu,
>
> Tnks for getting back.
>
> I believe you are absolutely right. We were under the assumption that the
> raw data was being computed again and that's not happening after further
> tests. This applies to Kafka as well.
>
> The issue is of major priority fortunately.
>
> Regarding your suggestion, I would maybe prefer to have the problem
> resolved
> within Spark's internals since once the data is replicated we should be
> able
> to access it once more and not having to pool it back again from Kafka or
> any other stream that is being affected by this issue. If for example there
> is a big amount of batches to be recomputed I would rather have them done
> distributed than overloading the batch interval with huge amount of Kafka
> messages.
>
> I do not have yet enough know how on where is the issue and about the
> internal Spark code so I can't really how much difficult will be the
> implementation.
>
> tnks,
> Rod
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12966.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Low Level Kafka Consumer for Spark

2014-08-26 Thread Dibyendu Bhattacharya
Thanks Chris and Bharat for your inputs. I agree, running multiple
receivers/dstreams is desirable for scalability and fault tolerance, and this
is easily doable. In the present KafkaReceiver I am creating as many threads
as there are Kafka topic partitions, but I can definitely create a separate
KafkaReceiver for every partition. As Chris mentioned, in that case I then
need to take the union of the DStreams from all these receivers. I will try
this out and let you know.

Dib


On Wed, Aug 27, 2014 at 9:10 AM, Chris Fregly  wrote:

> great work, Dibyendu.  looks like this would be a popular contribution.
>
> expanding on bharat's question a bit:
>
> what happens if you submit multiple receivers to the cluster by creating
> and unioning multiple DStreams as in the kinesis example here:
>
>
> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L123
>
> for more context, the kinesis implementation above uses the Kinesis Client
> Library (KCL) to automatically assign - and load balance - stream shards
> among all KCL threads from all receivers (potentially coming and going as
> nodes die) on all executors/nodes using DynamoDB as the association data
> store.
>
> ZooKeeper would be used for your Kafka consumers, of course.  and
> ZooKeeper watches to handle the ephemeral nodes.  and I see you're using
> Curator, which makes things easier.
>
> as bharat suggested, running multiple receivers/dstreams may be desirable
> from a scalability and fault tolerance standpoint.  is this type of load
> balancing possible among your different Kafka consumers running in
> different ephemeral JVMs?
>
> and isn't it fun proposing a popular piece of code?  the question
> floodgates have opened!  haha. :)
>
> -chris
>
>
>
> On Tue, Aug 26, 2014 at 7:29 AM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi Bharat,
>>
>> Thanks for your email. If the "Kafka Reader" worker process dies, it will
>> be replaced by different machine, and it will start consuming from the
>> offset where it left over ( for each partition). Same case can happen even
>> if I tried to have individual Receiver for every partition.
>>
>> Regards,
>> Dibyendu
>>
>>
>> On Tue, Aug 26, 2014 at 5:43 AM, bharatvenkat 
>> wrote:
>>
>>> I like this consumer for what it promises - better control over offset
>>> and
>>> recovery from failures.  If I understand this right, it still uses single
>>> worker process to read from Kafka (one thread per partition) - is there a
>>> way to specify multiple worker processes (on different machines) to read
>>> from Kafka?  Maybe one worker process for each partition?
>>>
>>> If there is no such option, what happens when the single machine hosting
>>> the
>>> "Kafka Reader" worker process dies and is replaced by a different machine
>>> (like in cloud)?
>>>
>>> Thanks,
>>> Bharat
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12788.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Low Level Kafka Consumer for Spark

2014-08-26 Thread Dibyendu Bhattacharya
Hi Bharat,

Thanks for your email. If the "Kafka Reader" worker process dies, it will be
replaced on a different machine and will start consuming from the offset where
it left off (for each partition). The same would happen even if I had an
individual receiver for every partition.

Regards,
Dibyendu


On Tue, Aug 26, 2014 at 5:43 AM, bharatvenkat 
wrote:

> I like this consumer for what it promises - better control over offset and
> recovery from failures.  If I understand this right, it still uses single
> worker process to read from Kafka (one thread per partition) - is there a
> way to specify multiple worker processes (on different machines) to read
> from Kafka?  Maybe one worker process for each partition?
>
> If there is no such option, what happens when the single machine hosting
> the
> "Kafka Reader" worker process dies and is replaced by a different machine
> (like in cloud)?
>
> Thanks,
> Bharat
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12788.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Low Level Kafka Consumer for Spark

2014-08-26 Thread Dibyendu Bhattacharya
Hi,

As I understand, your problem is similar to this JIRA.

https://issues.apache.org/jira/browse/SPARK-1647

The issue in this case is that Kafka cannot replay the messages, as the
offsets are already committed. I think the existing KafkaUtils (the default
High Level Kafka consumer) also has this issue.

There is a similar discussion in this thread as well:

http://apache-spark-user-list.1001560.n3.nabble.com/Data-loss-Spark-streaming-and-network-receiver-td12337.html

As I see it, it is possible to tackle this in the consumer code I have
written. If we store the topic, partition id and consumed offset in ZK after
every checkpoint, then after Spark recovers from the failover, the present
PartitionManager code can start reading from the last checkpointed offset
(instead of the last committed offset, as it does now). In that case it can
replay the data since the last checkpoint.
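Roughly, the recovery-time decision could look like the sketch below. This is
only an illustration of the idea: the ZK path layout, the helper names and the
notion of keeping both a "committed" and a "checkpointed" node are
assumptions, not the current PartitionManager code.

import org.apache.curator.framework.CuratorFramework

object OffsetRecovery {
  // Pick the offset to resume from: prefer replaying from the older of the
  // two markers so nothing since the last checkpoint is skipped.
  def startingOffset(zk: CuratorFramework, consumerId: String,
                     topic: String, partition: Int): Long = {
    def read(kind: String): Option[Long] = {
      val path = s"/consumers/$consumerId/$kind/$topic/$partition"
      if (zk.checkExists().forPath(path) == null) None
      else Some(new String(zk.getData.forPath(path), "UTF-8").toLong)
    }
    (read("checkpointed"), read("committed")) match {
      case (Some(cp), Some(c)) => math.min(cp, c)
      case (Some(cp), None)    => cp
      case (None, Some(c))     => c
      case (None, None)        => -1L   // no state yet; fall back to start policy
    }
  }
}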

I will think over it ..

Regards,
Dibyendu



On Mon, Aug 25, 2014 at 11:23 PM, RodrigoB 
wrote:

> Hi Dibyendu,
>
> My colleague has taken a look at the spark kafka consumer github you have
> provided and started experimenting.
>
> We found that somehow when Spark has a failure after a data checkpoint, the
> expected re-computations correspondent to the metadata checkpoints are not
> recovered so we loose Kafka messages and RDD's computations in Spark.
> The impression is that this code is replacing quite a bit of Spark Kafka
> Streaming code where maybe (not sure) metadata checkpoints are done every
> batch interval.
>
> Was it on purpose to solely depend on the Kafka commit to recover data and
> recomputations between data checkpoints? If so, how to make this work?
>
> tnks
> Rod
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12757.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Data loss - Spark streaming and network receiver

2014-08-18 Thread Dibyendu Bhattacharya
Dear All,

Recently I have written a Spark Kafka consumer to solve this problem. We too
have seen issues with KafkaUtils, which uses the High Level Kafka consumer,
where the consumer code has no handle on offset management.

The code below solves this problem; it is being tested in our Spark cluster
and is working fine as of now.

https://github.com/dibbhatt/kafka-spark-consumer

This is Low Level Kafka Consumer using Kafka Simple Consumer API.

Please have a look at it and let me know your opinion. It has been written to
eliminate data loss by committing the offset only after the data is written to
the BlockManager. Also, the existing High Level KafkaUtils does not have any
feature to control the data flow and gives Out Of Memory errors when there is
too much backlog in Kafka; this consumer solves that problem as well. The code
has been adapted from the earlier Storm Kafka consumer code and has a lot of
other features like recovery from Kafka node failures, ZK failures, recovery
from offset errors, etc.

Regards,
Dibyendu


On Tue, Aug 19, 2014 at 9:49 AM, Shao, Saisai  wrote:

>  I think Currently Spark Streaming lack a data acknowledging mechanism
> when data is stored and replicated in BlockManager, so potentially data
> will be lost even pulled into Kafka, say if data is stored just in
> BlockGenerator not BM, while in the meantime Kafka itself commit the
> consumer offset, also at this point node is failed, from Kafka’s point this
> part of data is feed into Spark Streaming but actually this data is not yet
> processed, so potentially this part of data will never be processed again,
> unless you read the whole partition again.
>
>
>
> To solve this potential data loss problem, Spark Streaming needs to offer
> a data acknowledging mechanism, so custom Receiver can use this
> acknowledgement to do checkpoint or recovery, like Storm.
>
>
>
> Besides, driver failure is another story need to be carefully considered.
> So currently it is hard to make sure no data loss in Spark Streaming, still
> need to improve at some points J.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Tobias Pfeiffer [mailto:t...@preferred.jp]
> *Sent:* Tuesday, August 19, 2014 10:47 AM
> *To:* Wei Liu
> *Cc:* user
> *Subject:* Re: Data loss - Spark streaming and network receiver
>
>
>
> Hi Wei,
>
>
>
> On Tue, Aug 19, 2014 at 10:18 AM, Wei Liu 
> wrote:
>
> Since our application cannot tolerate losing customer data, I am wondering
> what is the best way for us to address this issue.
>
> 1) We are thinking writing application specific logic to address the data
> loss. To us, the problem seems to be caused by that Kinesis receivers
> advanced their checkpoint before we know for sure the data is replicated.
> For example, we can do another checkpoint ourselves to remember the kinesis
> sequence number for data that has been processed by spark streaming. When
> Kinesis receiver is restarted due to worker failures, we restarted it from
> the checkpoint we tracked.
>
>
>
> This sounds pretty much to me like the way Kafka does it. So, I am not
> saying that the stock KafkaReceiver does what you want (it may or may not),
> but it should be possible to update the "offset" (corresponds to "sequence
> number") in Zookeeper only after data has been replicated successfully. I
> guess "replace Kinesis by Kafka" is not in option for you, but you may
> consider pulling Kinesis data into Kafka before processing with Spark?
>
>
>
> Tobias
>
>
>


Re: Low Level Kafka Consumer for Spark

2014-08-05 Thread Dibyendu Bhattacharya
Thanks Jonathan,

Yes, till non-ZK based offset management is available in Kafka, I need to
maintain the offsets in ZK. And yes, in both cases an explicit commit is
necessary. I modified the Low Level Kafka Spark Consumer a little so that the
receiver spawns a thread for every partition of the topic and performs the
'store' operation from multiple threads (a rough sketch is below). It would be
good if the receiver.store methods were made thread safe, which is not the
case presently.
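For illustration, the shape of that multi-threaded store looks roughly like
the sketch below, with the store() calls serialized through a lock as a
workaround. The class name, the byte-array payload and the placeholder fetch
method are assumptions, not the actual consumer code.

import java.util.concurrent.{ExecutorService, Executors}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// One fetch thread per partition inside a single receiver, all funnelling
// into store(); the calls are synchronized because store() is not documented
// as thread safe.
class MultiThreadedKafkaReceiver(topic: String, numPartitions: Int)
  extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_SER_2) {

  @transient private var pool: ExecutorService = _

  def onStart(): Unit = {
    pool = Executors.newFixedThreadPool(numPartitions)
    (0 until numPartitions).foreach { p =>
      pool.submit(new Runnable {
        def run(): Unit = {
          while (!isStopped()) {
            val msg = fetchNextMessage(p)                  // placeholder fetch
            MultiThreadedKafkaReceiver.this.synchronized {  // serialize store() calls
              store(msg)
            }
          }
        }
      })
    }
  }

  def onStop(): Unit = if (pool != null) pool.shutdownNow()

  // Placeholder for the real SimpleConsumer fetch logic for one partition.
  private def fetchNextMessage(partition: Int): Array[Byte] = new Array[Byte](0)
}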

Waiting for TD's comment on this Kafka Spark Low Level consumer.


Regards,
Dibyendu



On Tue, Aug 5, 2014 at 5:32 AM, Jonathan Hodges  wrote:

> Hi Yan,
>
> That is a good suggestion.  I believe non-Zookeeper offset management will
> be a feature in the upcoming Kafka 0.8.2 release tentatively scheduled for
> September.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
>
> That should make this fairly easy to implement, but it will still require
> explicit offset commits to avoid data loss which is different than the
> current KafkaUtils implementation.
>
> Jonathan
>
>
>
>
>
> On Mon, Aug 4, 2014 at 4:51 PM, Yan Fang  wrote:
>
>> Another suggestion that may help is that, you can consider use Kafka to
>> store the latest offset instead of Zookeeper. There are at least two
>> benefits: 1) lower the workload of ZK 2) support replay from certain
>> offset. This is how Samza <http://samza.incubator.apache.org/> deals
>> with the Kafka offset, the doc is here
>> <http://samza.incubator.apache.org/learn/documentation/0.7.0/container/checkpointing.html>
>>  .
>> Thank you.
>>
>> Cheers,
>>
>> Fang, Yan
>> yanfang...@gmail.com
>> +1 (206) 849-4108
>>
>>
>> On Sun, Aug 3, 2014 at 8:59 PM, Patrick Wendell 
>> wrote:
>>
>>> I'll let TD chime on on this one, but I'm guessing this would be a
>>> welcome addition. It's great to see community effort on adding new
>>> streams/receivers, adding a Java API for receivers was something we did
>>> specifically to allow this :)
>>>
>>> - Patrick
>>>
>>>
>>> On Sat, Aug 2, 2014 at 10:09 AM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have implemented a Low Level Kafka Consumer for Spark Streaming using
>>>> Kafka Simple Consumer API. This API will give better control over the Kafka
>>>> offset management and recovery from failures. As the present Spark
>>>> KafkaUtils uses HighLevel Kafka Consumer API, I wanted to have a better
>>>> control over the offset management which is not possible in Kafka HighLevel
>>>> consumer.
>>>>
>>>> This Project is available in below Repo :
>>>>
>>>> https://github.com/dibbhatt/kafka-spark-consumer
>>>>
>>>>
>>>> I have implemented a Custom Receiver
>>>> consumer.kafka.client.KafkaReceiver. The KafkaReceiver uses low level Kafka
>>>> Consumer API (implemented in consumer.kafka packages) to fetch messages
>>>> from Kafka and 'store' it in Spark.
>>>>
>>>> The logic will detect number of partitions for a topic and spawn that
>>>> many threads (Individual instances of Consumers). Kafka Consumer uses
>>>> Zookeeper for storing the latest offset for individual partitions, which
>>>> will help to recover in case of failure. The Kafka Consumer logic is
>>>> tolerant to ZK Failures, Kafka Leader of Partition changes, Kafka broker
>>>> failures,  recovery from offset errors and other fail-over aspects.
>>>>
>>>> The consumer.kafka.client.Consumer is the sample Consumer which uses
>>>> this Kafka Receivers to generate DStreams from Kafka and apply a Output
>>>> operation for every messages of the RDD.
>>>>
>>>> We are planning to use this Kafka Spark Consumer to perform Near Real
>>>> Time Indexing of Kafka Messages to target Search Cluster and also Near Real
>>>> Time Aggregation using target NoSQL storage.
>>>>
>>>> Kindly let me know your view. Also if this looks good, can I contribute
>>>> to Spark Streaming project.
>>>>
>>>> Regards,
>>>> Dibyendu
>>>>
>>>
>>>
>>
>


Re: Spark stream data from kafka topics and output as parquet file on HDFS

2014-08-05 Thread Dibyendu Bhattacharya
You can try this Kafka Spark consumer which I recently wrote. It uses the low
level Kafka consumer API:

https://github.com/dibbhatt/kafka-spark-consumer
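Once you have a DStream of Kafka messages, a rough sketch of writing each
micro-batch out as Parquet with Spark SQL could look like the following. This
is only an illustration: the Event schema, the comma-separated message format
and the HDFS output path are assumptions.

import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

case class Event(key: String, value: String, ts: Long)

object KafkaToParquet {
  // 'messages' would be the DStream produced by the Kafka receiver(s).
  def writeAsParquet(ssc: StreamingContext, messages: DStream[String]): Unit = {
    val sqlContext = new SQLContext(ssc.sparkContext)
    import sqlContext.createSchemaRDD   // implicit RDD[case class] -> SchemaRDD

    messages.foreachRDD { (rdd, time) =>
      val events = rdd.map { line =>
        val parts = line.split(",")     // assumed message format
        Event(parts(0), parts(1), time.milliseconds)
      }
      // One Parquet directory per batch on HDFS.
      events.saveAsParquetFile("hdfs:///data/events/batch-" + time.milliseconds)
    }
  }
}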

Dibyendu




On Tue, Aug 5, 2014 at 12:52 PM, rafeeq s  wrote:

> Hi,
>
> I am new to Apache Spark and Trying to Develop spark streaming program to  
> *stream
> data from kafka topics and output as parquet file on HDFS*.
>
> Please share the *sample reference* program to stream data from kafka
> topics and output as parquet file on HDFS.
>
> Thanks in Advance.
>
> Regards,
>
> Rafeeq S
> *(“What you do is what matters, not what you think or say or plan.” )*
>
>


Low Level Kafka Consumer for Spark

2014-08-02 Thread Dibyendu Bhattacharya
Hi,

I have implemented a Low Level Kafka Consumer for Spark Streaming using the
Kafka Simple Consumer API. This API gives better control over Kafka offset
management and recovery from failures. As the present Spark KafkaUtils uses
the High Level Kafka Consumer API, I wanted better control over offset
management, which is not possible with the Kafka High Level consumer.

This Project is available in below Repo :

https://github.com/dibbhatt/kafka-spark-consumer


I have implemented a custom receiver, consumer.kafka.client.KafkaReceiver. The
KafkaReceiver uses the low level Kafka Consumer API (implemented in the
consumer.kafka packages) to fetch messages from Kafka and 'store' them in
Spark.

The logic detects the number of partitions for a topic and spawns that many
threads (individual consumer instances). The Kafka consumer uses ZooKeeper for
storing the latest offset for individual partitions, which helps with recovery
in case of failure. The Kafka consumer logic is tolerant to ZK failures, Kafka
partition leader changes, Kafka broker failures, recovery from offset errors
and other fail-over aspects.

The consumer.kafka.client.Consumer class is the sample consumer which uses
these Kafka receivers to generate DStreams from Kafka and applies an output
operation to every message of the RDD.

We are planning to use this Kafka Spark consumer to perform near-real-time
indexing of Kafka messages into a target search cluster and also
near-real-time aggregation into a target NoSQL store.

Kindly let me know your view. Also, if this looks good, can I contribute it to
the Spark Streaming project?

Regards,
Dibyendu