Hi, I’m having some trouble getting bootstrapping to work. To start with a simple example, I’ve augmented hello-samza to get the Wikipedia feed parser to bootstrap from the wikipedia-raw topic. When I peek into the logs, I see the following lines:
2014-04-10 11:09:46 KafkaSystemAdmin$ [INFO] Got metadata: Map(wikipedia-raw -> SystemStreamMetadata [streamName=wikipedia-raw, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=0, newestOffset=41, upcomingOffset=42]}])
2014-04-10 11:09:46 DefaultChooser [INFO] Building default chooser with: useBatching=false, useBootstrapping=true, usePriority=true
2014-04-10 11:09:46 BootstrappingChooser [INFO] Bootstrap stream is fully caught up: SystemStream [system=kafka, stream=wikipedia-raw]
2014-04-10 11:09:46 GetOffset [INFO] Validating offset 42 for topic and partition [wikipedia-raw,0]
2014-04-10 11:09:46 GetOffset [INFO] Able to successfully read from offset 42 for topic and partition [wikipedia-raw,0]. Using it to instantiate consumer.
So Samza does recognize that I want to bootstrap wikipedia-raw, but while the
stream processor is finishing initialization it decides that wikipedia-raw is
already fully bootstrapped, and it starts consuming from offset 42 (the upcoming
offset) rather than from the oldest offset, 0. Any ideas? My config is below;
the full logs are on pastebin (http://pastebin.com/ZatAA0Ak).
# Job
job.factory.class=org.apache.samza.job.local.LocalJobFactory
job.name=wikipedia-parser
# Task
task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
task.inputs=kafka.wikipedia-raw
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# Normally, this would be 3, but we have only one broker.
task.checkpoint.replication.factor=1
# Das boot
systems.kafka.streams.wikipedia-raw.samza.bootstrap=true
systems.kafka.streams.wikipedia-raw.samza.reset.offset=true
# Metrics
metrics.reporters=snapshot,jmx
metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
metrics.reporter.snapshot.stream=kafka.metrics
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
# Systems
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=json
systems.kafka.consumer.zookeeper.connect=localhost:2181/
#systems.kafka.consumer.auto.offset.reset=smallest
systems.kafka.producer.metadata.broker.list=localhost:9092
systems.kafka.producer.producer.type=sync
# Normally, we'd set this much higher, but we want things to look snappy in the demo.
systems.kafka.producer.batch.num.messages=1
systems.kafka.streams.metrics.samza.msg.serde=metrics
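In case it helps frame the question, here is my rough mental model of how the offsets in the log fit together, as a small Java sketch. The class, enum and method names are made up for illustration and are not Samza's actual API. It assumes that with samza.reset.offset=true the checkpoint is ignored and the starting offset comes from the samza.offset.default setting (systems.kafka.samza.offset.default, or the per-stream variant), which I believe defaults to upcoming. If that assumption holds, the bootstrapping chooser would see nothing left to read and report the stream as caught up.

```java
import java.util.Optional;

/**
 * Hypothetical sketch, NOT Samza's real classes: models how a container might choose a
 * starting offset for partition [wikipedia-raw,0] and whether a bootstrap stream then
 * counts as already caught up.
 */
public class OffsetSketch {
    // Assumed to mirror the two values accepted by samza.offset.default.
    enum OffsetType { OLDEST, UPCOMING }

    // Taken from the "Got metadata" log line: oldestOffset=0, upcomingOffset=42.
    static final long OLDEST_OFFSET = 0L;
    static final long UPCOMING_OFFSET = 42L;

    /**
     * Picks the first offset to read. For simplicity the checkpoint is modeled as the
     * next offset to read; resetOffset=true ignores it and falls back to the default type.
     */
    static long resolveStartingOffset(Optional<Long> checkpoint, boolean resetOffset, OffsetType defaultType) {
        if (checkpoint.isPresent() && !resetOffset) {
            return checkpoint.get();
        }
        return defaultType == OffsetType.OLDEST ? OLDEST_OFFSET : UPCOMING_OFFSET;
    }

    /** Caught up means the first offset to read is the upcoming (not yet written) one. */
    static boolean isCaughtUp(long startingOffset) {
        return startingOffset >= UPCOMING_OFFSET;
    }

    public static void main(String[] args) {
        // reset.offset=true and no usable checkpoint: compare the two default types.
        for (OffsetType type : OffsetType.values()) {
            long start = resolveStartingOffset(Optional.empty(), true, type);
            System.out.println(type + ": start=" + start + ", caughtUp=" + isCaughtUp(start));
        }
    }
}
```

Running it prints "OLDEST: start=0, caughtUp=false" and "UPCOMING: start=42, caughtUp=true"; the second line is what my logs look like, which is why I suspect the default offset type is involved.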
Thanks!
—T
