Hi, Yi,

I couldn't find any errors in the log indicating an issue writing to those
particular changelog partitions. I even went ahead and removed all
checkpoint, coordinator and changelog topics and started fresh. The issue
still manifests itself, with offsets of 0:

partition  offset
-----------------
0               0
1           54251
2           54315
3           54196
4           53548
5           53581
6           55175
7           54599
8           53694
9               0
10          53456
11          53450
12              0
13          54442
14          54759
15          54958
16          54909
17          53396
18          55442
19          54121
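One quick cross-check is to flag, in each run, the partitions whose last offset is far below the median, and then see whether the same ones come up. The sketch below uses an arbitrary 10% cutoff (an assumption, not a Kafka or Samza setting). The offsets come from this table and from the earlier GetOffsetShell output quoted below. With those numbers it flags partitions 0, 9 and 12 in both runs.

```python
from statistics import median

# Last offset per changelog partition (list index = partition number),
# copied from the two GetOffsetShell runs in this thread.
FIRST_RUN = [7090, 3737937, 3733222, 3719065, 3730208, 3731128, 3734669,
             3691461, 3759133, 7286, 3690347, 3722450, 7376, 3738454,
             3742316, 3710512, 3777267, 3750596, 3728185, 3694470]
FRESH_RUN = [0, 54251, 54315, 54196, 53548, 53581, 55175, 54599, 53694, 0,
             53456, 53450, 0, 54442, 54759, 54958, 54909, 53396, 55442, 54121]


def lagging_partitions(offsets, fraction=0.1):
    """Return partitions whose last offset is below `fraction` of the median."""
    threshold = fraction * median(offsets)
    return {p for p, off in enumerate(offsets) if off < threshold}


first = lagging_partitions(FIRST_RUN)
fresh = lagging_partitions(FRESH_RUN)
# Partitions that lag in both runs.
print(sorted(first), sorted(fresh), sorted(first & fresh))
```

The same partition numbers showing up after a full reset points at something tied to those partitions or tasks, not at leftover state.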

In this case, partitions 0, 2, 8 and 14 are all running in the same YARN
container, which means it's not a container-specific issue (partitions 2, 8
and 14 all get their changelogs written properly).

As I mentioned earlier, the changelog topic was auto-created by the Samza
job, so no manual overrides such as "auto-commit" were given.

Thanks,
David

On Mon, Jun 13, 2016 at 9:40 AM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, David,
>
> Did you check the log for any lines indicating producer issues on the
> three partitions you suspect? Could you also check whether you have
> auto-commit turned on? If auto-commit is on and the producer does not
> report any issue writing to the changelog topic, you may want to compare
> the local RocksDB with the state persisted in the changelog to verify
> that there really are discrepancies between them. Samza provides a
> command-line tool, state-storage-tool.sh, that recovers the RocksDB state
> store from the changelog. You can use it to rebuild the store from the
> changelog and compare the result with the local RocksDB.
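Once both stores have been dumped to key/value pairs, the comparison itself is a plain diff. The sketch below leaves the dumping step open. The two dictionaries are hypothetical stand-ins for the local RocksDB contents and for the store rebuilt from the changelog with state-storage-tool.sh.

```python
def diff_stores(local, restored):
    """Compare two key/value snapshots and report discrepancies.

    Returns (missing_in_restored, missing_in_local, mismatched), where
    `mismatched` maps key -> (local_value, restored_value).
    """
    local_keys, restored_keys = set(local), set(restored)
    missing_in_restored = local_keys - restored_keys
    missing_in_local = restored_keys - local_keys
    mismatched = {
        k: (local[k], restored[k])
        for k in local_keys & restored_keys
        if local[k] != restored[k]
    }
    return missing_in_restored, missing_in_local, mismatched


# Hypothetical snapshots, for illustration only.
local = {"session1": "Session1_2", "session2": "Session2_1"}
restored = {"session1": "Session1_1"}
print(diff_stores(local, restored))
```

Keys present locally but absent or stale in the restored copy would indicate writes that never reached the changelog.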
>
> Best.
>
> -Yi
>
> On Sun, Jun 12, 2016 at 12:49 PM, David Yu <david...@optimizely.com>
> wrote:
>
> > Jagadish,
> >
> > Your description matches my understanding.
> >
> > Here are our settings:
> > - Our task aggregates user events into user sessions.
> > - We have one k-v store for each task, which tracks active user sessions
> > (with sessionId as the key).
> > - When a user session expires, the session is removed from the store.
> > - The changelog topic was auto-created with cleanup.policy=compact.
> >
> > In terms of log compaction, I expect it to keep the last log entry for
> > a given key and delete all previous entries. For example, if we have:
> >
> > store.put("session1", Session1_1);  // session created
> > store.put("session1", Session1_2);  // session updated
> > store.delete("session1");           // session expired
> >
> >
> > I'd expect the changelog to contain the following before compaction:
> >
> > 1 session1=Session1_1
> > 2 session1=Session1_2
> > 3 session1=NULL
> >
> > with only offset 3 retained after compaction. The next log entry should
> > take offset 4. In that sense, the offsets should always increase
> > monotonically, with lots of gaps in between due to compaction.
> >
> > So again, I'm not sure why three of the changelog partitions have
> > stopped seeing movement in their offsets.
> >
> > Thanks,
> > David
> >
> > On Sun, Jun 12, 2016 at 11:09 AM, Jagadish Venkatraman <
> > jagadish1...@gmail.com> wrote:
> >
> > > Some context: each k-v store has a changelog topic. The number of
> > > partitions in that changelog topic is equal to the number of tasks.
> > > Each task's k-v store is mapped to a particular partition of that
> > > changelog topic. This mapping from task names to changelog partition
> > > numbers is stored in the coordinator stream.
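A toy model of that routing may help with the question further down about whether the changelog keys are task names. In this model, the task only decides which partition a write goes to, while the record key is still the store key. Everything here is hypothetical. The mapping is hard-coded instead of read from the coordinator stream, the "Partition N" task names are assumed, and `store_put` is an illustrative helper, not a Samza API.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical task-name -> changelog-partition mapping. In Samza this
# mapping is persisted in the coordinator stream; here it is hard-coded.
CHANGELOG_MAPPING: Dict[str, int] = {f"Partition {i}": i for i in range(20)}

# Each changelog message: (partition, key, value). The key is the store key.
changelog: List[Tuple[int, str, Optional[str]]] = []


def store_put(task_name: str, key: str, value: Optional[str]) -> None:
    """Hypothetical write-through to the changelog partition owned by `task_name`."""
    partition = CHANGELOG_MAPPING[task_name]
    changelog.append((partition, key, value))


store_put("Partition 9", "session42", "Session42_1")
store_put("Partition 9", "session42", None)  # delete -> tombstone
print(changelog)
```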
> > >
> > > Of course, you don't want this k-v changelog topic to keep growing, so
> > > people configure it with some form of expiration. The expiration can be
> > > either:
> > > 1. Time retention: records older than the retention period are purged.
> > > 2. Compaction: newer values overwrite older values for the same key, and
> > >    only the most recent value is retained.
> > >
> > > I'm not sure whether offsets in Kafka always increase monotonically, or
> > > whether they could change after compaction or time-based retention kicks
> > > in for the topic partition.
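For the time-retention case, a similar toy model shows that purging old records only moves the log start offset forward. The log end offset, which is what a "latest offset" query reports, stays where it was. This is a simplification: Kafka deletes whole segments rather than individual records. The timestamps and retention value below are in arbitrary units.

```python
from typing import List, Tuple

Record = Tuple[int, int]  # (offset, timestamp), assumed sorted by offset


def apply_time_retention(records: List[Record], log_end_offset: int,
                         now: int, retention: int) -> Tuple[List[Record], int]:
    """Drop records older than `retention` and return (kept, log_start_offset).

    The log end offset is deliberately left alone: retention removes records
    from the head of the log and never renumbers what remains.
    """
    kept = [(off, ts) for off, ts in records if now - ts <= retention]
    # With nothing left, the log start offset catches up to the end offset.
    log_start_offset = kept[0][0] if kept else log_end_offset
    return kept, log_start_offset


records = [(0, 10), (1, 20), (2, 30), (3, 40)]  # toy (offset, timestamp) pairs
log_end_offset = 4  # next offset to be assigned
kept, log_start = apply_time_retention(records, log_end_offset, now=45, retention=20)
print(kept, log_start, log_end_offset)
```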
> > >
> > > On Sat, Jun 11, 2016 at 11:53 PM, David Yu <david...@optimizely.com>
> > > wrote:
> > >
> > > > My understanding of the store changelog is that each task writes store
> > > > changes to a particular changelog partition for that task. (Does that
> > > > mean the changelog keys are task names?)
> > > >
> > > > One thing that confuses me is that the last offsets of some changelog
> > > > partitions do not move. I'm using Kafka's GetOffsetShell tool to get
> > > > the last offset of each partition. The result looks like this:
> > > >
> > > > partition  offset
> > > > -----------------
> > > > 0            7090
> > > > 1         3737937
> > > > 2         3733222
> > > > 3         3719065
> > > > 4         3730208
> > > > 5         3731128
> > > > 6         3734669
> > > > 7         3691461
> > > > 8         3759133
> > > > 9            7286
> > > > 10        3690347
> > > > 11        3722450
> > > > 12           7376
> > > > 13        3738454
> > > > 14        3742316
> > > > 15        3710512
> > > > 16        3777267
> > > > 17        3750596
> > > > 18        3728185
> > > > 19        3694470
> > > >
> > > > As you can see, three of the partitions (0, 9 and 12) barely got any
> > > > updates. In fact, their offsets stopped moving for a while. The traffic
> > > > for each task should be fairly balanced. I checked the task log and
> > > > made sure that the stores for these partitions are actively updated.
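Because the symptom is that some offsets "stopped moving for a while," one way to confirm it is to poll the latest offsets twice, a few minutes apart, and flag the partitions that did not advance. The function below assumes each poll has already been parsed into a `{partition: offset}` dict. The sample numbers are made up purely for illustration.

```python
from typing import Dict, List


def stalled_partitions(before: Dict[int, int], after: Dict[int, int]) -> List[int]:
    """Return partitions whose latest offset did not advance between two polls.

    A partition missing from `after` is treated as not having advanced.
    """
    return sorted(p for p in before if after.get(p, before[p]) <= before[p])


# Made-up polls taken a few minutes apart, for illustration only.
poll_1 = {0: 100, 1: 500, 2: 800}
poll_2 = {0: 100, 1: 650, 2: 940}
print(stalled_partitions(poll_1, poll_2))
```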
> > > >
> > > > Any idea why this is happening? Or am I missing something?
> > > >
> > > > Thanks,
> > > > David
> > > >
> > >
> > >
> > >
> > > --
> > > Jagadish V,
> > > Graduate Student,
> > > Department of Computer Science,
> > > Stanford University
> > >
> >
>
