[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254290#comment-15254290
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user hmcl commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-213522367
  
@jianbzhou done!  Please see https://github.com/apache/storm/pull/1357. 
Thanks once again.


> Kafka Spout New Consumer API
> 
>
> Key: STORM-822
> URL: https://issues.apache.org/jira/browse/STORM-822
> Project: Apache Storm
>  Issue Type: Story
>  Components: storm-kafka
>Reporter: Thomas Becker
>Assignee: Hugo Louro
> Fix For: 1.0.0, 2.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15275946#comment-15275946
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user jianbzhou commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-217776015
  
Hi Hmcl,

During our testing we found that sometimes the poll method was not called for a 
long time. I suspect it is caused by the condition below:

private boolean poll() {
    return !waitingToEmit() && **numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets()**;
}

I found that numUncommittedOffsets is incremented in either of the following 
situations:
1.  !retryService.isScheduled(msgId) – this is the most common situation – 
the message was just polled and is not in toRetryMsg – it is a normal emit 
rather than a retry;
2.  retryService.isReady(msgId) – the message was emitted previously and is 
now being re-emitted as per the retry logic.

As per the logic below – for a message with offset 50, the first time it is polled 
and emitted, numUncommittedOffsets is incremented by 1; if the message then 
fails and is retried 10 times, numUncommittedOffsets is incremented by 11 in 
total.


private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
    ...
    else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
        final List<Object> tuple = tuplesBuilder.buildTuple(record);
        kafkaSpoutStreams.emit(collector, tuple, msgId);
        emitted.add(msgId);
        **numUncommittedOffsets++;**
    }
    ...
}

But as per the logic below – after a successful commit, numUncommittedOffsets 
is decremented by the actual number of messages that got committed. If it commits only 
message 50, it subtracts just 1 instead of 11.

public void commit(OffsetAndMetadata committedOffset) {
    ...
    **numUncommittedOffsets -= numCommittedOffsets;**
    ...
}
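
To make the arithmetic above concrete, here is a minimal stand-alone sketch. It is plain Java rather than actual spout code, and it simply replays the example numbers from this comment (one initial emit plus 10 retries, then a single commit):

public class UncommittedOffsetsLeakSketch {
    public static void main(String[] args) {
        int numUncommittedOffsets = 0;

        numUncommittedOffsets++;                  // first emit of message 50: +1
        for (int retry = 1; retry <= 10; retry++) {
            numUncommittedOffsets++;              // each retry re-emits and increments again: +10
        }

        int numCommittedOffsets = 1;              // the eventual commit covers only message 50
        numUncommittedOffsets -= numCommittedOffsets;

        // Prints 10: the residue accumulates until it exceeds maxUncommittedOffsets
        // and poll() stops being called.
        System.out.println(numUncommittedOffsets);
    }
}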

Under some circumstances – say a rebalance happens and we seek back to a 
very early offset – it seems this would cause numUncommittedOffsets to grow to 
quite a big number, eventually exceeding 
kafkaSpoutConfig.getMaxUncommittedOffsets() and preventing the poll() method from being called.


I am not sure whether I correctly understand your code or am missing something – could 
you please kindly help confirm whether the above situation is possible or not?

Please feel free to let me know if you need any further info and thanks for 
your help.





[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15275980#comment-15275980
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user jianbzhou commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-217781379
  
Just think of another scenario – for example we polled 10k messages and emitted 
them – say from 10001 to 20000, so numUncommittedOffsets is 10000. If the 10500 
msg fails, all the following messages 10501~20000 will not get 
committed until the 10500 message is re-emitted and acked.
If a rebalance happens during this time, the offset will seek back to the 
last committed offset +1, possibly back to offset 10001, and then all the 
messages from 10001 to 20000 will be polled and emitted again – which will cause 
numUncommittedOffsets to be incremented by another 10000.
After a successful commit to kafka, numUncommittedOffsets will subtract 
10000 and leave a residue of 10000 behind.
It seems this will also gradually cause numUncommittedOffsets to become a bigger 
value than we expect. Is this a possible scenario?





[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-05-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15276481#comment-15276481
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user hmcl commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-217894464
  
@jianbzhou thanks for your feedback. Let me take a look at this and I will 
get back to you shortly.




[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-05-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15276541#comment-15276541
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user jianbzhou commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-217905885
  
Thanks Hmcl.

We just found that the log below constantly shows up; it seems the spout keeps trying to commit 
an offset that is actually committed to kafka already – it might be caused by a 
group rebalance, so a smaller offset (smaller than the committed offset) is 
acked back late.

For example (this is our assumption, kindly correct me if wrong): one consumer 
commits offset 1000, polls messages 1001~1050 and emits them, and messages 
1001~1009 are acked; then a rebalance happens, another consumer polls messages 
from 1000 to 1025 and commits the offset up to 1010, and then the ack for message 1010 (which was 
emitted before the rebalance) comes back. This causes 1010 to never 
be committed as per the logic in the findNextCommitOffset method – because this 
offset was already committed to kafka successfully.

Log is:
2016-05-09 03:02:14 io.ebay.rheos.KafkaSpout [INFO] Unexpected offset found 
[37137]. OffsetEntry{topic-partition=oradb.core4-lcr.caty.ebay-bids-3, 
fetchOffset=37138, committedOffset=37137, 
ackedMsgs={topic-partition=oradb.core4-lcr.caty.ebay-bids-3, offset=37137, 
numFails=0}|{topic-partition=oradb.core4-lcr.caty.ebay-bids-3, offset=37137, 
numFails=0}} 

We applied the fix below – in the OffsetEntry.add(KafkaSpoutMessageId msgId) 
method, we changed the code as follows – only add an acked message when its 
offset is bigger than the committed offset.

public void add(KafkaSpoutMessageId msgId) {  // O(Log N)
    **_if (msgId.offset() > committedOffset)   // this line is newly added_**
        ackedMsgs.add(msgId);
}
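
As a minimal stand-alone illustration of why the guard matters, here is a sketch that treats offsets as plain longs (an assumed simplification; the real code tracks KafkaSpoutMessageId objects):

import java.util.TreeSet;

public class LateAckGuardSketch {
    public static void main(String[] args) {
        long committedOffset = 1010L;             // already committed to kafka after the rebalance
        TreeSet<Long> ackedOffsets = new TreeSet<>();

        long lateAck = 1010L;                     // ack from before the rebalance arrives late
        if (lateAck > committedOffset) {          // the newly added guard
            ackedOffsets.add(lateAck);
        }

        // Without the guard, 1010 would sit in ackedOffsets forever and the
        // "Unexpected offset found" line would keep being logged on every commit attempt.
        System.out.println("acked offsets kept: " + ackedOffsets);
    }
}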

Could you please help take a look at the above and let me know your 
thoughts? Thanks.





[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15280075#comment-15280075
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user jianbzhou commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-218453241
  
Hi hmcl,
Just FYI -
In the method doSeekRetriableTopicPartitions, we found the code below:
**kafkaConsumer.seekToEnd(rtp);  // Seek to last committed offset**
The code contradicts the comment, so we replaced the line above with the following:
**OffsetAndMetadata commitOffset = kafkaConsumer.committed(rtp);
kafkaConsumer.seek(rtp, commitOffset.offset());  // Seek to last committed offset**
If you have any comments please let me know. Thanks!

For all the issues identified above we applied some quick and dirty fixes, and 
testing is in progress; we will let you know the final test results later.




[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15291212#comment-15291212
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user hmcl commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-220347788
  
@jianbzhou thanks once again for your feedback. Can you please share the 
results of your tests? I am working on adding more tests for this, as well as 
fixing some of these corner cases, possibly using some of your suggested 
solutions if they are the most suitable.




[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15295865#comment-15295865
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user jianbzhou commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-220878934
  
@hmcl sorry for the late reply. We have made some quick and dirty fixes for the 
above issues; I will share the new spout with you via email so you can do a quick 
comparison. It now seems to be working for our project. Please help review it and let us 
know your comments/concerns on the fix.

One of our customers who also uses the spout found some other issues:
1.  Work load is not distributed to all spout tasks (as per the storm 
topology)
2.  No progress on some partitions (as per the log)
3.  No commits on some partitions (as per the log)
I will start looking into that tomorrow once I get the detailed log file. 
Also, if you have any clue about this please kindly advise.





[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15297491#comment-15297491
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user hmcl commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-221143192
  
@jianbzhou can you please email me what you have so that we can provide 
a fix? Is there a way you can share the kafka setup that causes the issues 
you mention?

I should upload my test cases later today, and that should help us address 
any possible issues. Thanks.




[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15297572#comment-15297572
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user jianbzhou commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-221153889
  
@hmcl, could you share your email address? I will send our latest spout so 
you can have a quick review - this version has been working in our testing env for 
about a week. Our customer faced one issue where it seems the load is not 
well distributed across all partitions in the 0.9 KafkaSpout; some partitions have 
no commits and no progress... I am still waiting for the kafka setup details from the customer 
and shall send them to you once I have them.




[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15297640#comment-15297640
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user hmcl commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-221161594
  
@jianbzhou please email me at the address shown below:


![image](https://cloud.githubusercontent.com/assets/10284328/15491930/97f1a368-212a-11e6-8dd0-da092f040818.png)





[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15299409#comment-15299409
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user jianbzhou commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-221465746
  
@hmcl, sorry for the late reply, I was on leave. I just sent the 
updated spout to you, please help review it. Below are the major changes:
1. In the poll method, change numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets() 
to emitted.size() < kafkaSpoutConfig.getMaxUncommittedOffsets();
2. In the method doSeekRetriableTopicPartitions, your code seems to 
contradict the comment, so I changed it 
**from:**
else {
    kafkaConsumer.seekToEnd(rtp);  // Seek to last committed offset
}
**to:**
else {
    // kafkaConsumer.seekToEnd(rtp);  // Seek to last committed offset
    OffsetAndMetadata commitOffset = kafkaConsumer.committed(rtp);
    kafkaConsumer.seek(rtp, commitOffset.offset());  // Seek to last committed offset
}

3. In the ack method, we found acked.get(msgId.getTopicPartition()) might 
return null, so we added some defensive validation (see the sketch after this list) - possibly due to a kafka 
consumer rebalance, the partition no longer belongs to this spout.
4. In the OffsetEntry.add method, we added one condition and only add the message 
when it is met - if (msgId.offset() > committedOffset). This 
change was also applied in the method doSeekRetriableTopicPartitions.
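
A hedged sketch of the defensive validation mentioned in item 3 above. The field names (acked, emitted) and the log message are assumptions pieced together from the snippets quoted earlier in this thread, not the actual patch:

@Override
public void ack(Object messageId) {
    final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    final OffsetEntry offsetEntry = acked.get(msgId.getTopicPartition());
    if (offsetEntry == null) {
        // A rebalance may have revoked this partition from the task, so there is no
        // OffsetEntry to record the ack against; drop the ack instead of throwing an NPE.
        LOG.warn("Ignoring ack for {} because its partition is no longer assigned to this task", msgId);
        return;
    }
    offsetEntry.add(msgId);
    emitted.remove(msgId);
}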







[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15302339#comment-15302339
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user jianbzhou commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-221922582
  
@hmcl, currently if the user sets firstPollOffsetStrategy=UNCOMMITTED_LATEST or 
LATEST, the spout will not work, because if a kafka consumer rebalance 
happens, the offset is seeked to the end, and there will be lots of 
messages that are never consumed/emitted/acked/failed, so the spout will never find the next 
continuous offset to commit, and the log will keep showing "Non continuous 
offset found".

I have a question here - if a spout reads and emits one message, I assume 
storm will ensure the message will be acked or failed without exception, right? 
Because if it is possible that an emitted message gets neither an ack nor a 
fail back under some strange situations, it means we cannot find the 
continuous message to commit, which will directly break the spout. Could you 
please help confirm whether my assumption is correct?

If my assumption is not correct - meaning an emitted message may not 
get an ack or fail back - then we must change the spout (it needs 
a timeout setting for the case where it fails to find the next continuous message to commit) - 
currently the spout will always look for the next continuous message to commit and 
will try forever...

Because the spout will always look for the next continuous message to commit, we 
need to be cautious with the method below:
private boolean poll() {
    return !waitingToEmit() && emitted.size() < kafkaSpoutConfig.getMaxUncommittedOffsets();
}
If MaxUncommittedOffsets is too small, the spout will frequently stop 
polling from kafka; if a rebalance happens and we seek back to the failed 
message, and at that moment the spout stops polling, this will also cause the spout to 
fail to find the next message to commit. Currently we set this value to 
20 and it seems to be working fine for now.
Looking forward to hearing from you! Thanks!




[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-05-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15302400#comment-15302400
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user hmcl commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-221928986
  
@jianbzhou looking at it.




[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-05-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15305021#comment-15305021
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user hmcl commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-78070
  
@jianbzhou I confirm that your suggested fix for 
doSeekRetriableTopicPartitions is correct. I am going to include that in the 
next patch.




[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-05-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15305027#comment-15305027
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user connieyang commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-78362
  
Following up on the Trident API support for the new KafkaSpout implementation... 
is anyone working on this? Thanks.




[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-05-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15305031#comment-15305031
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user hmcl commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-78660
  
@connieyang I am finishing addressing some issues brought up by the initial 
users of this kafka spout, as well as unit test coverage, and will push the 
trident API right after.




[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-05-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15305051#comment-15305051
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user hmcl commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-80056
  
@jianbzhou any updates on 

``` One of our customers who also uses the spout found some other issues:
1. Work load is not distributed to all spout tasks (as per the storm 
topology)
2. No progress on some partitions (as per the log)
3. No commits on some partitions (as per the log)
I will start looking into that tomorrow once I get the detailed log file. 
Also, if you have any clue about this please kindly advise.```




[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15305342#comment-15305342
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user jianbzhou commented on the pull request:

https://github.com/apache/storm/pull/1131#issuecomment-222308004
  
@hmcl, 1. The workload not being distributed well was not because of the spout; 
that was a kafka cluster setup issue and is now resolved. 2. For the other two, I 
dug into the log (sent to you via email) - it seems every time a rebalance 
happens, the spout seeks to a bigger offset than the committed offset for that 
partition; per my understanding, this causes some messages to never be 
consumed/emitted, so the log is full of "Non continuous offset found".
The user's spout settings are: firstPollOffsetStrategy=UNCOMMITTED_LATEST, 
pollTimeoutMs=2000, offsetCommitPeriodMs=1, maxRetries=2147483647
I know firstPollOffsetStrategy cannot be EARLIEST or LATEST, but it seems to 
me UNCOMMITTED_LATEST should not cause this issue.
I asked the user to try UNCOMMITTED_EARLIEST and now it seems the issue does NOT 
happen again as per the log, though it may happen later...
From the code perspective, I cannot understand why this weird behavior 
happened - could you help?

Also, in our previous testing - once, when a worker died and a rebalance 
happened, we found one spout (not in the dead worker) had some messages 
neither acked nor failed back. That also caused "Non continuous offset found" 
to show up many times in the log, which means no message will be committed to 
kafka. The only solution was to restart the storm topology.

We emit messages this way - kafkaSpoutStreams.emit(collector, tuple, 
msgId); Could you please help confirm - **will storm ensure that all messages 
emitted by the spout are acked/failed back without exception?** 
Because if this is not the case, the spout will not be able to find the 
continuous offset to commit, and then we must fix this issue urgently as we plan to 
release the change early next month. Please help advise. Thanks!




[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15308034#comment-15308034
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user hmcl commented on the pull request:

https://github.com/apache/storm/pull/1131
  
@jianbzhou **Storm** guarantees that all messages are either acked or 
failed. There is the property "topology.message.timeout.secs" 
https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/Config.java#L1669

If Storm is configured to use acks and the acks don't arrive within that 
amount of time, the tuple is failed. You don't have to worry about the 
scenario you described, or have to implement the timeout yourself.
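
A brief, hedged illustration of setting those knobs on the topology configuration (the values are only examples, not recommendations):

import org.apache.storm.Config;

public class AckTimeoutConfigSketch {
    public static Config build() {
        Config conf = new Config();
        // Tuples not fully acked within this window are failed back to the spout, which
        // is what guarantees that every emitted message eventually gets ack() or fail().
        conf.setMessageTimeoutSecs(60);
        // At least one acker executor must be running for the ack/fail guarantee to apply.
        conf.setNumAckers(1);
        return conf;
    }
}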




[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15312301#comment-15312301
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user jianbzhou commented on the issue:

https://github.com/apache/storm/pull/1131
  
@hmcl, today we found a NullPointerException and I applied a fix as below:

In the method doSeekRetriableTopicPartitions, if a partition was never 
committed to before a message is failed back, we hit the issue below. 
Could you please help review it and let me know if this fix is okay?

**Code change is as below - from:**

// kafkaConsumer.seekToEnd(rtp);  // Seek to last committed offset
OffsetAndMetadata commitOffset = kafkaConsumer.committed(rtp);
kafkaConsumer.seek(rtp, **commitOffset.offset()**);  // Seek to last committed offset

**to:**

// kafkaConsumer.seekToEnd(rtp);  // Seek to last committed offset
OffsetAndMetadata commitOffset = kafkaConsumer.committed(rtp);
if (commitOffset != null) {
    kafkaConsumer.seek(rtp, commitOffset.offset());  // Seek to last committed offset
} else {
    LOG.info("In doSeekRetriableTopicPartitions, topic partition is {}, no offset was committed for this partition, will seek back to the initial offset {}", rtp, offsetEntry.initialFetchOffset);
    kafkaConsumer.seek(rtp, offsetEntry.initialFetchOffset);  // no offset committed for this partition, seek to the original fetch offset
}





[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15312603#comment-15312603
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/1131
  
@jianbzhou thanks. Looking at it.




[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-06-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15335876#comment-15335876
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user jianbzhou commented on the issue:

https://github.com/apache/storm/pull/1131
  
@hmcl, just FYI, we found one new issue:
When a rebalance happens, in the method 
onPartitionsAssigned -> initialize(Collection<TopicPartition> partitions) 
we can see the two lines below:
**final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
final long fetchOffset = doSeek(tp, committedOffset);**

If the committedOffset is out of range (say the kafka log file was removed), then 
when the poll() method is called, the offset is reset according to the property 
auto.offset.reset. This causes the newly polled messages to have bigger offsets, 
so there is a break between the committed offset and the acked offsets, and no 
continuous offset will ever be found.
We will apply a quick fix for this.
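
For reference, a hedged sketch of the consumer properties involved in that reset behavior; the broker address and group id are placeholders, and the reset policy shown is only one option:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class OffsetResetPropsSketch {
    public static Properties build() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutGroup");        // placeholder
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");        // the spout commits explicitly
        // If the committed offset has been deleted by retention, the next poll() silently
        // resets to this position, creating the gap between committedOffset and the first
        // acked offset described above.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}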




[jira] [Commented] (STORM-822) Kafka Spout New Consumer API

2016-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425899#comment-15425899
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user jianbzhou commented on the issue:

https://github.com/apache/storm/pull/1131
  
@hmcl and all, the new spout fits the at-least-once semantics and 
works fine for us, thanks a lot! Very recently one of our key customers asked 
to use an at-most-once implementation. Do we have any plan for an 
at-most-once implementation? They set topology.acker.executors=0 and found 
the spout does not work. Could you please help evaluate: 1. will we 
implement this? 2. roughly how much time is needed? Thanks

**Requirement from customer** - “topology.acker.executors” is a Storm 
parameter, which means at-least-once when it’s not 0, and at-most-once when 
it’s 0. We want to know whether we have an at-most-once implementation.
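
For context, a hedged sketch of what the customer's at-most-once setting looks like on the topology side. Disabling ackers means emitted tuples never come back as acks or fails, which is why the current at-least-once spout logic stalls:

import org.apache.storm.Config;

public class AtMostOnceConfigSketch {
    public static Config build() {
        Config conf = new Config();
        // topology.acker.executors = 0: no ack/fail callbacks ever reach the spout,
        // so offsets can only be committed optimistically (at-most-once).
        conf.setNumAckers(0);
        return conf;
    }
}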

