Hi, I’m having some trouble getting bootstrapping to work. As a simple test case, I’ve modified hello-samza so that the Wikipedia feed parser bootstraps from the wikipedia-raw topic. When I look at the logs, I see the following lines:

2014-04-10 11:09:46 KafkaSystemAdmin$ [INFO] Got metadata: Map(wikipedia-raw -> SystemStreamMetadata [streamName=wikipedia-raw, partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata [oldestOffset=0, newestOffset=41, upcomingOffset=42]}])

2014-04-10 11:09:46 DefaultChooser [INFO] Building default chooser with: useBatching=false, useBootstrapping=true, usePriority=true

2014-04-10 11:09:46 BootstrappingChooser [INFO] Bootstrap stream is fully caught up: SystemStream [system=kafka, stream=wikipedia-raw]

2014-04-10 11:09:46 GetOffset [INFO] Validating offset 42 for topic and partition [wikipedia-raw,0]
2014-04-10 11:09:46 GetOffset [INFO] Able to successfully read from offset 42 for topic and partition [wikipedia-raw,0]. Using it to instantiate consumer.

So Samza does recognize that I want to bootstrap wikipedia-raw, but by the time the stream processor finishes initializing, it already considers wikipedia-raw fully bootstrapped and starts consuming from offset 42 (the upcoming offset), skipping the existing messages at offsets 0 through 41. Any ideas? My config is below; the full logs are on pastebin (http://pastebin.com/ZatAA0Ak).

# Job
job.factory.class=org.apache.samza.job.local.LocalJobFactory
job.name=wikipedia-parser

# Task
task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
task.inputs=kafka.wikipedia-raw
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# Normally, this would be 3, but we have only one broker.
task.checkpoint.replication.factor=1

# Das boot
systems.kafka.streams.wikipedia-raw.samza.bootstrap=true
systems.kafka.streams.wikipedia-raw.samza.reset.offset=true

# Metrics
metrics.reporters=snapshot,jmx
metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
metrics.reporter.snapshot.stream=kafka.metrics
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory

# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory

# Systems
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=json
systems.kafka.consumer.zookeeper.connect=localhost:2181/
#systems.kafka.consumer.auto.offset.reset=smallest
systems.kafka.producer.metadata.broker.list=localhost:9092
systems.kafka.producer.producer.type=sync
# Normally, we'd set this much higher, but we want things to look snappy in the demo.
systems.kafka.producer.batch.num.messages=1
systems.kafka.streams.metrics.samza.msg.serde=metrics
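To make the symptom concrete, here's a tiny Java sketch of my reading of the logs above. It assumes (my guess, not Samza's actual code) that a bootstrap stream counts as caught up once the consumer's starting offset reaches the partition's upcoming offset. `BootstrapCheckSketch`, `PartitionMetadata`, `isCaughtUp`, and `skippedMessages` are hypothetical names; the offsets come from the "Got metadata" log line.

```java
public class BootstrapCheckSketch {
    // Hypothetical stand-in for the per-partition metadata in the KafkaSystemAdmin log line.
    static final class PartitionMetadata {
        final long oldestOffset;
        final long newestOffset;
        final long upcomingOffset;

        PartitionMetadata(long oldestOffset, long newestOffset, long upcomingOffset) {
            this.oldestOffset = oldestOffset;
            this.newestOffset = newestOffset;
            this.upcomingOffset = upcomingOffset;
        }
    }

    // Hypothetical caught-up check (an assumption, not Samza's implementation):
    // if the consumer starts at or past the upcoming offset, nothing is left to bootstrap.
    static boolean isCaughtUp(long startingOffset, PartitionMetadata meta) {
        return startingOffset >= meta.upcomingOffset;
    }

    // Number of already-written messages the consumer never reads when it starts here.
    static long skippedMessages(long startingOffset, PartitionMetadata meta) {
        return Math.max(0, startingOffset - meta.oldestOffset);
    }

    public static void main(String[] args) {
        // Offsets for [wikipedia-raw,0] as reported in the logs.
        PartitionMetadata meta = new PartitionMetadata(0, 41, 42);
        System.out.println(isCaughtUp(42, meta) + " " + skippedMessages(42, meta)); // prints "true 42" (what I see)
        System.out.println(isCaughtUp(0, meta) + " " + skippedMessages(0, meta));   // prints "false 0" (what I want)
    }
}
```

If that reading is right, the question boils down to how to make the job start wikipedia-raw at its oldest offset (0) rather than at the upcoming offset (42).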



Thanks!
—T
