You have access to the offset ranges for a given RDD in the stream by
casting it to HasOffsetRanges. You can then store the offsets wherever
you need to.
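As a concrete sketch of what Cody describes (assuming a `directKafkaStream` built with `KafkaUtils.createDirectStream`, and a hypothetical `saveOffsets` helper for whatever store you choose):

```scala
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

directKafkaStream.foreachRDD { rdd =>
  // The underlying KafkaRDD carries the offset ranges for this batch.
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  offsetRanges.foreach { or =>
    // Each range exposes or.topic, or.partition, or.fromOffset, or.untilOffset.
    saveOffsets(or.topic, or.partition, or.untilOffset) // hypothetical helper
  }
}
```

Note the cast is only valid on the RDD produced directly by the stream, before any transformation, since transformed RDDs no longer implement HasOffsetRanges.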
On Tue, Jul 14, 2015 at 5:00 PM, Chen Song chen.song...@gmail.com wrote:
A follow up question.
When using the createDirectStream approach, the offsets are checkpointed to
HDFS and understood by the Spark Streaming job. Is there a way to
expose the offsets via a REST API to end users? Or, alternatively, is there
a way to have the offsets committed to the Kafka Offset Manager?
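One way to expose the offsets, sketched here under assumptions (port, path, and the use of the JDK's built-in HTTP server are all illustrative, not from the original thread), is to publish each batch's ranges from foreachRDD and serve them from the driver:

```scala
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Latest offset ranges seen by the job, shared with the HTTP handler.
val latestOffsets = new AtomicReference[Array[OffsetRange]](Array.empty)

directKafkaStream.foreachRDD { rdd =>
  latestOffsets.set(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
}

// A tiny endpoint on the driver that serves the offsets as plain text.
val server = HttpServer.create(new InetSocketAddress(8080), 0)
server.createContext("/offsets", new HttpHandler {
  def handle(exchange: HttpExchange): Unit = {
    val body = latestOffsets.get()
      .map(or => s"${or.topic}/${or.partition}: ${or.fromOffset}..${or.untilOffset}")
      .mkString("\n")
    val bytes = body.getBytes("UTF-8")
    exchange.sendResponseHeaders(200, bytes.length)
    exchange.getResponseBody.write(bytes)
    exchange.getResponseBody.close()
  }
})
server.start()
```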
Of course, exactly-once receiving is not the same as exactly-once
processing. In the case of the direct Kafka stream, the data may actually be
pulled multiple times. But even if the data of a batch is pulled twice
because of some failure, the final result (that is, the transformed data
accessed through foreachRDD) will be the same.
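For the output itself to be exactly-once, the writes in foreachRDD need to be idempotent or transactional. A minimal sketch of the idempotent approach, assuming hypothetical `transform` and `upsert` helpers for your own processing and sink:

```scala
import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.HasOffsetRanges

directKafkaStream.foreachRDD { rdd =>
  // Capture the ranges on the driver; they are serializable.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreachPartition { iter =>
    // RDD partition i corresponds to offset range i.
    val or = offsetRanges(TaskContext.get.partitionId)
    val results = iter.map(transform).toSeq // transform: hypothetical
    // Keying the write by (topic, partition, untilOffset) means a replayed
    // batch overwrites the same row instead of duplicating output.
    upsert(or.topic, or.partition, or.untilOffset, results) // hypothetical sink
  }
}
```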
Thanks TD and Cody. I saw that.
1. By doing that (foreachRDD), does the KafkaDStream checkpoint its offsets on
HDFS at the end of each batch interval?
2. In the code, if I first apply transformations and actions on the
directKafkaStream and then use foreachRDD on the original KafkaDStream to
commit
Thanks TD.
As for 1), if the timing is not guaranteed, how are exactly-once semantics
supported? It feels like exactly-once receiving is not necessarily exactly-once
processing.
Chen
On Tue, Jul 14, 2015 at 10:16 PM, Tathagata Das t...@databricks.com wrote:
Relevant documentation -
https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
towards the end.
directKafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // offsetRanges.length = # of Kafka partitions being consumed
  ...
}
On Tue, Jul 14, 2015 at 6:42 PM, Chen Song chen.song...@gmail.com wrote:
Thanks TD and Cody. I saw that.
1. By doing that (foreachRDD), does the KafkaDStream checkpoint its offsets
on HDFS at the end of each batch interval?
The timing is not guaranteed.
2. In the code, if I first apply
Is this 3 the number of parallel consumer threads per receiver, meaning in
total we have 2*3 = 6 consumers in the same consumer group consuming from all
300 partitions?
The 3 is just parallelism on the same receiver, and the recommendation is to
use 1 per receiver, since consuming from Kafka is not CPU bound but rather
I/O bound.
You can't use different versions of Spark in your application vs. your
cluster.
For the direct stream, it's not 60 partitions per executor, it's 300
partitions, and executors work on them as they are scheduled. Yes, if you
have no messages you will get an empty partition. It's up to you whether
3. You need to use your own method, because you need to set up your job.
Read the checkpoint documentation.
4. Yes, if you want to checkpoint, you need to specify a URL to store the
checkpoint at (S3 or HDFS). Yes, for the direct stream checkpoint it's
just the offsets, not all the messages.
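A minimal sketch of the checkpoint setup being described, with an assumed checkpoint path and app name (the details of the stream and transformations are elided, as they belong inside the factory function):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///user/me/checkpoints/my-app" // assumed path; an s3:// URL works too

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("my-stream") // assumed app name
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... create the direct stream and all transformations here ...
  ssc
}

// On a clean start this calls createContext; after a failure it rebuilds the
// context (including the stored offsets) from the checkpoint instead.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```

The key point is that all stream setup must happen inside the factory function, so that a restart from checkpoint does not re-run setup code against a recovered context.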
Hi
Let me take a shot at your questions. (I am sure people like Cody and TD
will correct me if I am wrong.)
0. This is an exact copy from a similar question in the mail thread from Akhil D:
Since you set local[4] you will have 4 threads for your computation, and
since you have 2 receivers, you are
1. Here you are basically creating 2 receivers and asking each of them to
consume 3 Kafka partitions each.
- In 1.2 we have high-level consumers, so how can we restrict the number of
Kafka partitions to consume from? Say I have 300 Kafka partitions in a Kafka
topic, and as above I gave 2 receivers and 3
A few doubts:
In 1.2 streaming, when I use a union of streams, my streaming application
sometimes hangs and nothing gets printed on the driver.
[Stage 2: (0 + 2) / 2]
What does the (0 + 2) / 2 here signify?
1. Does the number of streams in
In the receiver-based approach, if the receiver crashes for any reason
(receiver crashed or executor crashed), the receiver should get restarted on
another executor and should start reading data from the offset present in
ZooKeeper. There is some chance of data loss, which can be alleviated using
Hi,
There is another option to try: the Receiver Based Low Level Kafka Consumer,
which is part of Spark Packages
(http://spark-packages.org/package/dibbhatt/kafka-spark-consumer). This can
be used with the WAL as well for end-to-end zero data loss.
This is also a reliable receiver, and it commits offsets to
The receiver-based Kafka createStream in Spark 1.2 uses ZooKeeper to store
offsets. If you want finer-grained control over offsets, you can update
the values in ZooKeeper yourself before starting the job.
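A hedged sketch of that ZooKeeper update, using the I0Itec ZkClient (the client Kafka 0.8's own tooling uses) and the conventional high-level consumer path /consumers/&lt;group&gt;/offsets/&lt;topic&gt;/&lt;partition&gt;; the group, topic, partition, and offset values here are all assumed for illustration:

```scala
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.serialize.ZkSerializer

// Plain-string serializer: the high-level consumer stores offsets as text.
val stringSerializer = new ZkSerializer {
  def serialize(data: Object): Array[Byte] = data.toString.getBytes("UTF-8")
  def deserialize(bytes: Array[Byte]): Object =
    if (bytes == null) null else new String(bytes, "UTF-8")
}

val zk = new ZkClient("zkhost:2181", 10000, 10000, stringSerializer)
val group = "my-consumer-group" // assumed names and values
val topic = "my-topic"
val partition = 7
val desiredOffset = 12345L

val path = s"/consumers/$group/offsets/$topic/$partition"
zk.createPersistent(path, true) // ensure the node and its parents exist
zk.writeData(path, desiredOffset.toString)
zk.close()
```

Do this only while the job is stopped, since a running consumer will overwrite these values as it commits.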
createDirectStream in Spark 1.3 is still marked as experimental, and
subject to change.
Read the Spark Streaming guide and the Kafka integration guide for a better
understanding of how the receiver-based stream works.
Capacity planning is specific to your environment and what the job is
actually doing; you'll need to determine it empirically.
On Friday, June 26, 2015, Shushant Arora wrote:
In 1.2, how do I handle offset management after the stream application starts,
in each job? Should I commit the offset manually after job completion?
And what is the recommended number of consumer threads? Say I have 300
partitions in the Kafka cluster. The load is ~1 million events per second, and
each event is ~500 bytes.
I am using spark streaming 1.2.
If the processing executors crash, will the receiver reset the offset back to
the last processed offset?
If the receiver itself crashes, is there a way to reset the offset without
restarting the streaming application, other than smallest or largest?
Is Spark Streaming 1.3