Hey TJ,

A state store's change log is treated separately from a bootstrap stream. Bootstrap stream messages are seen by the StreamTask.process method, while the state store's change log messages are never seen by the process method. As such, the bootstrap=true behavior has no effect on the change log.
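To make that distinction concrete, here is a rough sketch (the class name, the "my-store" store name, and the String serdes are made up for illustration) of a task that consumes a bootstrap stream and writes it into a key-value store. The change log is replayed into the store before init() and process() are ever invoked, while the bootstrap stream's messages arrive through process() like any other input:

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class BootstrapIntoStoreTask implements StreamTask, InitableTask {
  // Hypothetical store; "my-store" would be declared in config with a change log topic.
  private KeyValueStore<String, String> store;

  @Override
  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    // By the time init() runs, the container has already replayed the change log
    // into the store; none of those change log messages ever reach process().
    store = (KeyValueStore<String, String>) context.getStore("my-store");
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    // Messages from the bootstrap stream DO arrive here, like any other input.
    // Writing them into the store means a later restart can restore this state
    // from the change log instead of re-reading the bootstrap stream from "oldest".
    store.put((String) envelope.getKey(), (String) envelope.getMessage());
  }
}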
That said, I do think that there are use cases where you would want a bootstrap stream to read from the last checkpointed offset, and not always default it to the "oldest" offset. For example, if you bootstrap a stream and write the messages INTO a store in the StreamTask.process() method, you can safely skip bootstrapping from the "oldest" offset when your container restarts later, since your state (which has the writes from the upstream bootstrap stream) will ALREADY be restored. In this case, you could safely just pick up where you left off.

Cheers,
Chris

On 4/11/14 12:04 PM, "TJ Giuli" <[email protected]> wrote:

> Gotcha. Regarding whether bootstrap=true should by default read from the oldest possible message on a topic, I would say it depends to some degree on what the behavior is in relation to the reliable state store (e.g. KeyValueStore). Is a stream's state store topic automatically considered a bootstrap stream without explicitly setting it as such in the config file? If I use the reliable state store and I'm guaranteed that 1.) the state store will fully bootstrap before any other stream and 2.) afterward any other stream I've marked as a bootstrap will fully consume any outstanding messages since the last checkpoint, this seems reasonable to me.
>
> Can you go into some detail about how these two areas of the config interact? Thanks!
> —T
>
> On Apr 11, 2014, at 10:30 AM, Chris Riccomini <[email protected]> wrote:
>
>> Hey TJ,
>>
>> Correct, you've got it.
>>
>> * `reset.offset` tells Samza to ignore the checkpointed offset
>> * `offset.default` tells Samza what to do when there's no offset checkpointed (because you've ignored it)
>> * `bootstrap=true` tells Samza to prioritize the stream above all non-bootstrap streams, and to forcibly read all of its messages until the stream is caught up before allowing any other messages to be processed.
>>
>> Reflecting on this now, it seems a bit complicated. We could have Samza set these automatically when bootstrap=true is set.
>>
>> Cheers,
>> Chris
>>
>> On 4/11/14 10:11 AM, "TJ Giuli" <[email protected]> wrote:
>>
>>> Hey, Chris, thanks for the help. I've done a bit more playing around and I've found that with my experimental setup I have to set *both* offset.default=oldest and reset.offset=true to consistently bootstrap the stream from the beginning. Here are the config lines of importance:
>>>
>>> systems.kafka.streams.wikipedia-raw.samza.bootstrap=true
>>> systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
>>> systems.kafka.streams.wikipedia-raw.samza.offset.default=oldest
>>>
>>> And here are the log outputs.
>>>
>>> Only offset.default=oldest: http://pastebin.com/yxnw2Ypu
>>> This behaves as expected — my task had checkpointed and I was not filling the wikipedia-raw topic with anything else, so the task did not read in any messages.
>>>
>>> Only reset.offset=true: http://pastebin.com/sw50RiYz
>>> 2014-04-11 09:55:14 OffsetManager$ [DEBUG] Using default offset UPCOMING for SystemStream [system=kafka, stream=wikipedia-raw].
>>> 2014-04-11 09:55:14 OffsetManager$ [DEBUG] Using reset offset true for SystemStream [system=kafka, stream=wikipedia-raw].
>>>
>>> In this case, just setting reset.offset=true is not enough — the stream still starts from UPCOMING and thus does not read any messages from before the checkpoint.
>>>
>>> Both: http://pastebin.com/WF2MQZcN
>>> 2014-04-11 09:57:45 OffsetManager$ [DEBUG] Using default offset OLDEST for SystemStream [system=kafka, stream=wikipedia-raw].
>>> 2014-04-11 09:57:45 OffsetManager$ [DEBUG] Using reset offset true for SystemStream [system=kafka, stream=wikipedia-raw].
>>>
>>> This case both resets the offset back to 0 and consumes messages from the 0th message in wikipedia-raw. Is this the expected behavior? Again, thanks for the help!
>>> —T
>>>
>>> On Apr 10, 2014, at 2:01 PM, Chris Riccomini <[email protected]> wrote:
>>>
>>>> Hey TJ,
>>>>
>>>> Samza has a setting called systems.%s.streams.%s.samza.offset.default. If undefined, it uses "upcoming", which means start from the most recent message in the topic. For a bootstrap stream, you probably want to set this to "oldest".
>>>>
>>>> systems.kafka.streams.wikipedia-raw.samza.offset.default=oldest
>>>>
>>>> This will tell Samza to start from offset 0, not offset 42, in the case where there is no checkpoint available (i.e. when you start your job for the first time). Once there is a checkpoint (e.g. you've run your job for a period of time, and a checkpoint manager is configured), Samza will use the checkpoint. For example, if you've read to offset 55 and then restart, the SamzaContainer will begin bootstrapping from offset 55 (not offset 0). To force the bootstrapping to ALWAYS begin from offset 0, use this setting:
>>>>
>>>> systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
>>>>
>>>> This will tell Samza to disregard the checkpointed offset for the stream when it starts up. By forcing the offset to oldest, and disregarding offsets, you'll always re-process the entire bootstrap stream from offset 0. This is likely what you want.
>>>>
>>>> This is undocumented right now. Sorry about that.
>>>>
>>>> Cheers,
>>>> Chris
>>>>
>>>> On 4/10/14 11:19 AM, "TJ Giuli" <[email protected]> wrote:
>>>>
>>>>> Hi, I'm having some trouble getting bootstrapping to work. To start with a simple example, I've augmented hello-samza to get the Wikipedia feed parser to bootstrap from the wikipedia-raw topic. When I peek into the logs, I see the following lines:
>>>>>
>>>>> 2014-04-10 11:09:46 KafkaSystemAdmin$ [INFO] Got metadata: Map(wikipedia-raw -> SystemStreamMetadata [streamName=wikipedia-raw, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=0, newestOffset=41, upcomingOffset=42]}])
>>>>>
>>>>> 2014-04-10 11:09:46 DefaultChooser [INFO] Building default chooser with: useBatching=false, useBootstrapping=true, usePriority=true
>>>>>
>>>>> 2014-04-10 11:09:46 BootstrappingChooser [INFO] Bootstrap stream is fully caught up: SystemStream [system=kafka, stream=wikipedia-raw]
>>>>>
>>>>> 2014-04-10 11:09:46 GetOffset [INFO] Validating offset 42 for topic and partition [wikipedia-raw,0]
>>>>> 2014-04-10 11:09:46 GetOffset [INFO] Able to successfully read from offset 42 for topic and partition [wikipedia-raw,0]. Using it to instantiate consumer.
>>>>>
>>>>> So it seems that Samza recognizes that I want to bootstrap wikipedia-raw, but as the stream processor is finishing initialization, it seems to think that wikipedia-raw has been fully bootstrapped and so it starts from offset 42. Any ideas?
>>>>> My config is below, full logs on pastebin (http://pastebin.com/ZatAA0Ak)
>>>>>
>>>>> # Job
>>>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>>>> job.name=wikipedia-parser
>>>>>
>>>>> # Task
>>>>> task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
>>>>> task.inputs=kafka.wikipedia-raw
>>>>> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>>>>> task.checkpoint.system=kafka
>>>>> # Normally, this would be 3, but we have only one broker.
>>>>> task.checkpoint.replication.factor=1
>>>>>
>>>>> # Das boot
>>>>> systems.kafka.streams.wikipedia-raw.samza.bootstrap=true
>>>>> systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
>>>>>
>>>>> # Metrics
>>>>> metrics.reporters=snapshot,jmx
>>>>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>>>>> metrics.reporter.snapshot.stream=kafka.metrics
>>>>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>>>>>
>>>>> # Serializers
>>>>> serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
>>>>> serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
>>>>>
>>>>> # Systems
>>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>>>> systems.kafka.samza.msg.serde=json
>>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181/
>>>>> #systems.kafka.consumer.auto.offset.reset=smallest
>>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>>> systems.kafka.producer.producer.type=sync
>>>>> # Normally, we'd set this much higher, but we want things to look snappy in the demo.
>>>>> systems.kafka.producer.batch.num.messages=1
>>>>> systems.kafka.streams.metrics.samza.msg.serde=metrics
>>>>>
>>>>> Thanks!
>>>>> —T
>>>>
>>>
>>
>
