Chris -

Here are the logs:

2015-01-16 17:30:15 KafkaCheckpointManager [INFO] Got checkpoint state for
taskName Partition 0: Checkpoint [offsets={SystemStreamPartition [kafka,
argos-raw, 0]=60958}]

2015-01-16 17:30:15 OffsetManager [INFO] Successfully loaded last processed
offsets: Map(SystemStreamPartition [kafka, argos-raw, 0] -> 60958)

2015-01-16 17:30:15 OffsetManager [INFO] Successfully loaded starting
offsets: Map(SystemStreamPartition [kafka, argos-raw, 0] -> 60959)

2015-01-16 17:30:15 SamzaContainer [INFO] Starting task instance stores.

2015-01-16 17:30:15 SamzaContainer [INFO] Initializing stream tasks.

2015-01-16 17:30:15 SamzaContainer [INFO] Registering task instances with
producers.

2015-01-16 17:30:15 SamzaContainer [INFO] Starting producer multiplexer.

2015-01-16 17:30:15 SamzaContainer [INFO] Registering task instances with
consumers.

2015-01-16 17:30:15 SamzaContainer [INFO] Starting consumer multiplexer.

2015-01-16 17:30:15 VerifiableProperties [INFO] Verifying properties

2015-01-16 17:30:15 VerifiableProperties [INFO] Property client.id is
overridden to samza_consumer-argos_parser-1-1421458212216-3

2015-01-16 17:30:15 VerifiableProperties [INFO] Property
metadata.broker.list is overridden to localhost:9092

2015-01-16 17:30:15 VerifiableProperties [INFO] Property request.timeout.ms
is overridden to 30000

2015-01-16 17:30:15 ClientUtils$ [INFO] Fetching metadata from broker
id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s)
Set(argos-raw)

2015-01-16 17:30:15 SyncProducer [INFO] Connected to localhost:9092 for
producing

2015-01-16 17:30:15 SyncProducer [INFO] Disconnecting from localhost:9092

2015-01-16 17:30:15 BrokerProxy [INFO] Creating new SimpleConsumer for host
pppdc9prd2yv.corp.net:9092 for system kafka

2015-01-16 17:30:15 GetOffset [INFO] Validating offset 60959 for topic and
partition [argos-raw,0]

2015-01-16 17:30:15 GetOffset [INFO] Able to successfully read from offset
60959 for topic and partition [argos-raw,0]. Using it to instantiate
consumer.

2015-01-16 17:30:15 BrokerProxy [INFO] Starting BrokerProxy for
pppdc9prd2yv.corp.net:9092

2015-01-16 17:30:15 SamzaContainer [INFO] Entering run loop.

2015-01-16 17:30:15 KafkaSystemProducer [INFO] Creating a new producer for
system kafka.

2015-01-16 17:30:15 ClientUtils$ [INFO] Fetching metadata from broker
id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s)
Set(__samza_checkpoint_ver_1_for_argos-parser_1)

2015-01-16 17:30:15 SyncProducer [INFO] Connected to localhost:9092 for
producing

2015-01-16 17:30:15 SyncProducer [INFO] Disconnecting from localhost:9092

2015-01-16 17:30:15 SyncProducer [INFO] Connected to
pppdc9prd2yv.corp.net:9092 for producing

On Fri, Jan 16, 2015 at 6:03 PM, Chris Riccomini <
[email protected]> wrote:

> Hey Shekar,
>
> Hmm. Could you post your logs, so we can have a look? The logs will post
> both the config, and the offsets used. If there's an issue, we should be
> able to figure it out.
>
> Cheers,
> Chris
>
> On 1/16/15 5:41 PM, "Shekar Tippur" <[email protected]> wrote:
>
> >Interesting ..
> >
> >
> >I have added
> >
> >systems.system-name.streams.stream-name.samza.reset.offset = true
> >
> >systems.system-name.streams.stream-name.samza.offset.default = oldest
> >
> >to the config. I see that it is still looking only the latest events.
> >
> >
> >- Shekar
> >
> >On Fri, Jan 16, 2015 at 3:26 PM, Chinmay Soman <[email protected]
> >
> >wrote:
> >
> >> Aah that's right. Thanks for confirming.
> >>
> >> On Fri, Jan 16, 2015 at 3:22 PM, Chris Riccomini <
> >> [email protected]> wrote:
> >>
> >> > Hey Chinmay,
> >> >
> >> > We do have checkpoint-tool.sh, which allows you to set input offsets
> >>to
> >> > arbitrary locations. One caveat here is that your job has to be
> >>offline
> >> > when this is done. When coordinator stream is done, it should be
> >>possible
> >> > to dynamically change the offsets (by bouncing the containers) when
> >>the
> >> > offsets change.
> >> >
> >> > Cheers,
> >> > Chris
> >> >
> >> > On 1/16/15 2:13 PM, "Chinmay Soman" <[email protected]>
> wrote:
> >> >
> >> > >We don't have a way to start from a known offset right ?  Do you guys
> >> > >think
> >> > >that might be useful ?
> >> > >
> >> > >On Fri, Jan 16, 2015 at 11:55 AM, Shekar Tippur <[email protected]>
> >> > wrote:
> >> > >
> >> > >> Yan,
> >> > >> Thanks. This really helps.
> >> > >>
> >> > >> - Shekar
> >> > >>
> >> > >> On Fri, Jan 16, 2015 at 11:52 AM, Yan Fang <[email protected]>
> >> > wrote:
> >> > >>
> >> > >> > Hi Shekar,
> >> > >> >
> >> > >> > Assuming you are using 0.8, there are configurations that you can
> >> set
> >> > >>to
> >> > >> > read messages from oldest.
> >> > >> >
> >> > >> > systems.system-name.streams.stream-name.samza.reset.offset = true
> >> > >> > systems.system-name.streams.stream-name.samza.offset.default =
> >> oldest
> >> > >> >
> >> > >> > See
> >> > >> >
> >> > >> >
> >> > >>
> >> > >>
> >> >
> >>
> >>
> http://samza.incubator.apache.org/learn/documentation/0.8/jobs/configurat
> >> > >>ion-table.html
> >> > >> >
> >> > >> > Hope this helps.
> >> > >> >
> >> > >> > Thanks,
> >> > >> >
> >> > >> > Fang, Yan
> >> > >> > [email protected]
> >> > >> > +1 (206) 849-4108
> >> > >> >
> >> > >> > On Fri, Jan 16, 2015 at 11:02 AM, Shekar Tippur
> >><[email protected]>
> >> > >> wrote:
> >> > >> >
> >> > >> > > Hello,
> >> > >> > >
> >> > >> > > I was wondering what would be the recommended way to point
> >>Samza
> >> to
> >> > >> > consume
> >> > >> > > older messages?
> >> > >> > >
> >> > >> > > I have made some code changes that I want to apply to all the
> >> > >>messages
> >> > >> > that
> >> > >> > > have been read so far.
> >> > >> > >
> >> > >> > > - Shekar
> >> > >> > >
> >> > >> >
> >> > >>
> >> > >
> >> > >
> >> > >
> >> > >--
> >> > >Thanks and regards
> >> > >
> >> > >Chinmay Soman
> >> >
> >> >
> >>
> >>
> >> --
> >> Thanks and regards
> >>
> >> Chinmay Soman
> >>
>
>

Reply via email to