Hey Garry,

I believe this is a similar issue to SAMZA-142.

Can you try adding a config to set auto.offset.reset to smallest? Something like:

systems.kafka.streams.positive-words.consumer.auto.offset.reset=smallest
systems.kafka.streams.negative-words.consumer.auto.offset.reset=smallest

This should change this log line:

Final offset to be returned for Topic and Partition [positive-words,0] = 2006

To something like:

Final offset to be returned for Topic and Partition [positive-words,0] = 0

Cheers,
Chris

On 2/12/14 10:44 AM, "Garry Turkington" <g.turking...@improvedigital.com> wrote:

>Hi Chris,
>
>Sorry for the wrong log file!
>
>Samza container log is at:
>http://pastebin.com/D5bAJd7U
>
>I do notice that it mentions returning the highest offset for the
>supposedly bootstrapped streams which I presume shouldn't be happening.
>
>Thanks,
>Garry
>
>-----Original Message-----
>From: Chris Riccomini [mailto:criccom...@linkedin.com]
>Sent: 12 February 2014 17:42
>To: dev@samza.incubator.apache.org
>Subject: Re: Using bootstrap streams
>
>Hey Garry,
>
>So far, everything looks normal.
>
>The container log you sent me actually appears to be the output of
>the run-job.sh command. Since you're using YarnJobFactory, this is not
>actually the container log. Could you grab the log from the container
>that's running in YARN, and stick that in pastebin? You can usually find
>this by going to YARN's RM (http://localhost:8088) and finding the link
>to your ApplicationMaster. This will link to the logs for each container
>that's running your tasks.
>
>Cheers,
>Chris
>
>On 2/11/14 2:11 PM, "Garry Turkington" <g.turking...@improvedigital.com>
>wrote:
>
>>Hi Chris,
>>
>>Following up on this, sorry for the delay, travelling this week.
>>
>>Main task config:
>>http://pastebin.com/enQXLcbZ
>>
>>Container log:
>>http://pastebin.com/YLiKp0CS
>>
>>I'm putting the positive and negative words into the bootstrap streams
>>prior to running the job -- and confirmed the data is in the Kafka
>>stream via kafka-console-consumer.sh with the --from-beginning option.
>>
>>Thanks for any input!
>>Garry
>>
>>-----Original Message-----
>>From: Chris Riccomini [mailto:criccom...@linkedin.com]
>>Sent: 10 February 2014 23:25
>>To: dev@samza.incubator.apache.org
>>Subject: Re: Using bootstrap streams
>>
>>Hey Garry,
>>
>>It sounds like your understanding of bootstrap streams is correct.
>>
>>Bootstrap stream messages will be delivered to the process() method
>>just like any other. The only difference is you're supposed to get all
>>of them from 0-lastOffset before you get any messages from non-bootstrap
>>streams.
>>Your positive/negative example sounds like a reasonable use case for a
>>bootstrap stream.
>>
>>A few questions:
>>
>>1. Can you post the container logs and the full configuration file for
>>your job somewhere (e.g. Github gist)?
>>2. Are you putting data into the positive-words and negative-words
>>topic before you start the Samza job?
>>
>>Also, you can do envelope.getSystemStreamPartition().getStream()
>>directly (no need to call getSystemStream()).
>>
>>Cheers,
>>Chris
>>
>>On 2/10/14 3:18 AM, "Garry Turkington"
>><g.turking...@improvedigital.com>
>>wrote:
>>
>>>Hi,
>>>
>>>I was building a task to do some sentiment analysis on incoming data.
>>>I have a corpus each of positive and negative words to which the task
>>>needs access. This seemed like a good fit for bootstrap streams. But I
>>>can't seem to get them to work.
>>>
>>>I have my job configured with the 3 Kafka topics in task.inputs and
>>>that seems to work, just throwing data at any of the topics is hitting
>>>the task.
>>>
>>>But setting up the 2 reference streams as bootstrap doesn't seem to be
>>>working. Here's the relevant part of the config, I want to read the
>>>entire message history each time:
>>>
>>>systems.kafka.streams.positive-words.samza.bootstrap=true
>>>systems.kafka.streams.positive-words.samza.reset.offset=true
>>>
>>>systems.kafka.streams.negative-words.samza.bootstrap=true
>>>systems.kafka.streams.negative-words.samza.reset.offset=true
>>>
>>>Do bootstrap streams get handled in any special way, I'm assuming here
>>>that the messages will arrive in the process method on StreamTask just
>>>like any other and I can handle them differently by switching on
>>>envelope.getSystemStreamPartition().getSystemStream().getStream().
>>>Looking at the code it looks the same with the BootstrapChooser doing
>>>its magic to determine which message is delivered to the task but the
>>>actual delivery seems the same.
>>>
>>>What am I missing?
>>>
>>>Thanks,
>>>Garry
>>>
>>
>>
>>-----
>>No virus found in this message.
>>Checked by AVG - www.avg.com
>>Version: 2014.0.4259 / Virus Database: 3697/7081 - Release Date:
>>02/10/14
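
Putting the settings from this thread together, the config for the two bootstrap streams would look roughly like the following. This is only a sketch: it combines the samza.bootstrap and samza.reset.offset lines from the original config with the suggested auto.offset.reset override, and assumes the rest of the job config (task.inputs and so on) stays as it is.

# Sketch only: bootstrap settings from the original config plus the suggested override
systems.kafka.streams.positive-words.samza.bootstrap=true
systems.kafka.streams.positive-words.samza.reset.offset=true
systems.kafka.streams.positive-words.consumer.auto.offset.reset=smallest

systems.kafka.streams.negative-words.samza.bootstrap=true
systems.kafka.streams.negative-words.samza.reset.offset=true
systems.kafka.streams.negative-words.consumer.auto.offset.reset=smallest

If the override takes effect, the container log should report a final offset of 0 for [positive-words,0] rather than 2006.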