Re: [Discuss] Storm hdfs spout improvements

2017-02-15 Thread Sachin Pasalkar
Sure will do that.

Get Outlook for Android<https://aka.ms/ghei36>



From: Arun Iyer <ai...@hortonworks.com> on behalf of Arun Mahadevan 
<ar...@apache.org>
Sent: Wednesday, February 15, 2017 10:35:43 AM
To: dev@storm.apache.org
Cc: Anudeep Kumar; Bobby Evans
Subject: Re: [Discuss] Storm hdfs spout improvements

Can you please raise a pull request with your proposal? That way it will be 
easier to review and comment.

Thanks,
Arun


On 2/15/17, 9:04 AM, "Sachin Pasalkar" <sachin_pasal...@symantec.com> wrote:

>Can anyone take a look at this? I have attached my code to the JIRA.
>
>On 14/02/17, 7:38 AM, "Sachin Pasalkar" <sachin_pasal...@symantec.com>
>wrote:
>
>>I have created a JIRA for this:
>>https://issues.apache.org/jira/browse/STORM-2358.
>>For point 1:
>>
>>It is a specific use case, given just to support why the class needs to be public.
>>
>>For point 2:
>>By taking a String we are limiting the code to these two specific implementations; we
>>should have a generic mechanism instead. I also see another check-in
>>happening for ZippedTextFileReader. I have attached my code changes to the
>>JIRA, please take a look; there you provide the reader class directly.
>>
>>For point 3:
>>
>>Let's assume I have multiple topologies with different readers, so I
>>defined a base topology class with an HdfsSpout in it. Today I always need
>>to pass the outputFields as a separate array. That could instead be part of
>>every reader class, since it is specific to each reader.
>>
>>Thanks,
>>Sachin
>>
>>On 14/02/17, 4:52 AM, "Roshan Naik" <ros...@hortonworks.com> wrote:
>>
>>>
>>>
>>>On 2/13/17, 12:14 PM, "Sachin Pasalkar" <sachin_pasal...@symantec.com>
>>>wrote:
>>>
>>>I have attached the updated source code of HdfsSpout for reference. I
>>>have updated the respective classes as well (not attached).
>>>
>>>
>>>Don't see any attachment. Answers are below. Better to do this discussion
>>>on a JIRA.
>>>
>>>
>>>On 2/13/17, 8:32 AM, "Sachin Pasalkar" <sachin_pasal...@symantec.com>
>>>wrote:
>>>
>>>>Hi,
>>>>
>>>>I was looking at the Storm HDFS spout code in the 1.x branch, and I found
>>>>that the improvements below can be made.
>>>>
>>>>  1.  Make org.apache.storm.hdfs.spout.AbstractFileReader public so
>>>>that it can be used in generics.
>>>
>>>Java generics and making a class public are unrelated, to my knowledge.
>>>But making it public sounds OK to me if it is useful for "user defined" readers,
>>>although it doesn't really have that much going on in it. For future
>>>built-in reader types it is immaterial, as they can derive from it anyway,
>>>just like the existing ones. The HdfsSpout class itself doesn't care about
>>>the 'AbstractFileReader' type. For that there is the 'FileReader' interface.
>>>
>>>
>>>
>>>>  2.  org.apache.storm.hdfs.spout.HdfsSpout requires readerType as a
>>>>String. It would be great to accept a Class readerType instead, so we do
>>>>not call Class.forName in multiple places; it also helps with the point below.
>>>
>>>The reason it is a string is that, for the built-in readers, we wanted to
>>>support 'short aliases' like 'text' and 'seq' instead of an FQCN.
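
A rough sketch of the two styles under discussion, assuming fluent setters on HdfsSpout
(the setter name is used for illustration); the Class-based overload is what point 2
proposes and is not part of the current API:

    HdfsSpout aliasSpout = new HdfsSpout().setReaderType("seq");                 // built-in alias today
    HdfsSpout fqcnSpout  = new HdfsSpout().setReaderType("com.acme.MyReader");   // FQCN string today
    HdfsSpout classSpout = new HdfsSpout().setReaderType(MyReader.class);        // hypothetical Class overload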
>>>
>>>
>>>>  3.  HdfsSpout also needs to be given the outFields, which are declared as
>>>>constants in each reader (e.g. SequenceFileReader). We could add an abstract
>>>>API on AbstractFileReader that returns them, to make this generic.
>>>
>>>
>>>These constants can't go into AbstractFileReader as they are reader
>>>specific.
>>>
>>>They are there just for convenience. Users can call withOutputFields() on
>>>the spout and set it to these predefined names or anything else.
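
A minimal sketch of that convenience versus what point 3 asks for; the constant name
defaultFields and the no-argument overload are assumptions for illustration, not the
actual names in the code:

    // today: the user wires the reader-specific field names in explicitly
    HdfsSpout spout = new HdfsSpout()
            .setReaderType("seq")
            .withOutputFields(SequenceFileReader.defaultFields);

    // point 3's proposal, roughly: the configured reader supplies its own output fields
    HdfsSpout proposed = new HdfsSpout()
            .setReaderType(SequenceFileReader.class)   // hypothetical Class overload, per point 2
            .withOutputFields();                       // hypothetical no-arg form using the reader's defaults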
>>>
>>>
>>>-Roshan
>>>
>>
>



Re: [GitHub] storm pull request #1939: STORM-1363: TridentKafkaState should handle null v...

2017-02-14 Thread Sachin Pasalkar
Please review if possible. It's a very small change.

On 14/02/17, 3:58 PM, "pasalkarsachin1" <g...@git.apache.org> wrote:

>GitHub user pasalkarsachin1 opened a pull request:
>
>
>https://github.com/apache/storm/pull/1939
>
>STORM-1363: TridentKafkaState should handle null values from
>TridentTupleToKafkaMapper.getMessageFromTuple()
>
>In case a null value comes from the mapper, it will print a warning
>message.
>Added a log to print the time taken to emit the batch of messages.
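
A rough sketch of the behavior the PR describes (not the actual patch); the producer,
topic, and emitted-count bookkeeping are assumptions for illustration, while
getKeyFromTuple()/getMessageFromTuple() are the mapper methods named in the title:

    long start = System.currentTimeMillis();
    int emitted = 0;
    for (TridentTuple tuple : tuples) {
        V message = mapper.getMessageFromTuple(tuple);
        if (message == null) {
            LOG.warn("Skipping tuple because the mapper returned a null message: {}", tuple);
            continue;
        }
        producer.send(new ProducerRecord<>(topic, mapper.getKeyFromTuple(tuple), message));
        emitted++;
    }
    LOG.info("Emitted {} messages in {} ms", emitted, System.currentTimeMillis() - start);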
>
>You can merge this pull request into a Git repository by running:
>
>$ git pull 
>https://github.com/pasalkarsachin1/storm STORM-1363
>
>Alternatively you can review and apply these changes as the patch at:
>
>
>https://github.com/apache/storm/pull/1939.patch
>
>To close this pull request, make a commit to your master/trunk branch
>with (at least) the following in the commit message:
>
>This closes #1939
>
>
>commit f78760c7ed94b14312d0eae1c7b5688c7eb4e96d
>Author: Sachin Pasalkar <sachin_pasal...@symantec.com>
>Date:   2017-02-14T10:24:23Z
>
>STORM-1363: TridentKafkaState should handle null values from
>TridentTupleToKafkaMapper.getMessageFromTuple()
>
>In case a null value comes from the mapper it will print a warning
>message; also added the time taken to emit the batch of messages to the logs
>
>
>
>
>---
>If your project is set up for it, you can reply to this email and have
>your
>reply appear on GitHub as well. If your project does not have this feature
>enabled and wishes so, or if the feature is enabled but not working,
>please
>contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
>with INFRA.
>---



Re: [Discuss] Storm hdfs spout improvements

2017-02-14 Thread Sachin Pasalkar
Can anyone take a look at this? I have attached my code to the JIRA.

On 14/02/17, 7:38 AM, "Sachin Pasalkar" <sachin_pasal...@symantec.com>
wrote:

>I have created a JIRA for this:
>https://issues.apache.org/jira/browse/STORM-2358.
>For point 1:
>
>It is a specific use case, given just to support why the class needs to be public.
>
>For point 2:
>By taking a String we are limiting the code to these two specific implementations; we
>should have a generic mechanism instead. I also see another check-in
>happening for ZippedTextFileReader. I have attached my code changes to the
>JIRA, please take a look; there you provide the reader class directly.
>
>For point 3:
>
>Let's assume I have multiple topologies with different readers, so I
>defined a base topology class with an HdfsSpout in it. Today I always need
>to pass the outputFields as a separate array. That could instead be part of
>every reader class, since it is specific to each reader.
>
>Thanks,
>Sachin
>
>On 14/02/17, 4:52 AM, "Roshan Naik" <ros...@hortonworks.com> wrote:
>
>>
>>
>>On 2/13/17, 12:14 PM, "Sachin Pasalkar" <sachin_pasal...@symantec.com>
>>wrote:
>>
>>>I have attached the updated source code of HdfsSpout for reference. I
>>>have updated the respective classes as well (not attached).
>>
>>
>>Don't see any attachment. Answers are below. Better to do this discussion
>>on a JIRA.
>>
>>
>>On 2/13/17, 8:32 AM, "Sachin Pasalkar" <sachin_pasal...@symantec.com>
>>wrote:
>>
>>>Hi,
>>>
>>>I was looking at the Storm HDFS spout code in the 1.x branch, and I found
>>>that the improvements below can be made.
>>>
>>>  1.  Make org.apache.storm.hdfs.spout.AbstractFileReader public so
>>>that it can be used in generics.
>>
>>Java generics and making a class public are unrelated, to my knowledge.
>>But making it public sounds OK to me if it is useful for "user defined" readers,
>>although it doesn't really have that much going on in it. For future
>>built-in reader types it is immaterial, as they can derive from it anyway,
>>just like the existing ones. The HdfsSpout class itself doesn't care about
>>the 'AbstractFileReader' type. For that there is the 'FileReader' interface.
>>
>>
>>
>>>  2.  org.apache.storm.hdfs.spout.HdfsSpout requires readerType as a
>>>String. It would be great to accept a Class readerType instead, so we do
>>>not call Class.forName in multiple places; it also helps with the point below.
>>
>>The reason it is a string is that, for the built-in readers, we wanted to
>>support 'short aliases' like 'text' and 'seq' instead of an FQCN.
>>
>>
>>>  3.  HdfsSpout also needs to be given the outFields, which are declared as
>>>constants in each reader (e.g. SequenceFileReader). We could add an abstract
>>>API on AbstractFileReader that returns them, to make this generic.
>>
>>
>>These constants can't go into AbstractFileReader as they are reader
>>specific.
>>
>>They are there just for convenience. Users can call withOutputFields() on
>>the spout and set it to these predefined names or anything else.
>>
>>
>>-Roshan
>>
>



Re: [Discuss] Storm hdfs spout improvements

2017-02-13 Thread Sachin Pasalkar
I have created a JIRA for this:
https://issues.apache.org/jira/browse/STORM-2358.
For point 1:

It is a specific use case, given just to support why the class needs to be public.

For point 2:
By taking a String we are limiting the code to these two specific implementations; we
should have a generic mechanism instead. I also see another check-in
happening for ZippedTextFileReader. I have attached my code changes to the
JIRA, please take a look; there you provide the reader class directly.

For point 3:

Let's assume I have multiple topologies with different readers, so I
defined a base topology class with an HdfsSpout in it. Today I always need
to pass the outputFields as a separate array. That could instead be part of
every reader class, since it is specific to each reader.

Thanks,
Sachin

On 14/02/17, 4:52 AM, "Roshan Naik" <ros...@hortonworks.com> wrote:

>
>
>On 2/13/17, 12:14 PM, "Sachin Pasalkar" <sachin_pasal...@symantec.com>
>wrote:
>
>>I have attached the updated source code of HdfsSpout for reference. I
>>have updated the respective classes as well (not attached).
>
>
>Don't see any attachment. Answers are below. Better to do this discussion
>on a JIRA.
>
>
>On 2/13/17, 8:32 AM, "Sachin Pasalkar" <sachin_pasal...@symantec.com>
>wrote:
>
>>Hi,
>>
>>I was looking at the Storm HDFS spout code in the 1.x branch, and I found
>>that the improvements below can be made.
>>
>>  1.  Make org.apache.storm.hdfs.spout.AbstractFileReader public so
>>that it can be used in generics.
>
>Java generics and making a class public are unrelated, to my knowledge. But
>making it public sounds OK to me if it is useful for "user defined" readers,
>although it doesn't really have that much going on in it. For future
>built-in reader types it is immaterial, as they can derive from it anyway,
>just like the existing ones. The HdfsSpout class itself doesn't care about the
>'AbstractFileReader' type. For that there is the 'FileReader' interface.
>
>
>
>>  2.  org.apache.storm.hdfs.spout.HdfsSpout requires readerType as a
>>String. It would be great to accept a Class readerType instead, so we do
>>not call Class.forName in multiple places; it also helps with the point below.
>
>The reason it is a string is that, for the built-in readers, we wanted to
>support 'short aliases' like 'text' and 'seq' instead of an FQCN.
>
>
>>  3.  HdfsSpout also needs to be given the outFields, which are declared as
>>constants in each reader (e.g. SequenceFileReader). We could add an abstract
>>API on AbstractFileReader that returns them, to make this generic.
>
>
>These constants can't go into AbstractFileReader as they are reader
>specific.
>
>They are there just for convenience. Users can call withOutputFields() on
>the spout and set it to these predefined names or anything else.
>
>
>-Roshan
>



Re: [Discuss] Storm hdfs spout improvements

2017-02-13 Thread Sachin Pasalkar
I have attached the updated source code of HdfsSpout for reference. I
have updated the respective classes as well (not attached).

On 13/02/17, 10:02 PM, "Sachin Pasalkar" <sachin_pasal...@symantec.com>
wrote:

>Hi,
>
>I was looking at the Storm HDFS spout code in the 1.x branch, and I found
>that the improvements below can be made.
>
>  1.  Make org.apache.storm.hdfs.spout.AbstractFileReader public so
>that it can be used in generics.
>  2.  org.apache.storm.hdfs.spout.HdfsSpout requires readerType as a
>String. It would be great to accept a Class readerType instead, so we do
>not call Class.forName in multiple places; it also helps with the point below.
>  3.  HdfsSpout also needs to be given the outFields, which are declared as
>constants in each reader (e.g. SequenceFileReader). We could add an abstract
>API on AbstractFileReader that returns them, to make this generic.
>
>Let me know your thoughts on this.
>
>Thanks,
>Sachin



[Discuss] Storm hdfs spout improvements

2017-02-13 Thread Sachin Pasalkar
Hi,

I was looking at the Storm HDFS spout code in the 1.x branch, and I found that the
improvements below can be made.

  1.  Make org.apache.storm.hdfs.spout.AbstractFileReader public so that it
can be used in generics.
  2.  org.apache.storm.hdfs.spout.HdfsSpout requires readerType as a String. It
would be great to accept a Class readerType instead, so we do not call
Class.forName in multiple places; it also helps with the point below.
  3.  HdfsSpout also needs to be given the outFields, which are declared as constants
in each reader (e.g. SequenceFileReader). We could add an abstract API on
AbstractFileReader that returns them, to make this generic.

Let me know your thoughts on this.

Thanks,
Sachin


Re: Stop user from killing topology before X (configured) amount of time

2017-01-18 Thread Sachin Pasalkar
Sorry for spamming.

Can someone help by pointing out the branch? I want to fix this if possible.

Thanks,
Sachin

On 18/01/17, 8:06 AM, "Sachin Pasalkar" <sachin_pasal...@symantec.com>
wrote:

>Can you point out the branch which I need to check out? I don't see the
>Nimbus class below in the 1.x branch.
>
>On 17/01/17, 10:35 PM, "Sachin Pasalkar" <sachin_pasal...@symantec.com>
>wrote:
>
>>Hi Bobby,
>>
>>Thanks for the response. I have created JIRA
>>https://issues.apache.org/jira/browse/STORM-2299. I will try to take a look
>>at it. I may ask for some information if needed.
>>
>>Regards,
>>Sachin
>>
>>From: Bobby Evans <ev...@yahoo-inc.com>
>>Date: Tuesday, 17 January 2017 at 9:16 PM
>>To: Sachin Pasalkar <sachin_pasal...@symantec.com>,
>>"dev@storm.apache.org" <dev@storm.apache.org>
>>Subject: Re: Stop user from killing topology before X (configured) amount
>>of time
>>
>>I would like to add that it would be good to have an admin override on
>>this.  If someone accidentally makes the wait time 100 mins instead of
>>100 seconds, it would be good to have an admin be able to really truly
>>kill it faster.
>>
>>
>>- Bobby
>>
>>
>>On Tuesday, January 17, 2017, 9:44:08 AM CST, Bobby Evans
>><ev...@yahoo-inc.com> wrote:
>>In order to kill a topology with no wait period the operator needs to
>>supply extra arguments `-w 0`  or the code needs to be a few lines longer
>>to pass in the KillOptions with a 0 timeout.  If you want a configured
>>minimum timeout for a given topology I think that would be perfectly
>>fine.  We do not currently support that, but please file a JIRA and
>>hopefully someone can take a look at supporting it.  You can probably do
>>a lot of the work yourself if you want to.
>>
>>The function you care about is here
>>
>>https://github.com/apache/storm/blob/51c8474143b0081ff0522b0367f3efdba2689089/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java#L2573-L2595
>>
>>and it really would be mostly inserting a check
>>
>>probably after this line
>>
>>https://github.com/apache/storm/blob/51c8474143b0081ff0522b0367f3efdba2689089/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java#L2583

Re: [DISCUSS] New Kafka spout doesn't support seek to given offset

2017-01-18 Thread Sachin Pasalkar
[Updated Subject]

In the 2nd case, as the user has changed group.id there will be no history for
this id, so the code will automatically fall back to EARLIEST or LATEST.

I was able to code for this somewhat, and I can fetch data from a given offset
(with a kind of hack). What I have seen is that when I provide an offset it pulls
up the proper records. However,
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager.subscribeKafkaConsumer()
calls poll initially, which causes the offset to get updated to the latest. I guess we
need to work on the Kafka side to update the partition offset in case the user has
provided one. If that works then we need only minimal code to pass on the offset.

Below are the cases I have tried:

  *   Don't provide an offset: it behaves as normal.
  *   Request an offset lower than the current one: it works, with the caveat that it
reads twice, once with the offset that came from the subscribeKafkaConsumer call, but it
drops that data because the requested and actual positions differ. It then takes my
provided value and works fine from there on.
  *   Request an offset larger than the current one: this is the case where I got blocked,
because we are not setting the expected offset initially in the subscribeKafkaConsumer
call, while my code keeps resetting the offset to the user-provided one. I can put in a
hack, but I am not sure how it will behave in all cases.

Below is the code I inserted in
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(TopicPartition,
KafkaTridentSpoutBatchMetadata<K, V>), where startOffset is the offset value
provided by the user.

if (startOffset != null && lastBatchMeta == null) {
    // seek to the offset provided by the user
    kafkaConsumer.seek(tp, startOffset + 1);
    LOG.debug("Seeking fetch offset provided by user");
} else if (lastBatchMeta != null) {
    // seek to the next offset after the last offset from the previous batch
    kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1);
    LOG.debug("Seeking fetch offset to next offset after last offset from previous batch");
}
...

On 18/01/17, 9:30 PM, "Hugo Da Cruz Louro" <hlo...@hortonworks.com> wrote:

Hi Sachin,

The 2nd case can likely be handled with the committed offset, which is covered by
UNCOMMITTED_EARLIEST or UNCOMMITTED_LATEST.

For the 1st case it may make sense, but even if you give the start offset, since
Kafka polls a certain number of bytes and not specifically a number of
records, it may not be trivial to guarantee that the exact same dataset is
polled each time.
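
For reference, a rough sketch of how those strategies are selected on the
storm-kafka-client spout config (broker address and topic are placeholders):

    KafkaSpoutConfig<String, String> spoutConf = KafkaSpoutConfig
            .builder("broker1:9092", "my-topic")
            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
            .build();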

However, if we as a community agree that it is useful to support your proposed
feature, I have no particular argument against doing so.

Best,
Hugo
PS. We usually have the practice of initiating discussion threads with the email
subject prefixed with [DISCUSS].



On Jan 18, 2017, at 6:33 AM, Sachin Pasalkar
<sachin_pasal...@symantec.com> wrote:
Hi,
I was looking at the code of the current KafkaTridentSpoutEmitter & KafkaSpout classes.
Can we add functionality, based on a user-provided offset, to start from a particular
offset? This would be useful in case the user wants to reprocess a particular data
set. Another example: the user has changed the group id, is aware of where the old
offset was committed, and wants to start processing from the same position.
Does this make sense? Or is it explicit that it will not be supported?
Thanks,
Sachin




Re: Stop user from killing topology before X (configured) amount of time

2017-01-17 Thread Sachin Pasalkar
Can you point out the branch which I need to check out? I don't see the
Nimbus class below in the 1.x branch.

On 17/01/17, 10:35 PM, "Sachin Pasalkar" <sachin_pasal...@symantec.com>
wrote:

>Hi Bobby,
>
>Thanks for the response. I have created JIRA
>https://issues.apache.org/jira/browse/STORM-2299. I will try to take a look
>at it. I may ask for some information if needed.
>
>Regards,
>Sachin
>
>From: Bobby Evans <ev...@yahoo-inc.com>
>Date: Tuesday, 17 January 2017 at 9:16 PM
>To: Sachin Pasalkar <sachin_pasal...@symantec.com>,
>"dev@storm.apache.org" <dev@storm.apache.org>
>Subject: Re: Stop user from killing topology before X (configured) amount
>of time
>
>I would like to add that it would be good to have an admin override on
>this.  If someone accidentally makes the wait time 100 mins instead of
>100 seconds, it would be good to have an admin be able to really truly
>kill it faster.
>
>
>- Bobby
>
>
>On Tuesday, January 17, 2017, 9:44:08 AM CST, Bobby Evans
><ev...@yahoo-inc.com> wrote:
>In order to kill a topology with no wait period the operator needs to
>supply extra arguments `-w 0`  or the code needs to be a few lines longer
>to pass in the KillOptions with a 0 timeout.  If you want a configured
>minimum timeout for a given topology I think that would be perfectly
>fine.  We do not currently support that, but please file a JIRA and
>hopefully someone can take a look at supporting it.  You can probably do
>a lot of the work yourself if you want to.
>
>The function you care about is here
>
>https://github.com/apache/storm/blob/51c8474143b0081ff0522b0367f3efdba2689089/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java#L2573-L2595
>
>and it really would be mostly inserting a check
>
>probably after this line
>
>https://github.com/apache/storm/blob/51c8474143b0081ff0522b0367f3efdba2689089/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java#L2583

Re: Stop user from killing topology before X (configured) amount of time

2017-01-17 Thread Sachin Pasalkar
Hi Bobby,

Thanks for the response. I have created JIRA
https://issues.apache.org/jira/browse/STORM-2299. I will try to take a look at
it. I may ask for some information if needed.

Regards,
Sachin

From: Bobby Evans <ev...@yahoo-inc.com>
Date: Tuesday, 17 January 2017 at 9:16 PM
To: Sachin Pasalkar <sachin_pasal...@symantec.com>,
"dev@storm.apache.org" <dev@storm.apache.org>
Subject: Re: Stop user from killing topology before X (configured) amount of
time

I would like to add that it would be good to have an admin override on this.  
If someone accidentally makes the wait time 100 mins instead of 100 seconds, it 
would be good to have an admin be able to really truly kill it faster.


- Bobby


On Tuesday, January 17, 2017, 9:44:08 AM CST, Bobby Evans
<ev...@yahoo-inc.com> wrote:
In order to kill a topology with no wait period the operator needs to supply 
extra arguments `-w 0`  or the code needs to be a few lines longer to pass in 
the KillOptions with a 0 timeout.  If you want a configured minimum timeout for 
a given topology I think that would be perfectly fine.  We do not currently 
support that, but please file a JIRA and hopefully someone can take a look at 
supporting it.  You can probably do a lot of the work yourself if you want to.
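
For reference, a rough sketch of killing with no wait programmatically via KillOptions
(the topology name is a placeholder); the CLI equivalent is `storm kill my-topology -w 0`:

    Map conf = Utils.readStormConfig();
    NimbusClient nimbus = NimbusClient.getConfiguredClient(conf);
    KillOptions opts = new KillOptions();
    opts.set_wait_secs(0);
    nimbus.getClient().killTopologyWithOpts("my-topology", opts);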

The function you care about is here

https://github.com/apache/storm/blob/51c8474143b0081ff0522b0367f3efdba2689089/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java#L2573-L2595

and it really would be mostly inserting a check

probably after this line

https://github.com/apache/storm/blob/51c8474143b0081ff0522b0367f3efdba2689089/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java#L2583

to be sure the waitAmount is >= the configured minimum.
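
A rough sketch of that check; the config key "topology.min.kill.wait.secs" and the
surrounding variable names (topoConf, waitAmt) are assumptions for illustration, not
existing code:

    // hypothetical: clamp the requested kill wait time to a per-topology minimum
    int minWaitSecs = ((Number) topoConf.getOrDefault("topology.min.kill.wait.secs", 0)).intValue();
    if (waitAmt < minWaitSecs) {
        waitAmt = minWaitSecs;   // or reject the kill request instead of clamping
    }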


- Bobby


On Tuesday, January 17, 2017, 1:28:32 AM CST, Sachin Pasalkar
<sachin_pasal...@symantec.com> wrote:
Currently a user can kill a topology directly, without waiting some amount of
time so that all in-flight messages get processed. For example, Storm is
writing to a file and the user kills the topology; the file is not closed or moved to
its proper location. We have to educate the operations folks to do the right thing,
and there is still some chance that it will not be followed, leaving the system in an
inconsistent state.

Can we set a mandatory (configurable) timeout when a user kills a Storm topology?
The user should not be allowed to kill the topology with a wait time less than the
configured one.

Some cases:
1) If the topology is long running, don't allow the user to kill it with a wait time
less than the configured one.
2) If the topology was just deployed, allow the user to kill it instantly (as the
deployment might have been a mistake).
3) Handle the same cases from the command line.

Thanks,
Sachin


Stop user from killing topology before X (configured) amount of time

2017-01-16 Thread Sachin Pasalkar
Currently a user can kill a topology directly, without waiting some amount of
time so that all in-flight messages get processed. For example, Storm is
writing to a file and the user kills the topology; the file is not closed or moved to
its proper location. We have to educate the operations folks to do the right thing,
and there is still some chance that it will not be followed, leaving the system in an
inconsistent state.

Can we set a mandatory (configurable) timeout when a user kills a Storm topology?
The user should not be allowed to kill the topology with a wait time less than the
configured one.

Some cases:
1) If the topology is long running, don't allow the user to kill it with a wait time
less than the configured one.
2) If the topology was just deployed, allow the user to kill it instantly (as the
deployment might have been a mistake).
3) Handle the same cases from the command line.

Thanks,
Sachin



Pull request

2017-01-16 Thread Sachin Pasalkar
Hi,

I have raised a pull request. Can anyone take a look at it?
https://github.com/apache/storm/pull/1878

Thanks,
Sachin


Re: Different way used to create file path in hdfs state

2016-10-17 Thread Sachin Pasalkar
Can anyone let me know if it was done purposefully?


On 13/10/16, 11:08 PM, "Sachin Pasalkar" <sachin_pasal...@symantec.com>
wrote:

>Hi,
>
>I was looking at the HdfsState class, where the createOutputFile API present at line
>182 is as follows:
>
>Path path = new Path(this.fileNameFormat.getPath(),
>this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
>
>whereas there is another createOutputFile at line 272:
>
>Path p = new Path(this.fsUrl + this.fileNameFormat.getPath(),
>this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
>
>At line 182 the user has to provide the HDFS URI in the path itself, while it is
>handled properly at line 272.
>
>Is there any reason for the above difference, or was it just missed?
>
>Thanks,
>Sachin



Different way used to create file path in hdfs state

2016-10-13 Thread Sachin Pasalkar
Hi,

I was looking at the HdfsState class, where the createOutputFile API present at line 182 is
as follows:

Path path = new Path(this.fileNameFormat.getPath(), 
this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));

whereas there is another createOutputFile at line 272:

Path p = new Path(this.fsUrl + this.fileNameFormat.getPath(), 
this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));

At line 182 the user has to provide the HDFS URI in the path itself, while it is handled
properly at line 272.

Is there any reason for the above difference, or was it just missed?

Thanks,
Sachin


Re: Storm's Kafka spout is not reading latest data even with new consumer group

2016-05-30 Thread Sachin Pasalkar
Does this make sense? I believe we shouldn't ask dev/devops to add/remember
configuration. It should work by default according to what it means, e.g. if ignoreZkOffsets
is true then it should read from the start of the segment, and if the consumer is new it
should read from the latest. The user shouldn't have to specify any config to get it to work.

From: Sachin Pasalkar <sachin_pasal...@symantec.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Monday, 30 May 2016 12:00 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>, Abhishek Agarwal <abhishc...@gmail.com>
Cc: Bobby Evans <ev...@yahoo-inc.com>, Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new
consumer group

What I am suggesting was there in the 0.9.x version; it was removed when the variable
was renamed from forceFromStart to ignoreZkOffsets.

From: Sachin Pasalkar <sachin_pasal...@symantec.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Monday, 30 May 2016 11:44 am
To: Abhishek Agarwal <abhishc...@gmail.com>
Cc: "dev@storm.apache.org" <dev@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>,
Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new
consumer group

Mostly in PROD ignoreZkOffsets is false. However, in testing the user may want to
read data from the start of the segment (maybe to debug, or for some other purpose). What I
am talking about is less from the config-management perspective: if the user sets
ignoreZkOffsets to true, by default it should read from the start of the segment; otherwise
the user can set a specific config to read from config.startOffsetTime.

From: Abhishek Agarwal <abhishc...@gmail.com>
Date: Monday, 30 May 2016 10:53 am
To: Sachin Pasalkar <sachin_pasal...@symantec.com>
Cc: "dev@storm.apache.org" <dev@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>,
Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new 
consumer group

how about setting ignoreZkOffsets to false? config.startOffsetTime indicates 
which offset to start from if there is no prior information about the offsets. 
if ignoreZkOffsets is true, it is intended to read from what is specified in 
configuration.

On Mon, May 30, 2016 at 9:35 AM, Sachin Pasalkar
<sachin_pasal...@symantec.com> wrote:
Not really. If you look at the storm.kafka.trident.TridentKafkaEmitter code
(the doEmitNewPartitionBatch API), setting the time to latest will lose the effect of
ignoreZkOffsets. If ignoreZkOffsets is set to true it will still read from the
latest, which the user will not expect to happen.


/* 103 */   if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {
/* 104 */     offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
/*     */   } else {
/* 106 */     offset = ((Long)lastMeta.get("nextOffset")).longValue();
/*     */   }
/*     */ } else {
/* 109 */   offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
/*     */ }

From: Abhishek Agarwa

Re: Storm's Kafka spout is not reading latest data even with new consumer group

2016-05-30 Thread Sachin Pasalkar
What I am suggesting was there in the 0.9.x version; it was removed when the variable
was renamed from forceFromStart to ignoreZkOffsets.

From: Sachin Pasalkar <sachin_pasal...@symantec.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Monday, 30 May 2016 11:44 am
To: Abhishek Agarwal <abhishc...@gmail.com>
Cc: "dev@storm.apache.org" <dev@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>,
Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new
consumer group

Mostly in PROD ignoreZkOffsets is false. However, in testing the user may want to
read data from the start of the segment (maybe to debug, or for some other purpose). What I
am talking about is less from the config-management perspective: if the user sets
ignoreZkOffsets to true, by default it should read from the start of the segment; otherwise
the user can set a specific config to read from config.startOffsetTime.

From: Abhishek Agarwal <abhishc...@gmail.com>
Date: Monday, 30 May 2016 10:53 am
To: Sachin Pasalkar <sachin_pasal...@symantec.com>
Cc: "dev@storm.apache.org" <dev@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>,
Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new 
consumer group

how about setting ignoreZkOffsets to false? config.startOffsetTime indicates 
which offset to start from if there is no prior information about the offsets. 
if ignoreZkOffsets is true, it is intended to read from what is specified in 
configuration.

On Mon, May 30, 2016 at 9:35 AM, Sachin Pasalkar
<sachin_pasal...@symantec.com> wrote:
Not really. If you look at the storm.kafka.trident.TridentKafkaEmitter code
(the doEmitNewPartitionBatch API), setting the time to latest will lose the effect of
ignoreZkOffsets. If ignoreZkOffsets is set to true it will still read from the
latest, which the user will not expect to happen.


/* 103 */   if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {
/* 104 */     offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
/*     */   } else {
/* 106 */     offset = ((Long)lastMeta.get("nextOffset")).longValue();
/*     */   }
/*     */ } else {
/* 109 */   offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
/*     */ }

From: Abhishek Agarwal <abhishc...@gmail.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Sunday, 29 May 2016 8:32 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>
Cc: Bobby Evans <ev...@yahoo-inc.com>, Narendra Bidari <narendra_bid...@symantec.com>

Subject: Re: Storm's Kafka spout is not reading latest data even with new 
consumer group

does setting kafkaConfig.startOffsetTime = OffsetRequest.LatestTime() not
work?

On Sun, May 29, 2016 at 7:56 PM, Sachin Pasalkar
<sachin_pasal...@symantec.com> wrote:

I looked at the discussion thread. It looks like the user made this change so a new
consumer will start reading data from the earliest offset rather than the latest.
They haven't considered the following case: if the data has changed and the
user forgot to clear the old data from the Kafka topic, it will cause a mess (if a user
starts with a new consumer, the user will expect to read from the latest, or can
set the offset explicitly). Setting to earliest is more error prone in PROD.

Thoughts?

From: Sach

Re: Storm's Kafka spout is not reading latest data even with new consumer group

2016-05-30 Thread Sachin Pasalkar
Mostly in PROD ignoreZkOffsets is false. However, in testing the user may want to
read data from the start of the segment (maybe to debug, or for some other purpose). What I
am talking about is less from the config-management perspective: if the user sets
ignoreZkOffsets to true, by default it should read from the start of the segment; otherwise
the user can set a specific config to read from config.startOffsetTime.

From: Abhishek Agarwal <abhishc...@gmail.com>
Date: Monday, 30 May 2016 10:53 am
To: Sachin Pasalkar <sachin_pasal...@symantec.com>
Cc: "dev@storm.apache.org" <dev@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>,
Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new
consumer group

how about setting ignoreZkOffsets to false? config.startOffsetTime indicates 
which offset to start from if there is no prior information about the offsets. 
if ignoreZkOffsets is true, it is intended to read from what is specified in 
configuration.

On Mon, May 30, 2016 at 9:35 AM, Sachin Pasalkar
<sachin_pasal...@symantec.com> wrote:
Not really. If you look at the storm.kafka.trident.TridentKafkaEmitter code
(the doEmitNewPartitionBatch API), setting the time to latest will lose the effect of
ignoreZkOffsets. If ignoreZkOffsets is set to true it will still read from the
latest, which the user will not expect to happen.


/* 103 */   if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {
/* 104 */     offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
/*     */   } else {
/* 106 */     offset = ((Long)lastMeta.get("nextOffset")).longValue();
/*     */   }
/*     */ } else {
/* 109 */   offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
/*     */ }

From: Abhishek Agarwal <abhishc...@gmail.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Sunday, 29 May 2016 8:32 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>
Cc: Bobby Evans <ev...@yahoo-inc.com>, Narendra Bidari <narendra_bid...@symantec.com>

Subject: Re: Storm's Kafka spout is not reading latest data even with new 
consumer group

does setting kafkaConfig.startOffsetTime = OffsetRequest.LatestTime() not
work?

On Sun, May 29, 2016 at 7:56 PM, Sachin Pasalkar
<sachin_pasal...@symantec.com> wrote:

I looked at the discussion thread. It looks like the user made this change so a new
consumer will start reading data from the earliest offset rather than the latest.
They haven't considered the following case: if the data has changed and the
user forgot to clear the old data from the Kafka topic, it will cause a mess (if a user
starts with a new consumer, the user will expect to read from the latest, or can
set the offset explicitly). Setting to earliest is more error prone in PROD.

Thoughts?

From: Sachin Pasalkar <sachin_pasal...@symantec.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Saturday, 28 May 2016 5:12 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>
Cc: Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new
consumer group

Thanks Bobby. I will ask on ticket.

From: Bobby Evans <ev...@yahoo-inc.com.INVALID>
Reply-To: "dev@storm.apache.org"

Re: Storm's Kafka spout is not reading latest data even with new consumer group

2016-05-29 Thread Sachin Pasalkar
Not really. If you look at the storm.kafka.trident.TridentKafkaEmitter code
(the doEmitNewPartitionBatch API), setting the time to latest will lose the effect of
ignoreZkOffsets. If ignoreZkOffsets is set to true it will still read from the
latest, which the user will not expect to happen.


/* 103 */   if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {
/* 104 */     offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
/*     */   } else {
/* 106 */     offset = ((Long)lastMeta.get("nextOffset")).longValue();
/*     */   }
/*     */ } else {
/* 109 */   offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
/*     */ }

From: Abhishek Agarwal <abhishc...@gmail.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Sunday, 29 May 2016 8:32 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>
Cc: Bobby Evans <ev...@yahoo-inc.com>, Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new 
consumer group

does setting kafkaConfig.startOffsetTime = OffsetRequest.LatestTime() not
work?

On Sun, May 29, 2016 at 7:56 PM, Sachin Pasalkar
<sachin_pasal...@symantec.com> wrote:

I looked at the discussion thread. It looks like the user made this change so a new
consumer will start reading data from the earliest offset rather than the latest.
They haven't considered the following case: if the data has changed and the
user forgot to clear the old data from the Kafka topic, it will cause a mess (if a user
starts with a new consumer, the user will expect to read from the latest, or can
set the offset explicitly). Setting to earliest is more error prone in PROD.

Thoughts?

From: Sachin Pasalkar <sachin_pasal...@symantec.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Saturday, 28 May 2016 5:12 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>
Cc: Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new
consumer group

Thanks Bobby. I will ask on ticket.

From: Bobby Evans <ev...@yahoo-inc.com.INVALID>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>
Date: Friday, 27 May 2016 7:45 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>
Cc: Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new
consumer group

Looks like it changed as a part of
https://issues.apache.org/jira/browse/STORM-563.  That might be a good
place to ask.
Specifically it was pull request https://github.com/apache/storm/pull/493.
To me it looks like the code was updated to use ignoreZKOffsets instead of
forceFromStart, but I have not dug into the exact details of the change to
know what all the ramifications might have been.
- Bobby

On Thursday, May 26, 2016 10:13 PM, Sachin Pasalkar
<sachin_pasal...@symantec.com> wrote:

Can you look at this please?

From: Sachin Pasalkar 
<sachin_

Re: Storm's Kafka spout is not reading latest data even with new consumer group

2016-05-29 Thread Sachin Pasalkar
I looked at the discussion thread. It looks like the user made this change so a new
consumer will start reading data from the earliest offset rather than the latest.
They haven't considered the following case: if the data has changed and the
user forgot to clear the old data from the Kafka topic, it will cause a mess (if a user
starts with a new consumer, the user will expect to read from the latest, or can
set the offset explicitly). Setting to earliest is more error prone in PROD.

Thoughts?

From: Sachin Pasalkar <sachin_pasal...@symantec.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Saturday, 28 May 2016 5:12 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>
Cc: Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new 
consumer group

Thanks Bobby. I will ask on ticket.

From: Bobby Evans <ev...@yahoo-inc.com.INVALID>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>
Date: Friday, 27 May 2016 7:45 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>
Cc: Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new 
consumer group

Looks like it changed as a part of 
https://issues.apache.org/jira/browse/STORM-563.  That might be a good place to 
ask.
Specifically it was pull request https://github.com/apache/storm/pull/493.
To me it looks like the code was updated to use ignoreZKOffsets instead of 
forceFromStart, but I have not dug into the exact details of the change to know 
what all the ramifications might have been.
- Bobby

On Thursday, May 26, 2016 10:13 PM, Sachin Pasalkar
<sachin_pasal...@symantec.com> wrote:

Can you look at this please?

From: Sachin Pasalkar <sachin_pasal...@symantec.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Thursday, 26 May 2016 9:35 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>
Cc: Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new
consumer group

Can anyone look at this?

From: Sachin Pasalkar <sachin_pasal...@symantec.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Thursday, 26 May 2016 1:18 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>
Cc: Narendra Bidari <narendra_bid...@symantec.com>

[jira] [Commented] (STORM-563) Kafka Spout doesn't pick up from the beginning of the queue unless forceFromStart specified

2016-05-28 Thread Sachin Pasalkar (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15305326#comment-15305326
 ] 

Sachin Pasalkar commented on STORM-563:
---

[~sriharsha] Due to this change, some of our topologies are breaking. We hit the 
following case:
1) Our topologies had been running for a long time when we suddenly started seeing the 
"not enough data to calculate spout lag" issue. 
2) As remediation we had to change the consumer group. Changing the consumer 
group caused Trident to process duplicates.

In KafkaConfig
{code:java}
public long startOffsetTime = OffsetRequest.EarliestTime();
{code}
In the doEmitNewPartitionBatch API of storm.kafka.trident.TridentKafkaEmitter, if 
the consumer group is new, the call goes to line 109:
{code:java}
  if (lastMeta != null) {
/*  98 */   String lastInstanceId = null;
/*  99 */   Map lastTopoMeta = (Map)lastMeta.get("topology");
/* 100 */   if (lastTopoMeta != null)
/* 101 */ lastInstanceId = (String)lastTopoMeta.get("id");
/* */   long offset;
/* 103 */   if ((_config.ignoreZkOffsets) && 
(!_topologyInstanceId.equals(lastInstanceId))) {
/* 104 */ offset = KafkaUtils.getOffset(consumer, _config.topic, 
partition, _config.startOffsetTime);
/* */   } else {
/* 106 */ offset = ((Long)lastMeta.get("nextOffset")).longValue();
/* */   }
/* */ } else {
/* 109 */  offset = KafkaUtils.getOffset(consumer, _config.topic, 
partition, _config);
/* */ }
{code}
This calls the API below. As you can see, this call fetches the earliest data rather 
than the latest:
{code:java}
public static long getOffset(SimpleConsumer consumer, String topic, int 
partition, KafkaConfig config)
{
  long startOffsetTime = config.startOffsetTime;
return getOffset(consumer, topic, partition, startOffsetTime);
}
{code}
How it should be (this is how it was in the previous 0.9.x release):
{code:java}
public static long getOffset(SimpleConsumer consumer, String topic, int 
partition, KafkaConfig config) {
long startOffsetTime = kafka.api.OffsetRequest.LatestTime();

if ( config.ignoreZkOffsets) {
startOffsetTime = config.startOffsetTime;
}
return getOffset(consumer, topic, partition, startOffsetTime);
}
{code}

Why do you think the spout should pick up from the beginning? It should do so only 
when explicitly specified. This change would also allow a user who wants to ignore the 
ZK offset to read data from a particular time. 
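
To make the requested behaviour concrete, here is a minimal usage sketch (only ignoreZkOffsets, startOffsetTime and the storm-kafka classes already mentioned in this thread are assumed; the helper itself is hypothetical, not part of the patch):
{code:java}
import storm.kafka.BrokerHosts;
import storm.kafka.ZkHosts;
import storm.kafka.trident.TridentKafkaConfig;

public class SpoutConfigSketch {
    public static TridentKafkaConfig latestByDefault(String zkHosts, String topic) {
        BrokerHosts brokers = new ZkHosts(zkHosts);
        TridentKafkaConfig spoutConfig = new TridentKafkaConfig(brokers, topic);
        // Expected behaviour: with ignoreZkOffsets=false, a brand-new consumer group
        // should start from the latest offset instead of replaying from the earliest.
        spoutConfig.ignoreZkOffsets = false;
        // Explicit opt-in to replay from a chosen point in time:
        // spoutConfig.ignoreZkOffsets = true;
        // spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
        return spoutConfig;
    }
}
{code}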

 

> Kafka Spout doesn't pick up from the beginning of the queue unless 
> forceFromStart specified
> ---
>
> Key: STORM-563
> URL: https://issues.apache.org/jira/browse/STORM-563
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka
>Reporter: Sriharsha Chintalapani
>Assignee: Sriharsha Chintalapani
> Fix For: 0.10.0
>
>
> KafkaUtil.getOffset starts from LatestTime unless forceFromStart specified. 
> It should pick this from KafkaConfig.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Storm's Kafka spout is not reading latest data even with new consumer group

2016-05-28 Thread Sachin Pasalkar
Thanks Bobby. I will ask on ticket.

From: Bobby Evans 
<ev...@yahoo-inc.com.INVALID<mailto:ev...@yahoo-inc.com.INVALID>>
Reply-To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>, Bobby Evans 
<ev...@yahoo-inc.com<mailto:ev...@yahoo-inc.com>>
Date: Friday, 27 May 2016 7:45 pm
To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Cc: Narendra Bidari 
<narendra_bid...@symantec.com<mailto:narendra_bid...@symantec.com>>
Subject: Re: Storm's Kafka spout is not reading latest data even with new 
consumer group

Looks like it changed as a part of 
https://issues.apache.org/jira/browse/STORM-563.  That might be a good place to 
ask.
Specifically it was pull request https://github.com/apache/storm/pull/493.
To me it looks like the code was updated to use ignoreZKOffsets instead of 
forceFromStart, but I have not dug into the exact details of the change to know 
what all the ramifications might have been.
 - Bobby

On Thursday, May 26, 2016 10:13 PM, Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>> wrote:

Can you look at this please?

From: Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com><mailto:sachin_pasal...@symantec.com>>
Reply-To: 
"dev@storm.apache.org<mailto:dev@storm.apache.org><mailto:dev@storm.apache.org>"
 
<dev@storm.apache.org<mailto:dev@storm.apache.org><mailto:dev@storm.apache.org>>
Date: Thursday, 26 May 2016 9:35 pm
To: 
"dev@storm.apache.org<mailto:dev@storm.apache.org><mailto:dev@storm.apache.org>"
 
<dev@storm.apache.org<mailto:dev@storm.apache.org><mailto:dev@storm.apache.org>>
Cc: Narendra Bidari 
<narendra_bid...@symantec.com<mailto:narendra_bid...@symantec.com><mailto:narendra_bid...@symantec.com>>
Subject: Re: Storm's Kafka spout is not reading latest data even with new 
consumer group

Can anyone look at this?

From: Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>>
Reply-To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Date: Thursday, 26 May 2016 1:18 pm
To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Cc: Narendra Bidari 
<narendra_bid...@symantec.com<mailto:narendra_bid...@symantec.com>>
Subject: Storm's Kafka spout is not reading latest data even with new consumer 
group

Currently, if you use a new consumer group, the spout starts reading data from the 
earliest offset rather than the latest.

In KafkaConfig

public long startOffsetTime = OffsetRequest.EarliestTime();


In the doEmitNewPartitionBatch API of storm.kafka.trident.TridentKafkaEmitter, if 
the consumer group is new (lastMeta is null), the call goes to line 109:

  if (lastMeta != null) {
/*  98 */   String lastInstanceId = null;
/*  99 */   Map lastTopoMeta = (Map)lastMeta.get("topology");
/* 100 */   if (lastTopoMeta != null)
/* 101 */     lastInstanceId = (String)lastTopoMeta.get("id");
/*     */   long offset;
/* 103 */   if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {
/* 104 */     offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
/*     */   } else {
/* 106 */     offset = ((Long)lastMeta.get("nextOffset")).longValue();
/*     */   }
/*     */ } else {
/* 109 */   offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
/*     */ }

This calls the API below. As you can see, this call fetches the earliest data rather 
than the latest:

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
    long startOffsetTime = config.startOffsetTime;
    return getOffset(consumer, topic, partition, startOffsetTime);
}

How it should be (this is how it was in the previous 0.9.x release):

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
    long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
    if (config.ignoreZkOffsets) {
        startOffsetTime = config.startOffsetTime;
    }
    return getOffset(consumer, topic, partition, startOffsetTime);
}



This code was present earlier but somehow got removed. I tried to search on 
GitHub but could not find the history of the change.

Thanks,

Sachin






Re: Storm's Kafka spout is not reading latest data even with new consumer group

2016-05-26 Thread Sachin Pasalkar
Can you look at this please?

From: Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>>
Reply-To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Date: Thursday, 26 May 2016 9:35 pm
To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Cc: Narendra Bidari 
<narendra_bid...@symantec.com<mailto:narendra_bid...@symantec.com>>
Subject: Re: Storm's Kafka spout is not reading latest data even with new 
consumer group

Can anyone look at this?

From: Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com><mailto:sachin_pasal...@symantec.com>>
Reply-To: 
"dev@storm.apache.org<mailto:dev@storm.apache.org><mailto:dev@storm.apache.org>"
 
<dev@storm.apache.org<mailto:dev@storm.apache.org><mailto:dev@storm.apache.org>>
Date: Thursday, 26 May 2016 1:18 pm
To: 
"dev@storm.apache.org<mailto:dev@storm.apache.org><mailto:dev@storm.apache.org>"
 
<dev@storm.apache.org<mailto:dev@storm.apache.org><mailto:dev@storm.apache.org>>
Cc: Narendra Bidari 
<narendra_bid...@symantec.com<mailto:narendra_bid...@symantec.com><mailto:narendra_bid...@symantec.com>>
Subject: Storm's Kafka spout is not reading latest data even with new consumer 
group

Currently, if you use a new consumer group, the spout starts reading data from the 
earliest offset rather than the latest.

In KafkaConfig

public long startOffsetTime = OffsetRequest.EarliestTime();


In the doEmitNewPartitionBatch API of storm.kafka.trident.TridentKafkaEmitter, if 
the consumer group is new (lastMeta is null), the call goes to line 109:

  if (lastMeta != null) {
/*  98 */   String lastInstanceId = null;
/*  99 */   Map lastTopoMeta = (Map)lastMeta.get("topology");
/* 100 */   if (lastTopoMeta != null)
/* 101 */     lastInstanceId = (String)lastTopoMeta.get("id");
/*     */   long offset;
/* 103 */   if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {
/* 104 */     offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
/*     */   } else {
/* 106 */     offset = ((Long)lastMeta.get("nextOffset")).longValue();
/*     */   }
/*     */ } else {
/* 109 */   offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
/*     */ }

This calls the API below. As you can see, this call fetches the earliest data rather 
than the latest:

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
    long startOffsetTime = config.startOffsetTime;
    return getOffset(consumer, topic, partition, startOffsetTime);
}

How it should be (this is how it was in the previous 0.9.x release):

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
    long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
    if (config.ignoreZkOffsets) {
        startOffsetTime = config.startOffsetTime;
    }
    return getOffset(consumer, topic, partition, startOffsetTime);
}



This code was present earlier but somehow got removed. I tried to search on 
GitHub but could not find the history of the change.

Thanks,

Sachin




Re: Storm's Kafka spout is not reading latest data even with new consumer group

2016-05-26 Thread Sachin Pasalkar
Can anyone look at this?

From: Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>>
Reply-To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Date: Thursday, 26 May 2016 1:18 pm
To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Cc: Narendra Bidari 
<narendra_bid...@symantec.com<mailto:narendra_bid...@symantec.com>>
Subject: Storm's Kafka spout is not reading latest data even with new consumer 
group

Currently, if you use a new consumer group, the spout starts reading data from the 
earliest offset rather than the latest.

In KafkaConfig

public long startOffsetTime = OffsetRequest.EarliestTime();


In the doEmitNewPartitionBatch API of storm.kafka.trident.TridentKafkaEmitter, if 
the consumer group is new (lastMeta is null), the call goes to line 109:

  if (lastMeta != null) {
/*  98 */   String lastInstanceId = null;
/*  99 */   Map lastTopoMeta = (Map)lastMeta.get("topology");
/* 100 */   if (lastTopoMeta != null)
/* 101 */     lastInstanceId = (String)lastTopoMeta.get("id");
/*     */   long offset;
/* 103 */   if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {
/* 104 */     offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
/*     */   } else {
/* 106 */     offset = ((Long)lastMeta.get("nextOffset")).longValue();
/*     */   }
/*     */ } else {
/* 109 */   offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
/*     */ }

This calls the API below. As you can see, this call fetches the earliest data rather 
than the latest:

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
    long startOffsetTime = config.startOffsetTime;
    return getOffset(consumer, topic, partition, startOffsetTime);
}

How it should be (this is how it was in the previous 0.9.x release):

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
    long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
    if (config.ignoreZkOffsets) {
        startOffsetTime = config.startOffsetTime;
    }
    return getOffset(consumer, topic, partition, startOffsetTime);
}



This code was present earlier but somehow got removed. I tried to search on 
GitHub but could not find the history of the change.

Thanks,

Sachin



Storm's Kafka spout is not reading latest data even with new consumer group

2016-05-26 Thread Sachin Pasalkar
Currently, if you use a new consumer group, the spout starts reading data from the 
earliest offset rather than the latest.

In KafkaConfig

public long startOffsetTime = OffsetRequest.EarliestTime();


In the doEmitNewPartitionBatch API of storm.kafka.trident.TridentKafkaEmitter, if 
the consumer group is new (lastMeta is null), the call goes to line 109:

  if (lastMeta != null) {
/*  98 */   String lastInstanceId = null;
/*  99 */   Map lastTopoMeta = (Map)lastMeta.get("topology");
/* 100 */   if (lastTopoMeta != null)
/* 101 */     lastInstanceId = (String)lastTopoMeta.get("id");
/*     */   long offset;
/* 103 */   if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {
/* 104 */     offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
/*     */   } else {
/* 106 */     offset = ((Long)lastMeta.get("nextOffset")).longValue();
/*     */   }
/*     */ } else {
/* 109 */   offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
/*     */ }

This calls the API below. As you can see, this call fetches the earliest data rather 
than the latest:

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
    long startOffsetTime = config.startOffsetTime;
    return getOffset(consumer, topic, partition, startOffsetTime);
}

How it should be (this is how it was in the previous 0.9.x release):

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
    long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
    if (config.ignoreZkOffsets) {
        startOffsetTime = config.startOffsetTime;
    }
    return getOffset(consumer, topic, partition, startOffsetTime);
}



This code was present earlier but somehow got removed. I tried to search on 
GitHub but could not find the history of the change.

Thanks,

Sachin


Re: Not enough data to calculate spout lag

2016-04-06 Thread Sachin Pasalkar
I debugged the issue further: I put some logs in the KafkaUtils class and found that Storm 
was not receiving the partition information (it was null). As the partition information 
is stored in ZooKeeper, I decided to play with the ZooKeeper sequence in the 
properties. I moved the first ZooKeeper from the start of the list to the end, and then 
the topology started processing data. I believe that ZooKeeper somehow did not have 
the partition information.

However, after this run the topology also worked with the old config. As part of metadata 
updates, Storm might have added the information to all the ZooKeepers. Hope this analysis is 
helpful.

From: Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>>
Reply-To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Date: Wednesday, 6 April 2016 12:22 am
To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>, Bobby Evans 
<ev...@yahoo-inc.com<mailto:ev...@yahoo-inc.com>>
Subject: Re: Not enough data to calculate spout lag

I checked the latest code; KafkaUtils is the class that shows this error 
(same in the previous version too):
https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java

From: Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com><mailto:sachin_pasal...@symantec.com>>
Date: Wednesday, 6 April 2016 12:15 am
To: 
"dev@storm.apache.org<mailto:dev@storm.apache.org><mailto:dev@storm.apache.org>"
 
<dev@storm.apache.org<mailto:dev@storm.apache.org><mailto:dev@storm.apache.org>>,
 Bobby Evans 
<ev...@yahoo-inc.com<mailto:ev...@yahoo-inc.com><mailto:ev...@yahoo-inc.com>>
Subject: Re: Not enough data to calculate spout lag

Sorry, I missed your mail. We are using the 0.8 version of Kafka and the 0.10 version 
of Storm. Yes, these are Trident topologies.

From: Bobby Evans 
<ev...@yahoo-inc.com.INVALID<mailto:ev...@yahoo-inc.com.INVALID><mailto:ev...@yahoo-inc.com.INVALID>>
Reply-To: 
"dev@storm.apache.org<mailto:dev@storm.apache.org><mailto:dev@storm.apache.org>"
 
<dev@storm.apache.org<mailto:dev@storm.apache.org><mailto:dev@storm.apache.org>>,
 Bobby Evans 
<ev...@yahoo-inc.com<mailto:ev...@yahoo-inc.com><mailto:ev...@yahoo-inc.com>>
Date: Tuesday, 22 March 2016 12:24 am
To: 
"dev@storm.apache.org<mailto:dev@storm.apache.org><mailto:dev@storm.apache.org>"
 
<dev@storm.apache.org<mailto:dev@storm.apache.org><mailto:dev@storm.apache.org>>
Subject: Re: Not enough data to calculate spout lag

I am not super familiar with that code. What version of the Kafka spout are you 
using? Is this with Trident or with regular Storm?

The code honestly seems over-complicated for what it is doing, but I would have 
to dig more deeply into exactly how the partitions are managed to see 
why it is doing this.

For me, though, the latest code looks like there is no way this should 
happen:
https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java#L98-L104
But I didn't look at Trident. - Bobby

On Sunday, March 20, 2016 7:43 AM, Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com><mailto:sachin_pasal...@symantec.com>>
 wrote:

Can someone help me out with this?

-Original Message-
From: Sachin Pasalkar [mailto:sachin_pasal...@symantec.com]
Sent: Friday, March 18, 2016 9:37 PM
To: 
dev@storm.apache.org<mailto:dev@storm.apache.org><mailto:dev@storm.apache.org>
Subject: Not enough data to calculate spout lag

Hi,

I found log "Metrics Tick: Not enough data to calculate spout lag." in my 
topology and then topology becomes inactive.

I check the source:
  if (_partitions != null && _partitions.size() == _partitionToOffset.size()) { 
..}else {
  LOG.info("Metrics Tick: Not enough data to calculate spout lag."); }

What situation will cause  _partitions != null or _partitions.size() 
==_partitionToOffset.size()?

Thanks,
Sachin






Re: Not enough data to calculate spout lag

2016-04-05 Thread Sachin Pasalkar
I checked the latest code; KafkaUtils is the class that shows this error 
(same in the previous version too):
https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java

From: Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>>
Date: Wednesday, 6 April 2016 12:15 am
To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>, Bobby Evans 
<ev...@yahoo-inc.com<mailto:ev...@yahoo-inc.com>>
Subject: Re: Not enough data to calculate spout lag

Sorry, I missed your mail. We are using the 0.8 version of Kafka and the 0.10 version 
of Storm. Yes, these are Trident topologies.

From: Bobby Evans 
<ev...@yahoo-inc.com.INVALID<mailto:ev...@yahoo-inc.com.INVALID>>
Reply-To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>, Bobby Evans 
<ev...@yahoo-inc.com<mailto:ev...@yahoo-inc.com>>
Date: Tuesday, 22 March 2016 12:24 am
To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Subject: Re: Not enough data to calculate spout lag

I am not super familiar with that code. What version of the Kafka spout are you 
using? Is this with Trident or with regular Storm?

The code honestly seems over-complicated for what it is doing, but I would have 
to dig more deeply into exactly how the partitions are managed to see 
why it is doing this.

For me, though, the latest code looks like there is no way this should 
happen:
https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java#L98-L104
But I didn't look at Trident. - Bobby

On Sunday, March 20, 2016 7:43 AM, Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>> wrote:

Can someone help me out with this?

-Original Message-
From: Sachin Pasalkar [mailto:sachin_pasal...@symantec.com]
Sent: Friday, March 18, 2016 9:37 PM
To: dev@storm.apache.org<mailto:dev@storm.apache.org>
Subject: Not enough data to calculate spout lag

Hi,

I found log "Metrics Tick: Not enough data to calculate spout lag." in my 
topology and then topology becomes inactive.

I check the source:
  if (_partitions != null && _partitions.size() == _partitionToOffset.size()) { 
..}else {
  LOG.info("Metrics Tick: Not enough data to calculate spout lag."); }

What situation will cause  _partitions != null or _partitions.size() 
==_partitionToOffset.size()?

Thanks,
Sachin





Re: Not enough data to calculate spout lag

2016-04-05 Thread Sachin Pasalkar
Sorry, I missed your mail. We are using the 0.8 version of Kafka and the 0.10 version 
of Storm. Yes, these are Trident topologies.

From: Bobby Evans 
<ev...@yahoo-inc.com.INVALID<mailto:ev...@yahoo-inc.com.INVALID>>
Reply-To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>, Bobby Evans 
<ev...@yahoo-inc.com<mailto:ev...@yahoo-inc.com>>
Date: Tuesday, 22 March 2016 12:24 am
To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Subject: Re: Not enough data to calculate spout lag

I am not super familiar with that code. What version of the Kafka spout are you 
using? Is this with Trident or with regular Storm?

The code honestly seems over-complicated for what it is doing, but I would have 
to dig more deeply into exactly how the partitions are managed to see 
why it is doing this.

For me, though, the latest code looks like there is no way this should 
happen:
https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java#L98-L104
But I didn't look at Trident. - Bobby

On Sunday, March 20, 2016 7:43 AM, Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>> wrote:

Can someone help me out with this?

-Original Message-
From: Sachin Pasalkar [mailto:sachin_pasal...@symantec.com]
Sent: Friday, March 18, 2016 9:37 PM
To: dev@storm.apache.org<mailto:dev@storm.apache.org>
Subject: Not enough data to calculate spout lag

Hi,

I found log "Metrics Tick: Not enough data to calculate spout lag." in my 
topology and then topology becomes inactive.

I check the source:
  if (_partitions != null && _partitions.size() == _partitionToOffset.size()) { 
..}else {
  LOG.info("Metrics Tick: Not enough data to calculate spout lag."); }

What situation will cause  _partitions != null or _partitions.size() 
==_partitionToOffset.size()?

Thanks,
Sachin





Re: Provide configuration to set min fetch size in KafkaSpout

2016-04-04 Thread Sachin Pasalkar
Thanks :) Let me know in which version it will be available; we will keep our 
code until then.

From: Abhishek Agarwal <abhishc...@gmail.com<mailto:abhishc...@gmail.com>>
Reply-To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Date: Monday, 4 April 2016 8:04 pm
To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Cc: Bobby Evans <ev...@yahoo-inc.com<mailto:ev...@yahoo-inc.com>>
Subject: Re: Provide configuration to set min fetch size in KafkaSpout

Seems reasonable to me. I will put up a PR for this. Though I am not sure
if it will be included as part of 1.0.0.

On Mon, Apr 4, 2016 at 7:00 PM, Sachin Pasalkar <
sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>> wrote:

Hi,

I have raised the JIRA below. Does this make sense to you guys? As per our
project requirements, we needed this change in the Storm code.
https://issues.apache.org/jira/browse/STORM-1680

Thanks,
Sachin




--
Regards,
Abhishek Agarwal



Provide configuration to set min fetch size in KafkaSpout

2016-04-04 Thread Sachin Pasalkar
Hi,

I have raised the JIRA below. Does this make sense to you guys? As per our project 
requirements, we needed this change in the Storm code.
https://issues.apache.org/jira/browse/STORM-1680

Thanks,
Sachin


[jira] [Updated] (STORM-1680) Provide configuration to set min fetch size in KafkaSpout

2016-04-04 Thread Sachin Pasalkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-1680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sachin Pasalkar updated STORM-1680:
---
Summary: Provide configuration to set min fetch size in KafkaSpout  (was: 
Provide configuration to set min fetch size in KafkaConfig)

> Provide configuration to set min fetch size in KafkaSpout
> -
>
> Key: STORM-1680
> URL: https://issues.apache.org/jira/browse/STORM-1680
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-kafka, trident
>    Reporter: Sachin Pasalkar
>
> The Kafka consumer provides a configuration to set the minimum fetch size. 
> However, Storm's Kafka spout does not expose this functionality. This is 
> helpful in cases where someone is writing data to HDFS and wants a file size of 
> X. 
> Below are the changes that need to be done:
> 1. In the KafkaUtils class, update the fetchMessages API with the change below:
> FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, 
> config.fetchSizeBytes).clientId(config.clientId).maxWait(config.fetchMaxWait).minBytes(config.minFetchByte).build();
> 2. Update the KafkaConfig class with an instance variable minFetchByte 
> (default value 0, as mentioned in the FetchRequestBuilder class).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (STORM-1680) Provide configuration to set min fetch size in KafkaConfig

2016-04-04 Thread Sachin Pasalkar (JIRA)
Sachin Pasalkar created STORM-1680:
--

 Summary: Provide configuration to set min fetch size in KafkaConfig
 Key: STORM-1680
 URL: https://issues.apache.org/jira/browse/STORM-1680
 Project: Apache Storm
  Issue Type: Improvement
  Components: storm-kafka, trident
Reporter: Sachin Pasalkar


The Kafka consumer provides a configuration to set the minimum fetch size. 
However, Storm's Kafka spout does not expose this functionality. This is 
helpful in cases where someone is writing data to HDFS and wants a file size of X. 

Below are the changes that need to be done (a rough sketch follows below):
1.  In the KafkaUtils class, update the fetchMessages API with the change below:
FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, 
config.fetchSizeBytes).clientId(config.clientId).maxWait(config.fetchMaxWait).minBytes(config.minFetchByte).build();
2.  Update the KafkaConfig class with an instance variable minFetchByte 
(default value 0, as mentioned in the FetchRequestBuilder class).
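
For illustration, a rough sketch of the two changes (only the names already listed above, i.e. minFetchByte, fetchSizeBytes, fetchMaxWait, clientId and the FetchRequestBuilder chain, are taken from this description; the rest is schematic, not the final patch):

// 2. KafkaConfig gains the new knob, defaulting to 0 like FetchRequestBuilder does.
public int minFetchByte = 0;

// 1. KafkaUtils.fetchMessages threads the new value through the builder.
FetchRequest fetchRequest = new FetchRequestBuilder()
        .addFetch(topic, partitionId, offset, config.fetchSizeBytes)
        .clientId(config.clientId)
        .maxWait(config.fetchMaxWait)
        .minBytes(config.minFetchByte)
        .build();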




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


RE: Not enough data to calculate spout lag

2016-03-20 Thread Sachin Pasalkar
Can someone help me out with this?

-Original Message-
From: Sachin Pasalkar [mailto:sachin_pasal...@symantec.com] 
Sent: Friday, March 18, 2016 9:37 PM
To: dev@storm.apache.org
Subject: Not enough data to calculate spout lag

Hi,

I found log "Metrics Tick: Not enough data to calculate spout lag." in my 
topology and then topology becomes inactive.

I check the source:
  if (_partitions != null && _partitions.size() == _partitionToOffset.size()) { 
..}else {
   LOG.info("Metrics Tick: Not enough data to calculate spout lag."); }

What situation will cause  _partitions != null or _partitions.size() 
==_partitionToOffset.size()?

Thanks,
Sachin



Not enough data to calculate spout lag

2016-03-18 Thread Sachin Pasalkar
Hi,

I found log "Metrics Tick: Not enough data to calculate spout lag.” in my 
topology and then topology becomes inactive.

I check the source:
  if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
……}else {
   LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
}

What situation will cause  _partitions != null or _partitions.size() 
==_partitionToOffset.size()?

Thanks,
Sachin



Re: Adding more constructors in Tuple class

2016-01-05 Thread Sachin Pasalkar
Does this make sense?

From: Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>>
Reply-To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Date: Monday, 4 January 2016 10:27 am
To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Subject: RE: Adding more constructors in Tuple class

The first one will help in terms of performance. I might emit 100 values, and the 
ArrayList default size is 10, so each time the list hits its limit it ends up 
growing the internal array by a factor of about 1.5, which results in memory 
reallocations.

For the case of a Set, I got your point. What we want to do is this: we have a fixed 40 
fields that need to be passed, and there are some try blocks where we may not be 
able to add values to the tuple (we could add a null value in the catch block, but 
that doesn't look feasible as we would have to write the same code 35 times). For now 
we run our own loop to add 40 null values initially and then set them as 
required. We could then pass an ArrayList pre-filled with null values directly 
(I am just trying to avoid any manual code, i.e. running a for loop over the 40 fields), 
e.g.
ArrayList<Object> list = new ArrayList<>(Collections.nCopies(40, null));
Values val = new Values(list);
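
To make the varargs limitation concrete, here is a small sketch (the Collection constructor at the end is the proposed addition and is only hypothetical; everything else uses the existing backtype.storm.tuple.Values API):

ArrayList<Object> fields = new ArrayList<>(Collections.nCopies(40, null));

// Existing varargs constructor: the whole list becomes ONE tuple element.
Values wrong = new Values(fields);
System.out.println(wrong.size());   // prints 1, not 40

// Today's workaround: start empty and copy the elements in.
Values right = new Values();
right.addAll(fields);
System.out.println(right.size());   // prints 40

// With the proposed constructor this would collapse to a single call:
// Values direct = new Values(fields);   // hypothetical Values(Collection<?> c)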


-Original Message-
From: Nathan Leung [mailto:ncle...@gmail.com]
Sent: Saturday, January 02, 2016 9:11 PM
To: dev@storm.apache.org<mailto:dev@storm.apache.org>
Subject: Re: Adding more constructors in Tuple class

Is there a use case that requires the first? I thought most people use Values 
which has a variadic constructor so you know the length already anyways.

The second doesn't map cleanly. If you pass a set, how do you order the 
elements in the tuple?  You can just pass your collection as a field in the 
tuple if that's what you need to do.
On Jan 2, 2016 1:44 AM, "Sachin Pasalkar" 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>>
wrote:

Hi,

Can we add some more constructors to the backtype.storm.tuple.Values class
that are already present in the ArrayList class? Some examples are below.


public Values(int initialCapacity) {

super(initialCapacity);

}

This will help performance when we know in advance what size
the list is going to be.


public Values(Collection c) {

super(c);

}


This will allow us to add a predefined list to Values instead of
manually iterating over it. We cannot use Values(Object... vals) for this, as it
treats the list as a single element.


Thanks,

Sachin





RE: Adding more constructors in Tuple class

2016-01-03 Thread Sachin Pasalkar
Hey Matthias,

If you look at the implementation of ensureCapacity, it reallocates to a new internal 
array, whereas ArrayList(int) creates the array at the right size upfront. There is 
not much difference if you call it right away, but ArrayList(int) is the cleaner 
solution for this. 
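
To make the comparison concrete, a tiny sketch (assuming only that Values extends ArrayList, as noted above; the int constructor in the last line is the proposed addition and does not exist today):

Values v = new Values();
v.ensureCapacity(100);        // works today via ArrayList, but sizes the backing array after construction

// Proposed, hypothetical constructor would do the same in one step:
// Values v2 = new Values(100);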

-Original Message-
From: Matthias J. Sax [mailto:mj...@apache.org] 
Sent: Sunday, January 03, 2016 12:48 AM
To: dev@storm.apache.org
Subject: Re: Adding more constructors in Tuple class

Hi,

as Values inherits from ArrayList, you can also use .ensureCapacity(int).

-Matthias

On 01/02/2016 04:41 PM, Nathan Leung wrote:
> Is there a use case that requires the first? I thought most people use 
> Values which has a variadic constructor so you know the length already 
> anyways.
> 
> The second doesn't map cleanly. If you pass a set, how do you order 
> the elements in the tuple?  You can just pass your collection as a 
> field in the tuple if that's what you need to do.
> On Jan 2, 2016 1:44 AM, "Sachin Pasalkar" 
> <sachin_pasal...@symantec.com>
> wrote:
> 
>> Hi,
>>
>> Can we add some more constructors to the backtype.storm.tuple.Values class 
>> that are already present in the ArrayList class? Some examples are below.
>>
>>
>> public Values(int initialCapacity) {
>>
>> super(initialCapacity);
>>
>> }
>>
>> This will help performance when we know in advance what size 
>> the list is going to be.
>>
>>
>> public Values(Collection c) {
>>
>> super(c);
>>
>> }
>>
>>
>> This will allow us to add a predefined list to Values instead of 
>> manually iterating over it. We cannot use Values(Object... vals) for this, as 
>> it treats the list as a single element.
>>
>>
>> Thanks,
>>
>> Sachin
>>
>>
> 



Adding more constructors in Tuple class

2016-01-01 Thread Sachin Pasalkar
Hi,

Can we add some more constructors to the backtype.storm.tuple.Values class that are 
already present in the ArrayList class? Some examples are below.


public Values(int initialCapacity) {

super(initialCapacity);

}

This will help performance when we know in advance what size the list 
is going to be.


public Values(Collection c) {

super(c);

}


This will allow us to add a predefined list to Values instead of manually 
iterating over it. We cannot use Values(Object... vals) for this, as it treats the 
list as a single element.


Thanks,

Sachin



[jira] [Updated] (STORM-1363) TridentKafkaState should handle null values from TridentTupleToKafkaMapper.getMessageFromTuple

2015-12-02 Thread Sachin Pasalkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-1363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sachin Pasalkar updated STORM-1363:
---
Description: 
If you look at the updateState API of storm.kafka.trident.TridentKafkaState: when 
the producer is sending data, it does not handle the case where 
mapper.getMessageFromTuple(tuple) returns null. As a result the Kafka topic gets the 
value as the literal string "null". There may be cases where, for a particular kind of 
exception, the user does not want to replay the tuple but only report it, and for that 
he needs to return null.

Also, please make the members protected; currently I need to copy-paste the class to 
provide my own implementation.

My updateState API looks like this

{code}
public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
    String topic = null;
    for (TridentTuple tuple : tuples) {
        if (tuple == null) {
            continue;
        }

        Object keyFromTuple = null;
        try {
            keyFromTuple = mapper.getKeyFromTuple(tuple);
            topic = topicSelector.getTopic(tuple);
            Object messageFromTuple = mapper.getMessageFromTuple(tuple);
            if (topic != null && messageFromTuple != null) {
                producer.send(new KeyedMessage(topic, keyFromTuple, messageFromTuple));
            } else {
                LOG.warn("skipping key = " + keyFromTuple + ", topic selector returned null.");
            }
        } catch (Exception ex) {
            String errorMsg = "Could not send message with key = " + keyFromTuple + " to topic = " + topic;
            LOG.warn(errorMsg, ex);
            throw new FailedException(errorMsg, ex);
        }
    }
}
{code}

  was:
If you look at the updateState API of storm.kafka.trident.TridentKafkaState. 
When producer is sending data its not handling if the null value is sent by 
mapper.getMessageFromTuple(tuple). Results into Kafka topic gets value as 
"null" string. There might be case in particular kind of exception user do not 
want to replay tuple and just report it and with that he needs to return null.

Also make the members as protected as I need to copy-paste the class to provide 
my implementation.

My updateState API looks like this

{code}
public void updateState(List tuples, 
TridentCollector collector) {
String topic = null;
for (TridentTuple tuple : tuples) {
if(tuple==null) {
continue;
}

Object keyFromTuple = null;
try {
keyFromTuple = mapper.getKeyFromTuple(tuple);
topic = topicSelector.getTopic(tuple);
Object messageFromTuple = 
mapper.getMessageFromTuple(tuple);
if (topic != null && messageFromTuple != null) {
producer.send(new KeyedMessage(topic, 
keyFromTuple, messageFromTuple));
} else {
LOG.warn("skipping key = " + 
keyFromTuple + ", topic selector returned null.");
}
} catch (Exception ex) {
String errorMsg = "Could not send message with 
key = " + keyFromTuple + " to topic = " + topic;
LOG.warn(errorMsg, ex);
throw new FailedException(errorMsg, ex);
}
}
}
{code}


> TridentKafkaState should handle null values from 
> TridentTupleToKafkaMapper.getMessageFromTuple
> --
>
> Key: STORM-1363
> URL: https://issues.apache.org/jira/browse/STORM-1363
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka
>Affects Versions: 0.10.1
>Reporter: Sachin Pasalkar
>
> If you look at the updateState API of storm.kafka.trident.TridentKafkaState. 
> When producer is sending data its not handling if the null value is sent by 
> mapper.getMessageFromTuple(tuple). Results into Kafka topic gets value as 
> "null" string. There might be case in particular kind of exception user do 
> not want to repla

[jira] [Updated] (STORM-1363) TridentKafkaState should handle null values from TridentTupleToKafkaMapper.getMessageFromTuple

2015-12-02 Thread Sachin Pasalkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-1363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sachin Pasalkar updated STORM-1363:
---
Description: 
If you look at the updateState API of storm.kafka.trident.TridentKafkaState: when 
the producer is sending data, it does not handle the case where 
mapper.getMessageFromTuple(tuple) returns null. As a result the Kafka topic gets the 
value as the literal string "null". There may be cases where, for a particular kind of 
exception, the user does not want to replay the tuple but only report it, and for that 
he needs to return null.

Also, please make the members protected; currently I need to copy-paste the class to 
provide my own implementation.

My updateState API looks like this

{code}
public void updateState(List tuples, 
TridentCollector collector) {
String topic = null;
for (TridentTuple tuple : tuples) {
if(tuple==null) {
continue;
}

Object keyFromTuple = null;
try {
keyFromTuple = mapper.getKeyFromTuple(tuple);
topic = topicSelector.getTopic(tuple);
Object messageFromTuple = 
mapper.getMessageFromTuple(tuple);
if (topic != null && messageFromTuple != null) {
producer.send(new KeyedMessage(topic, 
keyFromTuple, messageFromTuple));
} else {
LOG.warn("skipping key = " + 
keyFromTuple + ", topic selector returned null.");
}
} catch (Exception ex) {
String errorMsg = "Could not send message with 
key = " + keyFromTuple + " to topic = " + topic;
LOG.warn(errorMsg, ex);
throw new FailedException(errorMsg, ex);
}
}
}
{code}

  was:
If you look at the updateState API of storm.kafka.trident.TridentKafkaState. 
When producer is sending data its not handling if the null value is sent by 
mapper.getMessageFromTuple(tuple). Results into Kafka topic gets value as 
"null" string. There might be case in particular kind of exception user do not 
want to replay tuple and just report it and with that he needs to return null.

Also make the members as protected as I need to copy-paste the class to provide 
my implementation.

My updateState API looks like this

public void updateState(List tuples, 
TridentCollector collector) {
String topic = null;
for (TridentTuple tuple : tuples) {
if(tuple==null) {
continue;
}

Object keyFromTuple = null;
try {
keyFromTuple = mapper.getKeyFromTuple(tuple);
topic = topicSelector.getTopic(tuple);
Object messageFromTuple = 
mapper.getMessageFromTuple(tuple);
if (topic != null && messageFromTuple != null) {
producer.send(new KeyedMessage(topic, 
keyFromTuple, messageFromTuple));
} else {
LOG.warn("skipping key = " + 
keyFromTuple + ", topic selector returned null.");
}
} catch (Exception ex) {
String errorMsg = "Could not send message with 
key = " + keyFromTuple + " to topic = " + topic;
LOG.warn(errorMsg, ex);
throw new FailedException(errorMsg, ex);
}
}
}



> TridentKafkaState should handle null values from 
> TridentTupleToKafkaMapper.getMessageFromTuple
> --
>
> Key: STORM-1363
> URL: https://issues.apache.org/jira/browse/STORM-1363
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka
>Affects Versions: 0.10.1
>Reporter: Sachin Pasalkar
>
> If you look at the updateState API of storm.kafka.trident.TridentKafkaState. 
> When producer is sending data its not handling if the null value is sent by 
> mapper.getMessageFromTuple(tuple). Results into Kafka topic gets value as 
> "null" string. There might be case in particular kind of exception user do 
> not want to

[jira] [Created] (STORM-1363) TridentKafkaState should handle null values from TridentTupleToKafkaMapper.getMessageFromTuple

2015-12-02 Thread Sachin Pasalkar (JIRA)
Sachin Pasalkar created STORM-1363:
--

 Summary: TridentKafkaState should handle null values from 
TridentTupleToKafkaMapper.getMessageFromTuple
 Key: STORM-1363
 URL: https://issues.apache.org/jira/browse/STORM-1363
 Project: Apache Storm
  Issue Type: Bug
  Components: storm-kafka
Affects Versions: 0.10.1
Reporter: Sachin Pasalkar


If you look at the updateState API of storm.kafka.trident.TridentKafkaState: when 
the producer is sending data, it does not handle the case where 
mapper.getMessageFromTuple(tuple) returns null. As a result the Kafka topic gets the 
value as the literal string "null". There may be cases where, for a particular kind of 
exception, the user does not want to replay the tuple but only report it, and for that 
he needs to return null.

Also, please make the members protected; currently I need to copy-paste the class to 
provide my own implementation.

My updateState API looks like this

public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
    String topic = null;
    for (TridentTuple tuple : tuples) {
        if (tuple == null) {
            continue;
        }

        Object keyFromTuple = null;
        try {
            keyFromTuple = mapper.getKeyFromTuple(tuple);
            topic = topicSelector.getTopic(tuple);
            Object messageFromTuple = mapper.getMessageFromTuple(tuple);
            if (topic != null && messageFromTuple != null) {
                producer.send(new KeyedMessage(topic, keyFromTuple, messageFromTuple));
            } else {
                LOG.warn("skipping key = " + keyFromTuple + ", topic selector returned null.");
            }
        } catch (Exception ex) {
            String errorMsg = "Could not send message with key = " + keyFromTuple + " to topic = " + topic;
            LOG.warn(errorMsg, ex);
            throw new FailedException(errorMsg, ex);
        }
    }
}
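
For context, a minimal sketch of the mapper side of this use case. It assumes the TridentTupleToKafkaMapper interface with getKeyFromTuple/getMessageFromTuple as referenced above; the field positions and the skip rule are made up purely for illustration:

public class SkippingMapper implements TridentTupleToKafkaMapper<Object, Object> {
    public Object getKeyFromTuple(TridentTuple tuple) {
        return tuple.getValue(0);                 // assumed: key in field 0
    }

    public Object getMessageFromTuple(TridentTuple tuple) {
        Object payload = tuple.getValue(1);       // assumed: payload in field 1
        // Returning null should mean "skip this tuple", not "publish the literal string null".
        return shouldSkip(payload) ? null : payload;
    }

    private boolean shouldSkip(Object payload) {
        return payload == null;                   // placeholder rule
    }
}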




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (STORM-1343) Add a matrices for Trident which actually counts number of messages processed.

2015-11-23 Thread Sachin Pasalkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-1343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sachin Pasalkar updated STORM-1343:
---
Priority: Major  (was: Critical)

> Add a matrices for Trident which actually counts number of messages processed.
> --
>
> Key: STORM-1343
> URL: https://issues.apache.org/jira/browse/STORM-1343
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Affects Versions: 0.10.0
>    Reporter: Sachin Pasalkar
>
> If we keep a Trident topology running without pumping any messages into Kafka, 
> the Storm UI still shows an increasing count. After some investigation we found a 
> blog post mentioning that this is not the actual processed-message count:  
> https://github.com/miguno/kafka-storm-starter/issues/5
> As a user it is very confusing what this count is, and sometimes it gets 
> misinterpreted. We have seen that core Storm shows the count correctly. As 
> Trident is an abstraction over core Storm, can't we use those metrics? 
> We also found a blog post describing how a user can manually add metrics to his 
> code, which then show up in the Storm UI: 
> http://www.bigdata-cookbook.com/post/72320512609/storm-metrics-how-to 
> Can we implement processed-tuple metrics in Trident directly? That would be 
> helpful to the end user in understanding what the topology is doing and what 
> needs to be done to increase the EPS of a particular component. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Trident topology showing improper values in UI

2015-11-23 Thread Sachin Pasalkar
Thanks :) Will do.

From: Bobby Evans <ev...@yahoo-inc.com<mailto:ev...@yahoo-inc.com>>
Reply-To: Bobby Evans <ev...@yahoo-inc.com<mailto:ev...@yahoo-inc.com>>
Date: Monday, 23 November 2015 11:34 pm
To: Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>>, 
"dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Subject: Re: Trident topology showing improper values in UI

File a JIRA in issues.apache.org/jira under STORM describing what you want to 
see with storm and the UI especially for trident.  We are in the process of 
merging with the JStorm project and they have completely redone how metrics 
work, so with their new code it might be more doable.

- Bobby



On Monday, November 23, 2015 10:52 AM, Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>> wrote:


I also found that we can configure our own metrics to check how many tuples 
are processed:
 http://www.bigdata-cookbook.com/post/72320512609/storm-metrics-how-to

Can't we have Trident do this internally?

From: Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>>
Date: Monday, 23 November 2015 10:14 pm
To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>, Bobby Evans 
<ev...@yahoo-inc.com<mailto:ev...@yahoo-inc.com>>
Subject: Re: Trident topology showing improper values in UI

Thanks Bobby for the reply. I am aware of these things, but my question is 
different: when the topology is not processing any messages, the count on the UI still 
increases (both on the topology page and the component page). Also, I have hardly ever 
seen the expected count of processed messages on the component page.

From: Bobby Evans 
<ev...@yahoo-inc.com.INVALID<mailto:ev...@yahoo-inc.com.INVALID>>
Reply-To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>, Bobby Evans 
<ev...@yahoo-inc.com<mailto:ev...@yahoo-inc.com>>
Date: Monday, 23 November 2015 8:17 pm
To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Subject: Re: Trident topology showing improper values in UI

The metrics are not always the most clear on the UI, and trident does not make 
it any clearer. First of all there are several different modes on the UI.  You 
can look at the metrics over different time periods, 10 mins, 3 hours, 1 day, 
and all time.  If you have clicked on one of these links the page will display 
numbers for just this range, except in the section labeled for specific time 
periods.  There is also a button that decides if we should include system stats 
or not.  If this is selected the aggregate numbers will include acks.  If it is 
not selected they will not.

The other thing to be aware of is that trident is a layer that sits on top of 
storm so the UI displays what the trident topology was compiled down to, not as 
much of what your code looks like.  Trident spouts actually run in a bolt, but 
control logic is in the actual spout.  You can still pull out the number of 
messages that a trident spout emitted in most cases, but you need to know where 
to look, and you might need to click on the component page for the bolt running 
the spout to be able to divide the counts up by which stream they were written 
to. - Bobby


On Monday, November 23, 2015 7:38 AM, Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>> wrote:

Hi,

We are developing a Trident topology, and we observed that the ack count in the UI 
increases continuously even though we do not process any messages. I found a 
description of what those acks are: 
https://github.com/miguno/kafka-storm-starter/issues/5. However, is there any 
plan to show users the processed-message count? As a user, I am not interested in seeing 
these acks; I am very much interested in the acks for my own messages.

Thanks,
Sachin






[jira] [Created] (STORM-1343) Add a matrices for Trident which actually counts number of messages processed.

2015-11-23 Thread Sachin Pasalkar (JIRA)
Sachin Pasalkar created STORM-1343:
--

 Summary: Add a matrices for Trident which actually counts number 
of messages processed.
 Key: STORM-1343
 URL: https://issues.apache.org/jira/browse/STORM-1343
 Project: Apache Storm
  Issue Type: Improvement
  Components: storm-core
Affects Versions: 0.10.0
Reporter: Sachin Pasalkar
Priority: Critical


If we keep a Trident topology running without pumping any messages into Kafka, the 
Storm UI still shows an increasing count. After some investigation we found a blog post 
mentioning that this is not the actual processed-message count:  
https://github.com/miguno/kafka-storm-starter/issues/5

As a user it is very confusing what this count is, and sometimes it gets 
misinterpreted. We have seen that core Storm shows the count correctly. As 
Trident is an abstraction over core Storm, can't we use those metrics? 

We also found a blog post describing how a user can manually add metrics to his code, 
which then show up in the Storm UI: 
http://www.bigdata-cookbook.com/post/72320512609/storm-metrics-how-to 

Can we implement processed-tuple metrics in Trident directly? That would be helpful 
to the end user in understanding what the topology is doing and what needs to be done to 
increase the EPS of a particular component. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (STORM-1343) Add a matrices for Trident which actually counts number of messages processed.

2015-11-23 Thread Sachin Pasalkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-1343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sachin Pasalkar updated STORM-1343:
---
Attachment: Re  Trident topology showing improper values in UI.msg

Discussion with Bobby Evans in attached mail

> Add a matrices for Trident which actually counts number of messages processed.
> --
>
> Key: STORM-1343
> URL: https://issues.apache.org/jira/browse/STORM-1343
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Affects Versions: 0.10.0
>    Reporter: Sachin Pasalkar
> Attachments: Re  Trident topology showing improper values in UI.msg
>
>
> If we keep a Trident topology running without pumping any messages into Kafka, 
> the Storm UI still shows an increasing count. After some investigation we found a 
> blog post mentioning that this is not the actual processed-message count:  
> https://github.com/miguno/kafka-storm-starter/issues/5
> As a user it is very confusing what this count is, and sometimes it gets 
> misinterpreted. We have seen that core Storm shows the count correctly. As 
> Trident is an abstraction over core Storm, can't we use those metrics? 
> We also found a blog post describing how a user can manually add metrics to his 
> code, which then show up in the Storm UI: 
> http://www.bigdata-cookbook.com/post/72320512609/storm-metrics-how-to 
> Can we implement processed-tuple metrics in Trident directly? That would be 
> helpful to the end user in understanding what the topology is doing and what 
> needs to be done to increase the EPS of a particular component. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Trident topology showing improper values in UI

2015-11-23 Thread Sachin Pasalkar
I also found that we can configure our own metrics to check how many tuples 
are processed:
 http://www.bigdata-cookbook.com/post/72320512609/storm-metrics-how-to

Can't we have Trident do this internally?
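
For reference, this is roughly what we do by hand today following that post (a sketch only; it assumes TridentOperationContext.registerMetric and CountMetric work as described there, so treat the exact signatures as approximate):

public class CountingFunction extends BaseFunction {
    private transient CountMetric processed;

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        // Register a counter that the configured metrics consumer (and hence the UI) can pick up every 60s.
        processed = context.registerMetric("processed-tuples", new CountMetric(), 60);
    }

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        processed.incr();                         // count only tuples we actually process
        collector.emit(new Values(tuple.getValue(0)));
    }
}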

From: Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>>
Date: Monday, 23 November 2015 10:14 pm
To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>, Bobby Evans 
<ev...@yahoo-inc.com<mailto:ev...@yahoo-inc.com>>
Subject: Re: Trident topology showing improper values in UI

Thanks Bobby for the reply. I am aware of these things, but my question is 
different: when the topology is not processing any messages, the count on the UI still 
increases (both on the topology page and the component page). Also, I have hardly ever 
seen the expected count of processed messages on the component page.

From: Bobby Evans 
<ev...@yahoo-inc.com.INVALID<mailto:ev...@yahoo-inc.com.INVALID>>
Reply-To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>, Bobby Evans 
<ev...@yahoo-inc.com<mailto:ev...@yahoo-inc.com>>
Date: Monday, 23 November 2015 8:17 pm
To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Subject: Re: Trident topology showing improper values in UI

The metrics are not always the most clear on the UI, and trident does not make 
it any clearer. First of all there are several different modes on the UI.  You 
can look at the metrics over different time periods, 10 mins, 3 hours, 1 day, 
and all time.  If you have clicked on one of these links the page will display 
numbers for just this range, except in the section labeled for specific time 
periods.  There is also a button that decides if we should include system stats 
or not.  If this is selected the aggregate numbers will include acks.  If it is 
not selected they will not.

The other thing to be aware of is that trident is a layer that sits on top of 
storm so the UI displays what the trident topology was compiled down to, not as 
much of what your code looks like.  Trident spouts actually run in a bolt, but 
control logic is in the actual spout.  You can still pull out the number of 
messages that a trident spout emitted in most cases, but you need to know where 
to look, and you might need to click on the component page for the bolt running 
the spout to be able to divide the counts up by which stream they were written 
to. - Bobby


On Monday, November 23, 2015 7:38 AM, Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>> wrote:

Hi,

We are developing a Trident topology, and we observed that the ack count in the UI 
increases continuously even though we do not process any messages. I found a 
description of what those acks are: 
https://github.com/miguno/kafka-storm-starter/issues/5. However, is there any 
plan to show users the processed-message count? As a user, I am not interested in seeing 
these acks; I am very much interested in the acks for my own messages.

Thanks,
Sachin




Re: Trident topology showing improper values in UI

2015-11-23 Thread Sachin Pasalkar
Thanks Bobby for the reply. I am aware of these things, but my question is 
different: when the topology is not processing any messages, the count on the UI still 
increases (both on the topology page and the component page). Also, I have hardly ever 
seen the expected count of processed messages on the component page.

From: Bobby Evans 
<ev...@yahoo-inc.com.INVALID<mailto:ev...@yahoo-inc.com.INVALID>>
Reply-To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>, Bobby Evans 
<ev...@yahoo-inc.com<mailto:ev...@yahoo-inc.com>>
Date: Monday, 23 November 2015 8:17 pm
To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Subject: Re: Trident topology showing improper values in UI

The metrics are not always the most clear on the UI, and trident does not make 
it any clearer. First of all there are several different modes on the UI.  You 
can look at the metrics over different time periods, 10 mins, 3 hours, 1 day, 
and all time.  If you have clicked on one of these links the page will display 
numbers for just this range, except in the section labeled for specific time 
periods.  There is also a button that decides if we should include system stats 
or not.  If this is selected the aggregate numbers will include acks.  If it is 
not selected they will not.

The other thing to be aware of is that trident is a layer that sits on top of 
storm so the UI displays what the trident topology was compiled down to, not as 
much of what your code looks like.  Trident spouts actually run in a bolt, but 
control logic is in the actual spout.  You can still pull out the number of 
messages that a trident spout emitted in most cases, but you need to know where 
to look, and you might need to click on the component page for the bolt running 
the spout to be able to divide the counts up by which stream they were written 
to. - Bobby


On Monday, November 23, 2015 7:38 AM, Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>> wrote:

Hi,

We are developing a Trident topology, and we observed that the ack count in the UI 
increases continuously even though we are not processing any messages. I found a 
description of what those acks are at 
https://github.com/miguno/kafka-storm-starter/issues/5. However, is there any 
plan to show users the count of processed messages? As a user, I am not interested in 
seeing these acks; I am very much interested in the acks for my own messages.

Thanks,
Sachin




Re: Why am I getting OffsetOutOfRange: Updating offset from offset?

2015-11-22 Thread Sachin Pasalkar
Can someone help us on this?

From: Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>>
Reply-To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Date: Friday, 20 November 2015 11:53 am
To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Subject: Why am I getting OffsetOutOfRange: Updating offset from offset?

Hi,

We are developing an application where, after the topology has run for some days, we get 
continuous warning messages:


2015-11-20 05:05:42.226 s.k.KafkaUtils [WARN] Got fetch request with offset out 
of range: [7238824446]

2015-11-20 05:05:42.229 s.k.t.TridentKafkaEmitter [WARN] OffsetOutOfRange: 
Updating offset from offset = 7238824446 to offset = 7241183683

2015-11-20 05:05:43.207 s.k.KafkaUtils [WARN] Got fetch request with offset out 
of range: [7022945051]

2015-11-20 05:05:43.208 s.k.t.TridentKafkaEmitter [WARN] OffsetOutOfRange: 
Updating offset from offset = 7022945051 to offset = 7025309343

2015-11-20 05:05:44.260 s.k.KafkaUtils [WARN] Got fetch request with offset out 
of range: [7170559432]

2015-11-20 05:05:44.264 s.k.t.TridentKafkaEmitter [WARN] OffsetOutOfRange: 
Updating offset from offset = 7170559432 to offset = 7172920769

2015-11-20 05:05:45.332 s.k.KafkaUtils [WARN] Got fetch request with offset out 
of range: [7132495867]……


At some point the topology stops processing messages, and I need to rebalance it to 
get it running again.


My spout config is


BrokerHosts brokers = new ZkHosts((String) 
stormConfiguration.get(ZOOKEEPER_HOSTS));

TridentKafkaConfig spoutConfig = new TridentKafkaConfig(brokers, (String) 
stormConfiguration.get(KAFKA_INPUT_TOPIC));


spoutConfig.scheme = getSpoutScheme(stormConfiguration);

Boolean forceFromStart = (Boolean) stormConfiguration.get(FORCE_FROM_START);


spoutConfig.ignoreZkOffsets = false;

spoutConfig.fetchSizeBytes = 
stormConfiguration.getIntProperty(KAFKA_CONSUMER_FETCH_SIZE_BYTE, 
KAFKA_CONSUMER_DEFAULT_FETCH_SIZE_BYTE);

spoutConfig.bufferSizeBytes = 
stormConfiguration.getIntProperty(KAFKA_CONSUMER_BUFFER_SIZE_BYTE, 
KAFKA_CONSUMER_DEFAULT_BUFFER_SIZE_BYTE);

As far as I know, the only thing we are doing wrong is that the topic has 12 partitions 
but we read it with only 1 spout, which is a limitation on our side. I am not sure why it 
gets halted; it just keeps printing the lines below and does nothing:


2015-11-20 05:44:41.574 b.s.m.n.Server [INFO] Getting metrics for server on 
port 6700

2015-11-20 05:44:41.574 b.s.m.n.Client [INFO] Getting metrics for client 
connection to Netty-Client-b-bdata-xx.net/xxx.xx.xxx.xxx:6700

2015-11-20 05:44:41.574 b.s.m.n.Client [INFO] Getting metrics for client 
connection to Netty-Client-b-bdata-xx.net/xxx.xx.xxx.xxx:6709

2015-11-20 05:44:41.574 b.s.m.n.Client [INFO] Getting metrics for client 
connection to Netty-Client-b-bdata-xx.net/xxx.xx.xxx.xxx:6707


Thanks,

Sachin
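
For reference, a minimal sketch of a spout configuration aimed at this situation is
below. It leans on the maxOffsetBehind, useStartOffsetTimeIfOffsetOutOfRange and
startOffsetTime fields that storm-kafka's KafkaConfig exposed around the 0.9.x/0.10.x
releases; treat it as an illustration and double-check the field names against the
version you run.

    import storm.kafka.BrokerHosts;
    import storm.kafka.ZkHosts;
    import storm.kafka.trident.TridentKafkaConfig;

    public class OffsetRecoveryConfigSketch {

        public static TridentKafkaConfig build(String zkHosts, String topic) {
            BrokerHosts brokers = new ZkHosts(zkHosts);
            TridentKafkaConfig cfg = new TridentKafkaConfig(brokers, topic);
            cfg.ignoreZkOffsets = false;
            // When the committed offset has already been deleted by Kafka retention,
            // jump to a valid offset instead of looping on OffsetOutOfRange warnings.
            cfg.useStartOffsetTimeIfOffsetOutOfRange = true;
            cfg.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
            // Bound how far behind the head of the log the spout may fall before it
            // skips ahead rather than replaying everything it missed.
            cfg.maxOffsetBehind = 1000000L;
            return cfg;
        }
    }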



Why am I getting OffsetOutOfRange: Updating offset from offset?

2015-11-19 Thread Sachin Pasalkar
Hi,

We are developing an application where, after the topology has run for some days, we get 
continuous warning messages:


2015-11-20 05:05:42.226 s.k.KafkaUtils [WARN] Got fetch request with offset out 
of range: [7238824446]

2015-11-20 05:05:42.229 s.k.t.TridentKafkaEmitter [WARN] OffsetOutOfRange: 
Updating offset from offset = 7238824446 to offset = 7241183683

2015-11-20 05:05:43.207 s.k.KafkaUtils [WARN] Got fetch request with offset out 
of range: [7022945051]

2015-11-20 05:05:43.208 s.k.t.TridentKafkaEmitter [WARN] OffsetOutOfRange: 
Updating offset from offset = 7022945051 to offset = 7025309343

2015-11-20 05:05:44.260 s.k.KafkaUtils [WARN] Got fetch request with offset out 
of range: [7170559432]

2015-11-20 05:05:44.264 s.k.t.TridentKafkaEmitter [WARN] OffsetOutOfRange: 
Updating offset from offset = 7170559432 to offset = 7172920769

2015-11-20 05:05:45.332 s.k.KafkaUtils [WARN] Got fetch request with offset out 
of range: [7132495867]……


At some point the topology stops processing messages, and I need to rebalance it to 
get it running again.


My spout config is


BrokerHosts brokers = new ZkHosts((String) 
stormConfiguration.get(ZOOKEEPER_HOSTS));

TridentKafkaConfig spoutConfig = new TridentKafkaConfig(brokers, (String) 
stormConfiguration.get(KAFKA_INPUT_TOPIC));


spoutConfig.scheme = getSpoutScheme(stormConfiguration);

Boolean forceFromStart = (Boolean) stormConfiguration.get(FORCE_FROM_START);


spoutConfig.ignoreZkOffsets = false;

spoutConfig.fetchSizeBytes = 
stormConfiguration.getIntProperty(KAFKA_CONSUMER_FETCH_SIZE_BYTE, 
KAFKA_CONSUMER_DEFAULT_FETCH_SIZE_BYTE);

spoutConfig.bufferSizeBytes = 
stormConfiguration.getIntProperty(KAFKA_CONSUMER_BUFFER_SIZE_BYTE, 
KAFKA_CONSUMER_DEFAULT_BUFFER_SIZE_BYTE);

As far as I know, the only thing we are doing wrong is that the topic has 12 partitions 
but we read it with only 1 spout, which is a limitation on our side. I am not sure why it 
gets halted; it just keeps printing the lines below and does nothing:


2015-11-20 05:44:41.574 b.s.m.n.Server [INFO] Getting metrics for server on 
port 6700

2015-11-20 05:44:41.574 b.s.m.n.Client [INFO] Getting metrics for client 
connection to Netty-Client-b-bdata-xx.net/xxx.xx.xxx.xxx:6700

2015-11-20 05:44:41.574 b.s.m.n.Client [INFO] Getting metrics for client 
connection to Netty-Client-b-bdata-xx.net/xxx.xx.xxx.xxx:6709

2015-11-20 05:44:41.574 b.s.m.n.Client [INFO] Getting metrics for client 
connection to Netty-Client-b-bdata-xx.net/xxx.xx.xxx.xxx:6707


Thanks,

Sachin


[jira] [Commented] (STORM-1006) Storm is not garbage collecting the messages (causing memory hit)

2015-11-06 Thread Sachin Pasalkar (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14993547#comment-14993547
 ] 

Sachin Pasalkar commented on STORM-1006:


We are no longer using CoordinatedBolt, but I didn't understand why any user should have 
to care about setting up the unique id. Shouldn't that be taken care of by the system? 
Don't you think get(index) is a very risky call?

> Storm is not garbage collecting the messages (causing memory hit)
> -
>
> Key: STORM-1006
> URL: https://issues.apache.org/jira/browse/STORM-1006
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-kafka
>Affects Versions: 0.9.3
>    Reporter: Sachin Pasalkar
>
> We are reading whole file in memory around 5 MB, which is send through Kafaka 
> to Storm. In next bolt, we performs the operation on file and sends out tuple 
> to next bolt. After profiling we found that file (bytes of file) does not get 
> garbage collected. So after further investigation we found that  
> backtype.storm.coordination.CoordinatedBolt.CoordinatedOutputCollector.emit(String,
>  Collection, List) API gets the first object and use it for 
> tracking :(. Can you confirm reason behind this? Is there any way we can send 
> different unique id as first element in list or the unique id of tuple used 
> as indicator.
> However, for time being we have made changes in schema assigned to 
> KafkaSpout, so that it will parse the file and send out list of values.
> If you below code CoordinatedBolt, "Object id = tuple.getValue(0);” takes the 
> 1st element from tuple instead of taking id of tuple. This "id" is then saved 
> to _tracked hashhMap(TimeCache). In our case the 0th element is files byte 
> data. This gets stored in the _tracked map till tree of tuple doesn’t get 
> complete. As we are processing huge data we run outofMemory issue.
> Code:
> public void execute(Tuple tuple) {
> *Object id = tuple.getValue(0);*
> TrackingInfo track;
> TupleType type = getTupleType(tuple);
> synchronized(_tracked) {
> track = _tracked.get(id);
> if(track==null) {
> track = new TrackingInfo();
> if(_idStreamSpec==null) track.receivedId = true;
> _tracked.put(id, track);*
> }
> }
> if(type==TupleType.ID) {
> synchronized(_tracked) {
> track.receivedId = true;
> }
> checkFinishId(tuple, type);
> } else if(type==TupleType.COORD) {
> int count = (Integer) tuple.getValue(1);
> synchronized(_tracked) {
> track.reportCount++;
> track.expectedTupleCount+=count;
> }
> checkFinishId(tuple, type);
> } else {
> synchronized(_tracked) {
> _delegate.execute(tuple);
> }
> }
> }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
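
A minimal sketch of the workaround mentioned above (emit a small synthetic id as the
first tuple field so that CoordinatedBolt's tuple.getValue(0) tracks a short string
rather than the file bytes) could look like the following; the IdFirstScheme name and
output field names are purely illustrative, not something shipped in Storm:

    import java.util.List;
    import java.util.UUID;

    import backtype.storm.spout.Scheme;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    public class IdFirstScheme implements Scheme {

        @Override
        public List<Object> deserialize(byte[] bytes) {
            // Put a small unique id in position 0 so CoordinatedBolt's _tracked map
            // only ever holds short strings; the heavy payload rides along behind it.
            return new Values(UUID.randomUUID().toString(), bytes);
        }

        @Override
        public Fields getOutputFields() {
            return new Fields("id", "bytes");
        }
    }

With such a scheme set on the KafkaSpout, the file bytes are no longer the value cached
for tuple-tree tracking and can be garbage collected as soon as the downstream bolts are
done with them.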


Re: Emitting Custom Object as Tuple from spout

2015-09-18 Thread Sachin Pasalkar
You can implement your own backtype.storm.spout.MultiScheme, whose 
backtype.storm.spout.MultiScheme.deserialize(byte[]) method lets you convert your data 
into the required object. However, you also need to register the class for serialization 
using backtype.storm.Config.registerSerialization(Class), which lets Storm serialize your 
class dynamically.

To set the scheme on the spout: spoutConfig.scheme = new MultiSchemeImpl();
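
As a rough illustration of the above, and assuming the Bean/Bean2 classes from the
question quoted below, a scheme could look like this (BeanScheme, parseBean and the
setter used inside it are placeholders for whatever parsing you actually do):

    import java.util.Arrays;
    import java.util.List;

    import backtype.storm.spout.MultiScheme;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    public class BeanScheme implements MultiScheme {

        @Override
        public Iterable<List<Object>> deserialize(byte[] ser) {
            // Turn the raw Kafka bytes into the domain object (Jackson, Kryo or a
            // hand-written parser -- parseBean below is only a placeholder).
            Bean bean = parseBean(ser);
            return Arrays.<List<Object>>asList(new Values(bean));
        }

        @Override
        public Fields getOutputFields() {
            return new Fields("bean");
        }

        private Bean parseBean(byte[] ser) {
            Bean bean = new Bean();
            bean.setB(new String(ser));   // placeholder: populate fields as needed
            return bean;
        }
    }

The object classes then get registered in the topology config so they can be shipped
between workers:

    Config conf = new Config();
    conf.registerSerialization(Bean.class);
    conf.registerSerialization(Bean2.class);
    spoutConfig.scheme = new BeanScheme();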


From: Ankur Garg
Reply-To: "dev@storm.apache.org"
Date: Wednesday, 16 September 2015 7:07 pm
To: "u...@storm.apache.org", "dev@storm.apache.org"
Subject: Emitting Custom Object as Tuple from spout

Hi,

I am new to Apache Storm. To understand it I was looking at the Storm examples
provided in the Storm tutorial
(https://github.com/apache/storm/tree/master/examples/storm-starter).

In all the examples we are emitting primitive types (String, int, etc.) as
tuples.

In my use case I have a Java object which I want to emit as a tuple to
bolts.

I am not sure how to do it. It seems I have to implement a custom tuple
producer to convert the Java object to Values.

Can anyone provide me an example of how to do that?

For example, my Java object is:

class Bean
{
    int A;
    String B;
    Bean2 b;
    // setters and getters
}

and class Bean2
{
    // Some attributes
}

Now, in my nextTuple() method for the spout, I have an instance of the Bean
object.

How do I translate it into a tuple, emit it, and consume it in my bolt?

Any ideas please.

Thanks
ankur



Possible issue with SequenceFileBolt

2015-09-16 Thread Sachin Pasalkar
When the compressionType is BLOCK, the bolt does not rotate the file at the 
specified file size.

On further investigation I found that the block writer syncs only when the buffered 
data reaches compressionBlockSize.
Code in org.apache.hadoop.io.SequenceFile.BlockCompressWriter.append(Object, Object):

    int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
    if (currentBlockSize >= compressionBlockSize)
        sync();

The default value of compressionBlockSize is 100, which makes
offset = this.writer.getLength(); (in the SequenceFileBolt class) return a stale 
value for the current data size.

In the case of org.apache.hadoop.io.SequenceFile.RecordCompressWriter it works fine, 
as it always calls the sync API (checkAndWriteSync()).

Either we can sync every time a record comes in, or use int currentBlockSize = 
keyBuffer.getLength() + valBuffer.getLength(); to report the current size.

Thanks,
Sachin
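
If rotating by size is the main concern, one possible workaround, sketched below, is to
fall back to RECORD compression, where every append goes through checkAndWriteSync()
and the writer's length reflects the real file size. This assumes SequenceFileBolt
exposes withCompressionType() in the storm-hdfs version you run; treat it as an
illustration rather than a fix for the BLOCK case itself.

    import org.apache.hadoop.io.SequenceFile;
    import org.apache.storm.hdfs.bolt.SequenceFileBolt;
    import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
    import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;

    public class RecordCompressedSequenceBoltSketch {

        public static SequenceFileBolt build() {
            return new SequenceFileBolt()
                    // With RECORD compression each record is written out as it
                    // arrives, so size-based rotation fires close to the threshold.
                    .withCompressionType(SequenceFile.CompressionType.RECORD)
                    .withRotationPolicy(new FileSizeRotationPolicy(64.0f, Units.MB));
        }
    }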


RE: How to limit the spout not to generate the tuple in case of failure of downstream tuple?

2015-09-13 Thread Sachin Pasalkar
Thanks for the reply; that is exactly how we implement it, using our own unique key. I 
was just looking for an option provided by the Kafka spout to drop a tuple after a 
certain number of attempts. Don't you think that would be the wiser option, rather than 
everyone coding it themselves :)?

Even for Trident, a tuple that is surely going to fail in some bolt will be replayed 
continuously by Trident until one of the attempts is processed. Is my observation 
correct?

-Original Message-
From: Venkat Gmail [mailto:mvr...@gmail.com] 
Sent: Saturday, September 12, 2015 10:28 PM
To: dev@storm.apache.org
Cc: Venkat Mantirraju
Subject: Re: How to limit the spout not to generate the tuple in case of 
failure of downstream tuple?

Not sure whether you received my answer, hence resending.

Thanks,
Venkat

> On Sep 12, 2015, at 9:52 AM, Venkat Gmail <mvr...@gmail.com> wrote:
> 
> Here are a bunch of solutions: 
> Don't drop it, but always ack it. 
> 
> 1) Try to use an exactly-once topology like Trident. 
> 2) If you want to use a regular Storm topology, which guarantees at least once 
> (>=1): 
> A) Override the Kafka spout and handle fail differently, or
> B) At your bolt, store the unique message id in a store when you put the message 
> into Kafka. These message ids can be kept in a Redis store. If the message id 
> already exists (processed already), just ack it. Otherwise process it and then ack 
> it. In this way you will achieve 100% accuracy with a regular Storm topology 
> without Trident. 
> For the message id: you can use the already existing Kafka message id, or create a 
> unique GUID for each message before putting it into Kafka the first time and use 
> the same id. 
> Redis store: maintain a small-footprint Redis store with a TTL of 4 hours for each 
> message id. I prefer the 2nd option. 
> 
> Let me know if you have any questions and I will be glad to assist. 
> Thanks,
> Venkat
> 
>> On Sep 12, 2015, at 7:56 AM, Nathan Leung <ncle...@gmail.com> wrote:
>> 
>> Don't fail the tuple, just drop it (don't emit). Btw the user list is 
>> better for this type of question.
>> On Sep 12, 2015 7:43 AM, "Sachin Pasalkar" 
>> <sachin_pasal...@symantec.com>
>> wrote:
>> 
>>> Hi,
>>> 
>>> As per my knowledge, Storm follows the "at least once" way, which 
>>> means it will make sure each tuple gets fully processed at least once. So 
>>> my question is: if I have received some unexpected data, a certain 
>>> bolt in my topology will start failing those tuples. The spout will get the 
>>> failure notification from the acker thread and will resend them. However, 
>>> since I know it is always going to fail, is there any way I can ask the spout 
>>> to stop regenerating the tuple after X number of attempts?
>>> 
>>> Thanks,
>>> Sachin
>>> 
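
To make option (A) from the reply above concrete, here is a generic, hedged sketch of a
spout base class that stops replaying a message after a fixed number of failures. The
class and method names are illustrative only; the stock storm-kafka spout of that era
did not expose such a knob directly.

    import java.util.HashMap;
    import java.util.Map;

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;

    public abstract class RetryLimitedSpout extends BaseRichSpout {

        private static final int MAX_ATTEMPTS = 3;
        private final Map<Object, Integer> failures = new HashMap<Object, Integer>();
        protected SpoutOutputCollector collector;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void ack(Object msgId) {
            failures.remove(msgId);
        }

        @Override
        public void fail(Object msgId) {
            Integer attempts = failures.get(msgId);
            attempts = (attempts == null) ? 1 : attempts + 1;
            if (attempts < MAX_ATTEMPTS) {
                failures.put(msgId, attempts);
                reEmit(msgId);              // replay the payload associated with this id
            } else {
                failures.remove(msgId);     // give up on the message after too many attempts
            }
        }

        // Subclasses re-emit the tuple for msgId, e.g. from a pending map filled in nextTuple().
        protected abstract void reEmit(Object msgId);

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("bytes"));
        }
    }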


How to limit the spout not to generate the tuple in case of failure of downstream tuple?

2015-09-12 Thread Sachin Pasalkar
Hi,

As per my knowledge, Storm follows the "at least once" way, which means it will make 
sure each tuple gets fully processed at least once. So my question is: if I have 
received some unexpected data, a certain bolt in my topology will start failing those 
tuples. The spout will get the failure notification from the acker thread and will 
resend them. However, since I know it is always going to fail, is there any way I can 
ask the spout to stop regenerating the tuple after X number of attempts?

Thanks,
Sachin


Re: How to rotate file incase of storm worker fails?

2015-09-10 Thread Sachin Pasalkar
I don't see why it can't happen even though it uses the tick tuple: what if the worker 
dies in between? That file will never get rotated to the final destination.

From: Arun Iyer <ai...@hortonworks.com<mailto:ai...@hortonworks.com>>
Reply-To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Date: Thursday, 10 September 2015 12:35 pm
To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Subject: Re: How to rotate file incase of storm worker fails?

Sachin,

STORM-969 makes use of tick tuple to periodically ack and flush the tuples so 
the scenario you mentioned would not happen. The tickTupleInterval is 
configurable.

- Arun




On 9/9/15, 11:47 PM, "Sachin Pasalkar" 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>> wrote:

Hi,

I was looking at the code where HdfsBolt writes the file from the tuples coming to it. I 
also had a look at JIRA 
STORM-969<https://issues.apache.org/jira/browse/STORM-969>, and I have the following 
questions about it:

1) Let's say I have set up the file rotation policy at 64 MB, and I have written the 
file up to 59 MB. Now my worker fails; the file which I was writing to will never get 
rotated to the final location.
2) As per the STORM-969 JIRA, they have added the forceSync approach, but they keep all 
tuples in memory and delay the acks sent back to the spout. In our case, to write 64 MB 
of data we need to store 5,400,000 processed messages, which leads to a lot of data in 
memory. This may lead to unnecessary replay of tuples from the spout. (I am aware it is 
at-least-once and I can increase TOPOLOGY_MESSAGE_TIMEOUT_SECS to fulfill my 
requirement, but is there another way?)

Thanks,
Sachin
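
For reference, a minimal sketch of how the storm-hdfs rotation and sync policies are
typically wired together is below. The URL, path and sizes are made up, and the
"ack on sync" behaviour is the one STORM-969 adds, so check the builder methods against
the storm-hdfs release you run. Rotating at 64 MB while syncing every 1000 tuples keeps
the replay window small even if a worker dies before the file rotates.

    import org.apache.storm.hdfs.bolt.HdfsBolt;
    import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
    import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
    import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
    import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
    import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;

    public class HdfsBoltWiringSketch {

        public static HdfsBolt build() {
            return new HdfsBolt()
                    .withFsUrl("hdfs://namenode:8020")   // illustrative namenode URL
                    .withFileNameFormat(new DefaultFileNameFormat().withPath("/storm/out/"))
                    .withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter(","))
                    // Rotate the current file once it reaches 64 MB.
                    .withRotationPolicy(new FileSizeRotationPolicy(64.0f, Units.MB))
                    // Flush to HDFS every 1000 tuples, bounding how many tuples would
                    // have to be replayed if the worker dies mid-file.
                    .withSyncPolicy(new CountSyncPolicy(1000));
        }
    }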



How to rotate file incase of storm worker fails?

2015-09-10 Thread Sachin Pasalkar
Hi,

I was looking at the code where HdfsBolt writes the file from the tuples coming to it. I 
also had a look at JIRA 
STORM-969, and I have the following 
questions about it:

1) Let's say I have set up the file rotation policy at 64 MB, and I have written the 
file up to 59 MB. Now my worker fails; the file which I was writing to will never get 
rotated to the final location.
2) As per the STORM-969 JIRA, they have added the forceSync approach, but they keep all 
tuples in memory and delay the acks sent back to the spout. In our case, to write 64 MB 
of data we need to store 5,400,000 processed messages, which leads to a lot of data in 
memory. This may lead to unnecessary replay of tuples from the spout. (I am aware it is 
at-least-once and I can increase TOPOLOGY_MESSAGE_TIMEOUT_SECS to fulfill my 
requirement, but is there another way?)

Thanks,
Sachin


[jira] [Updated] (STORM-1006) Storm is not garbage collecting the messages (causing memory hit)

2015-08-25 Thread Sachin Pasalkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-1006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sachin Pasalkar updated STORM-1006:
---
Description: 
We are reading whole file in memory around 5 MB, which is send through Kafaka 
to Storm. In next bolt, we performs the operation on file and sends out tuple 
to next bolt. After profiling we found that file (bytes of file) does not get 
garbage collected. So after further investigation we found that  
backtype.storm.coordination.CoordinatedBolt.CoordinatedOutputCollector.emit(String,
 CollectionTuple, ListObject) API gets the first object and use it for 
tracking :(. Can you confirm reason behind this? Is there any way we can send 
different unique id as first element in list or the unique id of tuple used as 
indicator.

However, for time being we have made changes in schema assigned to KafkaSpout, 
so that it will parse the file and send out list of values.

If you below code CoordinatedBolt, Object id = tuple.getValue(0);” takes the 
1st element from tuple instead of taking id of tuple. This id is then saved 
to _tracked hashhMap(TimeCache). In our case the 0th element is files byte 
data. This gets stored in the _tracked map till tree of tuple doesn’t get 
complete. As we are processing huge data we run outofMemory issue.

Code:

public void execute(Tuple tuple) {

*Object id = tuple.getValue(0);*

TrackingInfo track;

TupleType type = getTupleType(tuple);

synchronized(_tracked) {

track = _tracked.get(id);

if(track==null) {

track = new TrackingInfo();

if(_idStreamSpec==null) track.receivedId = true;

_tracked.put(id, track);*

}

}



if(type==TupleType.ID) {

synchronized(_tracked) {

track.receivedId = true;

}

checkFinishId(tuple, type);

} else if(type==TupleType.COORD) {

int count = (Integer) tuple.getValue(1);

synchronized(_tracked) {

track.reportCount++;

track.expectedTupleCount+=count;

}

checkFinishId(tuple, type);

} else {

synchronized(_tracked) {

_delegate.execute(tuple);

}

}

}




  was:
We are reading whole file in memory around 5 MB, which is send through Kafaka 
to Storm. In next bolt, we performs the operation on file and sends out tuple 
to next bolt. After profiling we found that file (bytes of file) does not get 
garbage collected. So after further investigation we found that  
backtype.storm.coordination.CoordinatedBolt.CoordinatedOutputCollector.emit(String,
 CollectionTuple, ListObject) API gets the first object and use it for 
tracking :(. Can you confirm reason behind this? Is there any way we can send 
different unique id as first element in list or the unique id of tuple used as 
indicator.

However, for time being we have made changes in schema assigned to KafkaSpout, 
so that it will parse the file and send out list of values.

If you below code CoordinatedBolt, Object id = tuple.getValue(0);” takes the 
1st element from tuple instead of taking id of tuple. This id is then saved 
to _tracked hashhMap(TimeCache). In our case the 0th element is files byte 
data. This gets stored in the _tracked map till tree of tuple doesn’t get 
complete. As we are processing huge data we run outofMemory issue.

Code:

public void execute(Tuple tuple) {

Object id = tuple.getValue(0);

TrackingInfo track;

TupleType type = getTupleType(tuple);

synchronized(_tracked) {

track = _tracked.get(id);

if(track==null) {

track = new TrackingInfo();

if(_idStreamSpec==null) track.receivedId = true;

_tracked.put(id, track);

}

}



if(type==TupleType.ID) {

synchronized(_tracked) {

track.receivedId = true;

}

checkFinishId(tuple, type);

} else if(type==TupleType.COORD) {

int count = (Integer) tuple.getValue(1);

synchronized(_tracked) {

track.reportCount++;

track.expectedTupleCount+=count;

}

checkFinishId(tuple, type);

} else {

synchronized(_tracked) {

_delegate.execute(tuple);

}

}

}





 Storm is not garbage collecting the messages (causing memory hit)
 -

 Key: STORM-1006
 URL: https://issues.apache.org/jira/browse/STORM-1006
 Project: Apache Storm
  Issue Type: Improvement
  Components: storm-kafka
Affects Versions: 0.9.3
Reporter: Sachin Pasalkar

 We

[jira] [Created] (STORM-1006) A big memory hit

2015-08-25 Thread Sachin Pasalkar (JIRA)
Sachin Pasalkar created STORM-1006:
--

 Summary: A big memory hit
 Key: STORM-1006
 URL: https://issues.apache.org/jira/browse/STORM-1006
 Project: Apache Storm
  Issue Type: Improvement
  Components: storm-kafka
Affects Versions: 0.9.3
 Environment: Mac
Reporter: Sachin Pasalkar


We are reading whole file in memory around 5 MB, which is send through Kafaka 
to Storm. In next bolt, we performs the operation on file and sends out tuple 
to next bolt. After profiling we found that file (bytes of file) does not get 
garbage collected. So after further investigation we found that  
backtype.storm.coordination.CoordinatedBolt.CoordinatedOutputCollector.emit(String,
 CollectionTuple, ListObject) API gets the first object and use it for 
tracking :(. Can you confirm reason behind this? Is there any way we can send 
different unique id as first element in list or the unique id of tuple used as 
indicator.

However, for time being we have made changes in schema assigned to KafkaSpout, 
so that it will parse the file and send out list of values.

If you below code CoordinatedBolt, Object id = tuple.getValue(0);” takes the 
1st element from tuple instead of taking id of tuple. This id is then saved 
to _tracked hashhMap(TimeCache). In our case the 0th element is files byte 
data. This gets stored in the _tracked map till tree of tuple doesn’t get 
complete. As we are processing huge data we run outofMemory issue.

Code:

public void execute(Tuple tuple) {

Object id = tuple.getValue(0);

TrackingInfo track;

TupleType type = getTupleType(tuple);

synchronized(_tracked) {

track = _tracked.get(id);

if(track==null) {

track = new TrackingInfo();

if(_idStreamSpec==null) track.receivedId = true;

_tracked.put(id, track);

}

}



if(type==TupleType.ID) {

synchronized(_tracked) {

track.receivedId = true;

}

checkFinishId(tuple, type);

} else if(type==TupleType.COORD) {

int count = (Integer) tuple.getValue(1);

synchronized(_tracked) {

track.reportCount++;

track.expectedTupleCount+=count;

}

checkFinishId(tuple, type);

} else {

synchronized(_tracked) {

_delegate.execute(tuple);

}

}

}






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Getting a big memory hit

2015-08-24 Thread Sachin Pasalkar
I don't have a heap dump, but I can point to the code where we see them 
being cached.

The code below is from backtype.storm.coordination.CoordinatedBolt. If you look at it, 
"Object id = tuple.getValue(0);" takes the 1st element from the tuple instead of the id 
of the tuple. This id is then saved in the _tracked hashMap, which is a time cache. In 
our case the 0th element is the file's byte data. It stays in the _tracked map until 
the tuple tree completes. As we are processing huge data we run into an OutOfMemory 
issue.


 public void execute(Tuple tuple) {
        Object id = tuple.getValue(0);
        TrackingInfo track;
        TupleType type = getTupleType(tuple);
        synchronized(_tracked) {
            track = _tracked.get(id);
            if(track==null) {
                track = new TrackingInfo();
                if(_idStreamSpec==null) track.receivedId = true;
                _tracked.put(id, track);
            }
        }

        if(type==TupleType.ID) {
            synchronized(_tracked) {
                track.receivedId = true;
            }
            checkFinishId(tuple, type);
        } else if(type==TupleType.COORD) {
            int count = (Integer) tuple.getValue(1);
            synchronized(_tracked) {
                track.reportCount++;
                track.expectedTupleCount+=count;
            }
            checkFinishId(tuple, type);
        } else {
            synchronized(_tracked) {
                _delegate.execute(tuple);
            }
        }
    }


Let me know if you want more information from me :)


Thanks,

Sachin


From: Bobby Evans 
<ev...@yahoo-inc.com.INVALID<mailto:ev...@yahoo-inc.com.INVALID>>
Reply-To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>, Bobby Evans 
<ev...@yahoo-inc.com<mailto:ev...@yahoo-inc.com>>
Date: Monday, 24 August 2015 6:50 pm
To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Subject: Re: Getting a big memory hit

Do you have a heap dump or something that shows exactly which data structure 
those Tuples are being cached in?  In most cases the tuples should just have a 
tuple id extracted from it so it can be sent to the acker.  Once it is 
extracted GC should happen.

- Bobby


 On Saturday, August 22, 2015 11:16 AM, Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>> wrote:


Hi,

We are reading a whole file of around 5 MB into memory, which is sent through Kafka to 
Storm. In the next stage we have a bolt which performs operations on the file and sends 
out a tuple to the next bolt. After profiling we found that the file (the file's bytes) 
does not get garbage collected. After further investigation we found that the 
backtype.storm.coordination.CoordinatedBolt.CoordinatedOutputCollector.emit(String,
 Collection<Tuple>, List<Object>) API takes the first object and uses it for 
tracking :(. Can you confirm the reason behind this? Is there any way we can send a 
different unique id as the first element in the list, or have the unique id of the 
tuple used as the indicator?

However, for the time being we have made changes in the scheme assigned to KafkaSpout, 
so that it parses the file and sends out a list of values. Can you also explain why the 
list approach is used instead of a map, given that we do declare the output fields in 
the getOutputFields() API?

Thanks,
Sachin




RE: Getting a big memory hit

2015-08-24 Thread Sachin Pasalkar
Can you raise it with the concerned team then? If you ask me, if it's been done the 
wrong way, let's get it fixed.

Thanks,
sachin

-Original Message-
From: Bobby Evans [mailto:ev...@yahoo-inc.com.INVALID] 
Sent: Monday, August 24, 2015 9:40 PM
To: dev@storm.apache.org
Subject: Re: Getting a big memory hit

This is part of the reason why Trident is really preferred over the Batch API.  
The first item in the tuple is intended to be a tracking id.  In trident they 
are able to hide all of this from you using the TridentTuple, but in the 
BatchAPI it does not seem to be hidden as well.  I am honestly not as familiar 
with the Batch API as I am with trident, I don't know how much more help I can 
be beyond that at this point.
 - Bobby 


 On Monday, August 24, 2015 9:54 AM, Sachin Pasalkar 
sachin_pasal...@symantec.com wrote:
   

 I don't have a heap dump, but I can point to the code where we see them 
being cached.

The code below is from backtype.storm.coordination.CoordinatedBolt. If you look at it, 
"Object id = tuple.getValue(0);" takes the 1st element from the tuple instead of the id 
of the tuple. This id is then saved in the _tracked hashMap, which is a time cache. In 
our case the 0th element is the file's byte data. It stays in the _tracked map until 
the tuple tree completes. As we are processing huge data we run into an OutOfMemory 
issue.


 public void execute(Tuple tuple) {

        Object id = tuple.getValue(0);

        TrackingInfo track;

        TupleType type = getTupleType(tuple);

        synchronized(_tracked) {

            track = _tracked.get(id);

            if(track==null) {

                track = new TrackingInfo();

                if(_idStreamSpec==null) track.receivedId = true;

                _tracked.put(id, track);

            }

        }



        if(type==TupleType.ID) {

            synchronized(_tracked) {

                track.receivedId = true;

            }

            checkFinishId(tuple, type);

        } else if(type==TupleType.COORD) {

            int count = (Integer) tuple.getValue(1);

            synchronized(_tracked) {

                track.reportCount++;

                track.expectedTupleCount+=count;

            }

            checkFinishId(tuple, type);

        } else {

            synchronized(_tracked) {

                _delegate.execute(tuple);

            }

        }

    }


Let me know if you want more information from me :)


Thanks,

Sachin


From: Bobby Evans 
<ev...@yahoo-inc.com.INVALID<mailto:ev...@yahoo-inc.com.INVALID>>
Reply-To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>, Bobby Evans 
<ev...@yahoo-inc.com<mailto:ev...@yahoo-inc.com>>
Date: Monday, 24 August 2015 6:50 pm
To: "dev@storm.apache.org<mailto:dev@storm.apache.org>" 
<dev@storm.apache.org<mailto:dev@storm.apache.org>>
Subject: Re: Getting a big memory hit

Do you have a heap dump or something that shows exactly which data structure 
those Tuples are being cached in?  In most cases the tuples should just have a 
tuple id extracted from it so it can be sent to the acker.  Once it is 
extracted GC should happen.

- Bobby


    On Saturday, August 22, 2015 11:16 AM, Sachin Pasalkar 
<sachin_pasal...@symantec.com<mailto:sachin_pasal...@symantec.com>> wrote:


Hi,

We are reading whole file in memory around 5 MB, which is send through Kafaka 
to Storm. In next bolt, we have a bolt which performs the operation on file and 
sends out tuple to next bolt. After profiling we found that file (bytes of 
file) does not get garbage collected. So after further investigation we found 
that  
backtype.storm.coordination.CoordinatedBolt.CoordinatedOutputCollector.emit(String,
 CollectionTuple, ListObject) API gets the first object and use it for 
tracking :(. Can you confirm reason behind this? Is there any way we can send 
different unique id as first element in list or the unique id of tuple used as 
indicator.

However, for time being we have made changes in schema assigned to KafkaSpout, 
so that it will parse the file and send out list of values. Can you also 
explain why the list approach is used instead of map as we do declare the out 
fiels