Hi Chris,

Woot! Yes, bootstrapped streams look like they are working properly with the SAMZA-145 patch. Thanks for the quick fix, appreciated.
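"Working properly" here means the rule Chris describes further down the thread. Once a stream is marked `samza.bootstrap=true`, the task should receive every message in it, up to the offset that was latest when the job started, before it receives anything from non-bootstrap inputs. The toy model below illustrates only that ordering rule. It is not Samza's chooser code, and the class and method names (`BootstrapOrderingSketch`, `deliveryOrder`, `pickStream`) are hypothetical. It also assumes the bootstrap streams are read from their oldest offset, which is what the `auto.offset.reset` discussion below was about.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model of bootstrap-first delivery. Hypothetical: this is not Samza's
 * chooser implementation, only the ordering rule discussed in this thread.
 */
public class BootstrapOrderingSketch {

    /**
     * Returns "stream:offset" labels in the order a task would receive them.
     *
     * @param inputs               pending offsets per input stream (insertion order matters)
     * @param bootstrapLastOffsets for each bootstrap stream, the last offset that existed
     *                             when the job started; streams absent from this map are
     *                             ordinary (non-bootstrap) inputs
     */
    public static List<String> deliveryOrder(Map<String, List<Long>> inputs,
                                             Map<String, Long> bootstrapLastOffsets) {
        Map<String, Deque<Long>> pending = new LinkedHashMap<>();
        inputs.forEach((stream, offsets) -> pending.put(stream, new ArrayDeque<>(offsets)));

        List<String> delivered = new ArrayList<>();
        while (pending.values().stream().anyMatch(q -> !q.isEmpty())) {
            String next = pickStream(pending, bootstrapLastOffsets);
            long offset = pending.get(next).poll(); // non-null: pickStream returns a non-empty stream
            delivered.add(next + ":" + offset);
        }
        return delivered;
    }

    private static String pickStream(Map<String, Deque<Long>> pending,
                                     Map<String, Long> bootstrapLastOffsets) {
        // 1. Any bootstrap stream that has not yet caught up wins.
        for (Map.Entry<String, Deque<Long>> e : pending.entrySet()) {
            Long last = bootstrapLastOffsets.get(e.getKey());
            Long head = e.getValue().peek();
            if (last != null && head != null && head <= last) {
                return e.getKey();
            }
        }
        // 2. Otherwise take the first stream with anything pending. Caught-up
        //    bootstrap streams compete like ordinary inputs from here on.
        for (Map.Entry<String, Deque<Long>> e : pending.entrySet()) {
            if (!e.getValue().isEmpty()) {
                return e.getKey();
            }
        }
        throw new IllegalStateException("nothing pending");
    }

    public static void main(String[] args) {
        Map<String, List<Long>> inputs = new LinkedHashMap<>();
        inputs.put("tweets", List.of(0L, 1L, 2L));
        inputs.put("positive-words", List.of(0L, 1L));
        inputs.put("negative-words", List.of(0L));

        Map<String, Long> bootstrapLastOffsets = new LinkedHashMap<>();
        bootstrapLastOffsets.put("positive-words", 1L);
        bootstrapLastOffsets.put("negative-words", 0L);

        // [positive-words:0, positive-words:1, negative-words:0, tweets:0, tweets:1, tweets:2]
        System.out.println(deliveryOrder(inputs, bootstrapLastOffsets));
    }
}
```

Although `tweets` is listed first, no tweet is delivered until both word streams have caught up. Interleaved output, like the output Garry first saw, is what this model produces when `bootstrapLastOffsets` is empty.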
Setting the default offset to smallest at the system level is a bit of a kludge for me; I'll indeed need to set up separate system definitions to have different defaults for the bootstrapped vs. non-bootstrapped streams. So SAMZA-144 is of interest to me, and I'll try to look at it over the weekend.

Thanks again!
Garry

-----Original Message-----
From: Chris Riccomini [mailto:[email protected]]
Sent: 13 February 2014 05:13
To: [email protected]
Subject: Re: Using bootstrap streams

Hey Garry,

SAMZA-145 is merged in. Want to give stuff a shot with the latest patch?

Thanks for your patience. :)

Cheers,
Chris

On 2/12/14 6:28 PM, "Chris Riccomini" <[email protected]> wrote:

>Hey Garry,
>
>Yep, this is definitely a bug. I've opened:
>
> https://issues.apache.org/jira/browse/SAMZA-145
>
>I've also posted a patch, and will merge it in as soon as I get a +1
>(by tomorrow).
>
>Cheers,
>Chris
>
>On 2/12/14 5:10 PM, "Garry Turkington" <[email protected]> wrote:
>
>>Hi Chris,
>>
>>OK, so that did have an effect. :)
>>
>>Setting the Kafka-level offset to 'smallest', I do indeed see messages
>>from the positive and negative word streams arrive in my stream task.
>>But they seem to be appearing interleaved with the other
>>non-bootstrapped stream, so I think I'm seeing each stream being read
>>from the smallest offset but with no priority given to the
>>bootstrapped streams. And I still see this in the container logs; am I
>>correct in assuming this should be showing selection of the BootstrapChooser?
>>
>>DefaultChooser [INFO] Building default chooser with: useBatching=false, useBootstrapping=false, usePriority=false
>>
>>Thanks for the help with this, hugely appreciated!
>>
>>Garry
>>
>>-----Original Message-----
>>From: Chris Riccomini [mailto:[email protected]]
>>Sent: 12 February 2014 22:07
>>To: [email protected]
>>Subject: Re: Using bootstrap streams
>>
>>Hey Garry,
>>
>>I've opened:
>>
>> https://issues.apache.org/jira/browse/SAMZA-144
>>
>>to track the stream-level Kafka configuration override issue.
>>
>>Let me know if the "smallest" setting works for you.
>>
>>Cheers,
>>Chris
>>
>>On 2/12/14 11:21 AM, "Chris Riccomini" <[email protected]> wrote:
>>
>>>Hey Garry,
>>>
>>>I just noticed that we don't actually support stream-level overrides
>>>for Kafka configs.
>>>
>>>To test this, you'll have to set the consumer settings at the system
>>>level:
>>>
>>>systems.kafka.consumer.auto.offset.reset=smallest
>>>
>>>Note that this will cause you to read all data from kafka.tweets the
>>>first time you run your job, as well. I think this is probably what
>>>you want, but if not, you'd have to define two systems: one for the
>>>bootstrap streams, and one for the tweet stream, so that you could
>>>configure the bootstrap system to have the "smallest" reset setting.
>>>
>>>Cheers,
>>>Chris
>>>
>>>On 2/12/14 11:06 AM, "Chris Riccomini" <[email protected]> wrote:
>>>
>>>>Hey Garry,
>>>>
>>>>I believe this is a similar issue to SAMZA-142.
>>>>
>>>>Can you try adding a config to set auto.offset.reset to smallest?
>>>>Something like:
>>>>
>>>>systems.kafka.streams.positive-words.consumer.auto.offset.reset=smallest
>>>>
>>>>systems.kafka.streams.negative-words.consumer.auto.offset.reset=smallest
>>>>
>>>>This should change this log line:
>>>>
>>>>Final offset to be returned for Topic and Partition [positive-words,0] = 2006
>>>>
>>>>to something like:
>>>>
>>>>Final offset to be returned for Topic and Partition [positive-words,0] = 0
>>>>
>>>>Cheers,
>>>>Chris
>>>>
>>>>On 2/12/14 10:44 AM, "Garry Turkington" <[email protected]> wrote:
>>>>
>>>>>Hi Chris,
>>>>>
>>>>>Sorry for the wrong log file!
>>>>>
>>>>>The Samza container log is at:
>>>>>http://pastebin.com/D5bAJd7U
>>>>>
>>>>>I do notice that it mentions returning the highest offset for the
>>>>>supposedly bootstrapped streams, which I presume shouldn't be
>>>>>happening.
>>>>>
>>>>>Thanks,
>>>>>Garry
>>>>>
>>>>>-----Original Message-----
>>>>>From: Chris Riccomini [mailto:[email protected]]
>>>>>Sent: 12 February 2014 17:42
>>>>>To: [email protected]
>>>>>Subject: Re: Using bootstrap streams
>>>>>
>>>>>Hey Garry,
>>>>>
>>>>>So far, everything looks normal.
>>>>>
>>>>>The container log you sent me actually appears to be the output
>>>>>of the run-job.sh command. Since you're using YarnJobFactory, this
>>>>>is not actually the container log. Could you grab the log from the
>>>>>container that's running in YARN, and stick that in pastebin? You
>>>>>can usually find this by going to YARN's RM (http://localhost:8088)
>>>>>and finding the link to your ApplicationMaster. This will link to
>>>>>the logs for each container that's running your tasks.
>>>>>
>>>>>Cheers,
>>>>>Chris
>>>>>
>>>>>On 2/11/14 2:11 PM, "Garry Turkington" <[email protected]> wrote:
>>>>>
>>>>>>Hi Chris,
>>>>>>
>>>>>>Following up on this, sorry for the delay, travelling this week.
>>>>>>
>>>>>>Main task config:
>>>>>>http://pastebin.com/enQXLcbZ
>>>>>>
>>>>>>Container log:
>>>>>>http://pastebin.com/YLiKp0CS
>>>>>>
>>>>>>I'm putting the positive and negative words into the bootstrap
>>>>>>streams prior to running the job, and I've confirmed the data is in
>>>>>>the Kafka stream via kafka-console-consumer.sh with the
>>>>>>--from-beginning option.
>>>>>>
>>>>>>Thanks for any input!
>>>>>>Garry
>>>>>>
>>>>>>-----Original Message-----
>>>>>>From: Chris Riccomini [mailto:[email protected]]
>>>>>>Sent: 10 February 2014 23:25
>>>>>>To: [email protected]
>>>>>>Subject: Re: Using bootstrap streams
>>>>>>
>>>>>>Hey Garry,
>>>>>>
>>>>>>It sounds like your understanding of bootstrap streams is correct.
>>>>>>
>>>>>>Bootstrap stream messages will be delivered to the process()
>>>>>>method just like any other. The only difference is that you're
>>>>>>supposed to get all of them, from offset 0 up to lastOffset, before
>>>>>>you get any messages from non-bootstrap streams.
>>>>>>Your positive/negative example sounds like a reasonable use case
>>>>>>for a bootstrap stream.
>>>>>>
>>>>>>A few questions:
>>>>>>
>>>>>>1. Can you post the container logs and the full configuration file
>>>>>>for your job somewhere (e.g. a GitHub gist)?
>>>>>>2. Are you putting data into the positive-words and negative-words
>>>>>>topics before you start the Samza job?
>>>>>>
>>>>>>Also, you can call envelope.getSystemStreamPartition().getStream()
>>>>>>directly (no need to call getSystemStream()).
>>>>>>
>>>>>>Cheers,
>>>>>>Chris
>>>>>>
>>>>>>On 2/10/14 3:18 AM, "Garry Turkington" <[email protected]> wrote:
>>>>>>
>>>>>>>Hi,
>>>>>>>
>>>>>>>I was building a task to do some sentiment analysis on incoming
>>>>>>>data. I have a corpus of positive words and one of negative words,
>>>>>>>both of which the task needs access to. This seemed like a good fit
>>>>>>>for bootstrap streams. But I can't seem to get them to work.
>>>>>>>
>>>>>>>I have my job configured with the 3 Kafka topics in task.inputs,
>>>>>>>and that seems to work: data thrown at any of the topics reaches
>>>>>>>the task.
>>>>>>>
>>>>>>>But setting up the 2 reference streams as bootstrap streams doesn't
>>>>>>>seem to be working. Here's the relevant part of the config; I want
>>>>>>>to read the entire message history each time:
>>>>>>>
>>>>>>>systems.kafka.streams.positive-words.samza.bootstrap=true
>>>>>>>systems.kafka.streams.positive-words.samza.reset.offset=true
>>>>>>>
>>>>>>>systems.kafka.streams.negative-words.samza.bootstrap=true
>>>>>>>systems.kafka.streams.negative-words.samza.reset.offset=true
>>>>>>>
>>>>>>>Do bootstrap streams get handled in any special way? I'm assuming
>>>>>>>here that the messages will arrive in the process method on
>>>>>>>StreamTask just like any other, and that I can handle them
>>>>>>>differently by switching on
>>>>>>>envelope.getSystemStreamPartition().getSystemStream().getStream().
>>>>>>>Looking at the code, the BootstrapChooser seems to do its magic in
>>>>>>>deciding which message is delivered to the task, but the actual
>>>>>>>delivery looks the same.
>>>>>>>
>>>>>>>What am I missing?
>>>>>>>
>>>>>>>Thanks,
>>>>>>>Garry
>>>>>>>
>>>>>>
>>>>>>-----
>>>>>>No virus found in this message.
>>>>>>Checked by AVG - www.avg.com
>>>>>>Version: 2014.0.4259 / Virus Database: 3697/7081 - Release Date: 02/10/14
>>>>>
>>>>
>>>
>>
>
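The offset-reset workaround in this thread comes down to which config key the Kafka consumer for a given stream ends up reading. The sketch below models that lookup under stated assumptions. The stream-level key, which is the override SAMZA-144 tracks and which Samza did not honor at the time, is checked first. The lookup then falls back to the system-level key, and finally to `largest`, the Kafka 0.8 consumer default that explains the offset 2006 in the log above. The class and method names and the second system name `kafka-bootstrap` are hypothetical. Only the two key shapes come from the thread.

```java
import java.util.Properties;

/**
 * Hypothetical model of how auto.offset.reset could be resolved for one
 * stream from Samza-style config keys. Not Samza's implementation.
 */
public class OffsetResetLookupSketch {

    /** Kafka 0.8 consumer default for auto.offset.reset. */
    static final String KAFKA_08_DEFAULT = "largest";

    /**
     * @param honorStreamOverrides false models the behaviour described in this
     *        thread (stream-level keys ignored); true models the SAMZA-144 idea
     */
    public static String autoOffsetReset(Properties config, String system, String stream,
                                         boolean honorStreamOverrides) {
        String streamKey = "systems." + system + ".streams." + stream
                + ".consumer.auto.offset.reset";
        String systemKey = "systems." + system + ".consumer.auto.offset.reset";
        if (honorStreamOverrides && config.getProperty(streamKey) != null) {
            return config.getProperty(streamKey);
        }
        return config.getProperty(systemKey, KAFKA_08_DEFAULT);
    }

    public static void main(String[] args) {
        // First attempt in the thread: stream-level keys only.
        Properties attempt = new Properties();
        attempt.setProperty(
                "systems.kafka.streams.positive-words.consumer.auto.offset.reset", "smallest");
        System.out.println(autoOffsetReset(attempt, "kafka", "positive-words", false)); // largest
        System.out.println(autoOffsetReset(attempt, "kafka", "positive-words", true));  // smallest

        // Workaround: a second system (hypothetical name) just for bootstrap streams.
        Properties twoSystems = new Properties();
        twoSystems.setProperty("systems.kafka-bootstrap.consumer.auto.offset.reset", "smallest");
        System.out.println(autoOffsetReset(twoSystems, "kafka-bootstrap", "positive-words", false)); // smallest
        System.out.println(autoOffsetReset(twoSystems, "kafka", "tweets", false)); // largest
    }
}
```

In a real job, the extra system would presumably also need its own `systems.kafka-bootstrap.samza.factory` and connection settings. Its streams would then be listed in `task.inputs` under the new system name, for example `kafka-bootstrap.positive-words`.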

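Garry's original question was whether a task can treat bootstrap messages differently by switching on the stream name. The sketch below answers it as a plain-Java model of that dispatch, with no Samza dependency. Word-list messages populate in-memory sets, and tweets are scored against them. The stream names `positive-words`, `negative-words`, and `tweets` come from the thread. Everything else is hypothetical, including one word per message and the scoring rule. In a real StreamTask, `handle` would be called from `process()`, with the stream name taken from `envelope.getSystemStreamPartition().getStream()` as Chris suggests.

```java
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

/**
 * Hypothetical per-stream dispatch for the sentiment example: bootstrap
 * streams fill the word sets, the tweet stream is scored against them.
 */
public class SentimentDispatchSketch {
    private final Set<String> positive = new HashSet<>();
    private final Set<String> negative = new HashSet<>();

    /** Returns a score for tweets, or null for word-list messages. */
    public Integer handle(String stream, String message) {
        switch (stream) {
            case "positive-words":
                positive.add(message.toLowerCase(Locale.ROOT)); // assumes one word per message
                return null;
            case "negative-words":
                negative.add(message.toLowerCase(Locale.ROOT));
                return null;
            case "tweets":
                return score(message);
            default:
                throw new IllegalArgumentException("unexpected stream: " + stream);
        }
    }

    /** +1 per known positive word, -1 per known negative word. */
    private int score(String tweet) {
        int score = 0;
        for (String word : tweet.toLowerCase(Locale.ROOT).split("\\W+")) {
            if (positive.contains(word)) score++;
            if (negative.contains(word)) score--;
        }
        return score;
    }

    public static void main(String[] args) {
        SentimentDispatchSketch task = new SentimentDispatchSketch();
        // Without bootstrap ordering, a tweet can arrive before any word lists:
        System.out.println(task.handle("tweets", "great day, awful traffic")); // 0
        task.handle("positive-words", "great");
        task.handle("negative-words", "awful");
        task.handle("negative-words", "traffic");
        System.out.println(task.handle("tweets", "great day, awful traffic")); // -1
    }
}
```

The first call shows why delivery order matters. A tweet handled before the word streams are loaded scores 0 silently. Bootstrap ordering is meant to rule out exactly that case.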