kafka offset commit in spark streaming 1.2

2015-07-06 Thread Shushant Arora
In Spark Streaming 1.2, is the offset of a consumed Kafka message updated in
ZooKeeper only after the message is written to the WAL (when WAL and
checkpointing are enabled), or does it depend on the kafkaParams passed when
initializing the KafkaDStream?


Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "ip:2181");
kafkaParams.put("group.id", "testgroup");
kafkaParams.put("zookeeper.session.timeout.ms", "1");
kafkaParams.put("autocommit.enable", "true");
kafkaParams.put("zookeeper.sync.time.ms", "250");

kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
    byte[].class, kafka.serializer.DefaultDecoder.class,
    kafka.serializer.DefaultDecoder.class,
    kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));

Here, since I have set autocommit.enable to true, will Spark Streaming ignore
this and always call commitOffset explicitly on the high-level consumer
connector, or does it depend on the parameter passed?

If it depends on the parameter, and the receiver calls explicit commit only
when autocommit is false, then I should override the default autocommit from
true to false when enabling WAL, since WAL enabled together with autocommit
set to true may produce duplicates in case of failure.


RE: kafka offset commit in spark streaming 1.2

2015-07-06 Thread Shao, Saisai
If you’re using WAL with Kafka, Spark Streaming will ignore this
configuration (autocommit.enable) and explicitly call commitOffset to update
the offset in Kafka AFTER the WAL write is done.

No matter what you set autocommit.enable to, internally Spark Streaming will
set it to false to turn off the autocommit mechanism.

Thanks
Jerry
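The ordering described above (durable WAL write first, offset commit only afterwards) can be sketched in plain Java. This is an illustrative stand-in, not Spark's actual receiver code; the class, fields, and method names are all made up for the example:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of commit-after-WAL: the receiver appends the received block to a
// durable log first, and only advances the committed offset once that write
// has succeeded. A crash before step 2 leaves the offset at its old value,
// so the block is re-read rather than lost.
public class CommitAfterWal {
    private final List<String> wal = new ArrayList<String>(); // stands in for the durable log
    private long committedOffset = 0;                         // stands in for the ZooKeeper-side offset

    public void receive(String block, long nextOffset) {
        wal.add(block);                 // 1) durable write first
        committedOffset = nextOffset;   // 2) commit offset only after the write
    }

    public long committedOffset() { return committedOffset; }
    public int walSize() { return wal.size(); }

    public static void main(String[] args) {
        CommitAfterWal r = new CommitAfterWal();
        r.receive("block-0", 10);
        r.receive("block-1", 20);
        System.out.println(r.committedOffset()); // 20
        System.out.println(r.walSize());         // 2
    }
}
```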



RE: kafka offset commit in spark streaming 1.2

2015-07-06 Thread Shao, Saisai
Please see the inline comments.

From: Shushant Arora [mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:51 PM
To: Shao, Saisai
Cc: user
Subject: Re: kafka offset commit in spark streaming 1.2

So if WAL is disabled, how can a developer commit offsets explicitly in a
Spark Streaming app, since we don't write code that executes in the receiver?

I think it is difficult for the user to commit offsets explicitly with the
receiver-based Spark Streaming Kafka API.

If you want to commit offsets explicitly, you could try the Spark Streaming
Kafka direct API, newly added in Spark 1.3, where you can manage the offsets
yourself; it is implemented on top of Kafka's low-level API.
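The direct API itself needs a Spark cluster to run, so as a self-contained illustration of the pattern it enables, here is a sketch with hypothetical names: process a batch, write its results, and only then record the per-partition offsets yourself:

```java
import java.util.HashMap;
import java.util.Map;

// Manual offset management, as the direct API allows: the application owns
// the partition -> offset map and advances it only after the batch's output
// has been written, so a failure mid-batch leaves the old offset in place.
public class ManualOffsets {
    private final Map<Integer, Long> committed = new HashMap<Integer, Long>(); // partition -> next offset

    // Process one partition's batch; the commit is the very last step, so
    // an exception during the (simulated) sink write commits nothing.
    public long processBatch(int partition, long fromOffset, String[] records) {
        // ... write records to the sink here ...
        long untilOffset = fromOffset + records.length;
        committed.put(partition, untilOffset); // commit happens last
        return untilOffset;
    }

    public long committed(int partition) {
        Long o = committed.get(partition);
        return o == null ? 0L : o;
    }

    public static void main(String[] args) {
        ManualOffsets m = new ManualOffsets();
        long next = m.processBatch(0, 0, new String[]{"a", "b", "c"});
        System.out.println(next);           // 3
        System.out.println(m.committed(0)); // 3
    }
}
```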

Plus, since the offset commit is asynchronous, is it possible that the last
offset is not yet committed when the next stream batch starts on the
receiver, so it may get duplicate data?

Yes, it is possible, so the receiver-based Spark Streaming Kafka API can
guarantee neither no-duplication nor no-data-loss. If you enable WAL, no data
loss can be guaranteed, but you may still see duplicates. So the best way is
to use the Spark Streaming Kafka direct API with your own offset management
to ensure exactly-once semantics.
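A tiny illustration (hypothetical names) of why an uncommitted tail turns into duplicates: on restart, consumption resumes from the last committed offset, so every offset between the last commit and the last processed message is delivered a second time:

```java
// At-least-once replay: offsets in [committedUpTo, processedUpTo) were
// already processed, but the restart re-reads them because their commit
// never happened before the crash.
public class AtLeastOnce {
    public static int duplicatesOnRestart(long committedUpTo, long processedUpTo) {
        return (int) Math.max(0, processedUpTo - committedUpTo);
    }

    public static void main(String[] args) {
        // Processed through offset 10, but only offsets below 7 were committed
        // when the receiver died: offsets 7, 8, 9 are delivered twice.
        System.out.println(duplicatesOnRestart(7, 10)); // 3
    }
}
```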







RE: kafka offset commit in spark streaming 1.2

2015-07-06 Thread Shao, Saisai
If you disable WAL, Spark Streaming itself will not manage anything
offset-related. If auto commit is enabled (set to true), Kafka itself will
update offsets periodically; if auto commit is disabled, nothing will call
commitOffset and you need to call this API yourself.

Also, Kafka's offset commit mechanism is timer-based, so it is asynchronous
with respect to replication.
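A minimal sketch (illustrative names, not Kafka's actual code) of why a timer-based autocommit leaves an uncommitted tail: offsets advance with every consumed message, but ZooKeeper only sees the value captured at the last timer tick:

```java
// Timer-based autocommit: consume() advances the in-memory offset on every
// message; commitTimerFires() models the periodic autocommit tick. A crash
// between ticks loses exactly the uncommitted tail, which is then re-read.
public class TimedAutoCommit {
    private long consumedOffset = 0;  // advances with every message
    private long committedOffset = 0; // advances only when the timer fires

    public void consume(int messages) { consumedOffset += messages; }
    public void commitTimerFires() { committedOffset = consumedOffset; }

    public long uncommittedTail() { return consumedOffset - committedOffset; }

    public static void main(String[] args) {
        TimedAutoCommit c = new TimedAutoCommit();
        c.consume(5);
        c.commitTimerFires();          // tick: offset 5 is now durable
        c.consume(3);                  // 3 more messages, no tick yet
        System.out.println(c.uncommittedTail()); // 3
    }
}
```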




Re: kafka offset commit in spark streaming 1.2

2015-07-06 Thread Shushant Arora
So if WAL is disabled, how can a developer commit offsets explicitly in a
Spark Streaming app, since we don't write code that executes in the receiver?

Plus, since the offset commit is asynchronous, is it possible that the last
offset is not yet committed when the next stream batch starts on the
receiver, so it may get duplicate data?
