> you never know if that's the same order in which the writers will commit
> (when you have multiple writers in the system)

That's exactly the problem with trying to rely on timestamps. Even if you
can coordinate the timestamps themselves, commits could still be out of
order because of differences in how long it takes to write the final
metadata. I think the right approach for incremental consumption is to take
the current snapshot and traverse back through history until you find the
last snapshot that you processed. That way, you know that you have the full
history.
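
As a minimal sketch of that traversal (assuming you persist the ID of the
last snapshot you processed somewhere durable; the helper name is just
illustrative):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;

    // Walk back from the current snapshot to the last one we processed,
    // collecting the intervening snapshots in commit order (oldest first).
    static Deque<Snapshot> snapshotsSince(Table table, long lastProcessedId) {
      Deque<Snapshot> pending = new ArrayDeque<>();
      Snapshot current = table.currentSnapshot();
      while (current != null && current.snapshotId() != lastProcessedId) {
        pending.addFirst(current);
        Long parentId = current.parentId();
        // parentId is null at the first snapshot; reaching it means the last
        // processed snapshot has expired and a full read may be needed
        current = parentId == null ? null : table.snapshot(parentId);
      }
      return pending;
    }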

The transaction ID idea is more for committing data exactly-once or for
coordinating with other systems. You can put an external identifier in
snapshot metadata and use that to check whether a particular commit has
already succeeded. You can also use it to track freshness. We are building
materialized views that use the snapshot IDs of other tables to track how
fresh the materialized view is.
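
A rough sketch of that pattern, with an example summary key (the property
name and helpers are just for illustration):

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;

    static final String TXN_ID_PROP = "external-txn-id"; // example key

    // Commit exactly once: skip the append if a snapshot already carries
    // this external identifier in its summary.
    static void commitOnce(Table table, DataFile file, String txnId) {
      if (alreadyCommitted(table, txnId)) {
        return; // a previous attempt already succeeded
      }
      table.newAppend()
          .appendFile(file)
          .set(TXN_ID_PROP, txnId) // recorded in the snapshot summary
          .commit();
    }

    static boolean alreadyCommitted(Table table, String txnId) {
      for (Snapshot snapshot : table.snapshots()) {
        if (txnId.equals(snapshot.summary().get(TXN_ID_PROP))) {
          return true;
        }
      }
      return false;
    }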

On Thu, Sep 10, 2020 at 3:03 PM Ashish Mehta <mehta.ashis...@gmail.com>
wrote:

> > This is better handled by attaching a global transaction-id (e.g. UUID
> that is monotonically increasing) to the snapshot metadata (iceberg allows
> adding this to the summary)
> I believe even if the client can provide metadata for a snapshot during
> the commit operation, you never know if that's the same order in which the
> writers will commit (when you have multiple writers in the system).
>
> -Ashish
>
> On Thu, Sep 10, 2020 at 12:18 PM Ryan Blue <rb...@netflix.com.invalid>
> wrote:
>
>> I should also add one more thing. The PR I linked to above is a good way
>> to introduce a clock, but it was pointed out in the sync that even if we
>> had a service that provided synchronized timestamps, committers could
>> still race between getting a timestamp and committing. So we would still
>> have an out-of-order problem. It is best not to rely on timestamps for
>> anything other than inspecting tables to get a rough idea of when a node
>> committed.
>>
>> On Thu, Sep 10, 2020 at 12:14 PM Ryan Blue <rb...@netflix.com> wrote:
>>
>>> Thanks, Gautam! I think that's a good summary of the discussion.
>>>
>>> On Thu, Sep 10, 2020 at 11:56 AM Gautam <gautamkows...@gmail.com> wrote:
>>>
>>>> Wanted to circle back on this thread. Linear timestamps were discussed
>>>> during the sync, and the conclusion was that timestamp-based incremental
>>>> reading is generally discouraged, as it introduces correctness issues.
>>>> Even if a custom clock is available, keeping timestamps atomic and
>>>> monotonically increasing is going to be a problem for applications.
>>>> Enforcing this in Iceberg (by blocking out-of-order timestamps) can
>>>> introduce its own issues: e.g. a client committing an erroneous
>>>> timestamp that is far in the future would block all other clients from
>>>> committing.
>>>>
>>>> This is better handled by attaching a global transaction-id (e.g. a UUID
>>>> that is monotonically increasing) to the snapshot metadata (Iceberg
>>>> allows adding this to the summary). The incremental read application can
>>>> then use the transaction-id as a key to look up the exact from/to
>>>> snapshot-ids for incremental reading.
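>>>>
>>>> A sketch of the reader side, assuming the ids were recorded under an
>>>> agreed summary key (the key and method names are illustrative):
>>>>
>>>>     import org.apache.iceberg.Snapshot;
>>>>     import org.apache.iceberg.Table;
>>>>     import org.apache.iceberg.TableScan;
>>>>
>>>>     // Resolve a transaction-id to the snapshot that recorded it.
>>>>     static long snapshotIdForTxn(Table table, String txnId) {
>>>>       for (Snapshot snapshot : table.snapshots()) {
>>>>         if (txnId.equals(snapshot.summary().get("external-txn-id"))) {
>>>>           return snapshot.snapshotId();
>>>>         }
>>>>       }
>>>>       throw new IllegalArgumentException("unknown txn-id: " + txnId);
>>>>     }
>>>>
>>>>     // Incrementally read the appends between two transaction-ids.
>>>>     static TableScan incrementalScan(Table table, String from, String to) {
>>>>       return table.newScan().appendsBetween(
>>>>           snapshotIdForTxn(table, from), snapshotIdForTxn(table, to));
>>>>     }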
>>>>
>>>> Hope I covered the points raised.
>>>>
>>>> Regards,
>>>> -Gautam.
>>>>
>>>> On Wed, Sep 9, 2020 at 5:07 PM Ryan Blue <rb...@netflix.com.invalid>
>>>> wrote:
>>>>
>>>>> Hi everyone, I'm putting this on the agenda for today's Iceberg sync.
>>>>>
>>>>> Also, I want to point out John's recent PR that added a way to inject
>>>>> a Clock that is used for timestamp generation:
>>>>> https://github.com/apache/iceberg/pull/1389
>>>>>
>>>>> That fits nicely with the requirements here and would be an easy way
>>>>> to inject your own time, synchronized by an external service.
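>>>>>
>>>>> For example, a java.time.Clock backed by such a service could look like
>>>>> this (TimeService is a hypothetical client for whatever service issues
>>>>> the synchronized time; how the clock gets wired in is defined by the PR
>>>>> above):
>>>>>
>>>>>     import java.time.Clock;
>>>>>     import java.time.Instant;
>>>>>     import java.time.ZoneId;
>>>>>     import java.time.ZoneOffset;
>>>>>
>>>>>     // A Clock whose instants come from an external, synchronized
>>>>>     // time service instead of the local system clock.
>>>>>     class SynchronizedClock extends Clock {
>>>>>       interface TimeService { // hypothetical service client
>>>>>         long currentTimeMillis();
>>>>>       }
>>>>>
>>>>>       private final TimeService service;
>>>>>
>>>>>       SynchronizedClock(TimeService service) {
>>>>>         this.service = service;
>>>>>       }
>>>>>
>>>>>       @Override
>>>>>       public Instant instant() {
>>>>>         return Instant.ofEpochMilli(service.currentTimeMillis());
>>>>>       }
>>>>>
>>>>>       @Override
>>>>>       public ZoneId getZone() {
>>>>>         return ZoneOffset.UTC;
>>>>>       }
>>>>>
>>>>>       @Override
>>>>>       public Clock withZone(ZoneId zone) {
>>>>>         return this; // commit timestamps are always UTC millis
>>>>>       }
>>>>>     }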
>>>>>
>>>>> On Wed, Sep 9, 2020 at 12:33 AM Peter Vary <pv...@cloudera.com.invalid>
>>>>> wrote:
>>>>>
>>>>>> Quick question below about the proposed usage of the timestamp:
>>>>>>
>>>>>> On Sep 9, 2020, at 7:24 AM, Miao Wang <miw...@adobe.com.INVALID>
>>>>>> wrote:
>>>>>>
>>>>>> +1 to OpenInx's comment on implementation.
>>>>>>
>>>>>> Unless we have an external time synchronization service and enforce
>>>>>> that all clients use it, timestamps from different clients are not
>>>>>> comparable.
>>>>>>
>>>>>>
>>>>>> Do we want to use the timestamp as the real timestamp of the last
>>>>>> change, or do we want to use it only as a monotonically increasing,
>>>>>> more human-readable identifier?
>>>>>> Do we want to compare this timestamp against some external source, or
>>>>>> do we just want to compare it with the timestamps in other snapshots of
>>>>>> the same table?
>>>>>>
>>>>>>
>>>>>> So, there are two asks: 1) whether to have a timestamp-based API for
>>>>>> delta reading; 2) how to enforce and implement a service/protocol for
>>>>>> timestamp sync among all clients.
>>>>>>
>>>>>> 1) +1 to having it, as Jingsong and Gautam suggested. The snapshot ID
>>>>>> could be the source of truth in any case.
>>>>>>
>>>>>> 2) IMO, it should be a package external to Iceberg.
>>>>>>
>>>>>> Miao
>>>>>>
>>>>>> From: OpenInx <open...@gmail.com>
>>>>>> Reply-To: "dev@iceberg.apache.org" <dev@iceberg.apache.org>
>>>>>> Date: Tuesday, September 8, 2020 at 7:55 PM
>>>>>> To: Iceberg Dev List <dev@iceberg.apache.org>
>>>>>> Subject: Re: Timestamp Based Incremental Reading in Iceberg ...
>>>>>>
>>>>>> I agree that it's helpful to allow users to read the incremental delta
>>>>>> based on a timestamp; as Jingsong said, timestamps are more
>>>>>> user-friendly.
>>>>>>
>>>>>> My question is how to implement this?
>>>>>>
>>>>>> If we just attach the client's timestamp to the Iceberg table when
>>>>>> committing, then different clients may have different timestamp values
>>>>>> because of clock skew. In theory, these time values are not strictly
>>>>>> comparable, and can only be compared within a margin of error.
>>>>>>
>>>>>>
>>>>>> On Wed, Sep 9, 2020 at 10:06 AM Jingsong Li <jingsongl...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> +1 for keeping timestamps linear; in the implementation, maybe the
>>>>>> writer only needs to look at the previous snapshot's timestamp.
>>>>>>
>>>>>> We're trying to think of Iceberg as a message queue. Let's take the
>>>>>> popular queue Kafka as an example. Iceberg has snapshotId and
>>>>>> timestamp; correspondingly, Kafka has offset and timestamp:
>>>>>> - offset: used for incremental reads, such as the state of a checkpoint
>>>>>> in a computing system.
>>>>>> - timestamp: explicitly specified by the user to define the scope of
>>>>>> consumption, as the start_timestamp of a read. Timestamp is a more
>>>>>> user-friendly interface, whereas offset/snapshotId is not human-readable
>>>>>> or friendly.
>>>>>>
>>>>>> So there are scenarios where a timestamp is used for incremental reads.
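>>>>>>
>>>>>> A sketch of that usage, mapping a user-facing start_timestamp onto the
>>>>>> snapshot log so the read itself is still driven by snapshot IDs (this
>>>>>> assumes the table's timestamps are linear; the method name is
>>>>>> illustrative):
>>>>>>
>>>>>>     import org.apache.iceberg.HistoryEntry;
>>>>>>     import org.apache.iceberg.Table;
>>>>>>     import org.apache.iceberg.TableScan;
>>>>>>
>>>>>>     // Find the last snapshot committed before the start timestamp;
>>>>>>     // everything after it is the requested incremental delta.
>>>>>>     static TableScan scanFrom(Table table, long startTimestampMillis) {
>>>>>>       Long fromSnapshotId = null;
>>>>>>       for (HistoryEntry entry : table.history()) { // oldest to newest
>>>>>>         if (entry.timestampMillis() < startTimestampMillis) {
>>>>>>           fromSnapshotId = entry.snapshotId();
>>>>>>         }
>>>>>>       }
>>>>>>       if (fromSnapshotId == null) {
>>>>>>         return table.newScan(); // nothing older: scan the whole table
>>>>>>       }
>>>>>>       return table.newScan().appendsAfter(fromSnapshotId);
>>>>>>     }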
>>>>>>
>>>>>> Best,
>>>>>> Jingsong
>>>>>>
>>>>>>
>>>>>> On Wed, Sep 9, 2020 at 12:45 AM Sud <sudssf2...@gmail.com> wrote:
>>>>>>
>>>>>>
>>>>>> We are using incremental reads for Iceberg tables that get quite a few
>>>>>> appends (~500-1000 per hour), but instead of using timestamps we use
>>>>>> snapshot IDs and track the state of the last read snapshot ID.
>>>>>> We use the timestamp as a fallback when that state is incorrect, but as
>>>>>> you mentioned, that only works as expected if timestamps are linear.
>>>>>> We also found that the incremental reader can be slow when dealing with
>>>>>> >2k snapshots in a range. We are currently testing a manifest-based
>>>>>> incremental reader that looks at manifest entries instead of scanning
>>>>>> the snapshot history and accessing each snapshot.
>>>>>>
>>>>>> Is there any reason you can't use snapshot-based incremental reads?
>>>>>>
>>>>>> On Tue, Sep 8, 2020 at 9:06 AM Gautam <gautamkows...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Hello Devs,
>>>>>>                    We are looking into adding workflows that read
>>>>>> data incrementally based on commit time. The ability to read deltas 
>>>>>> between
>>>>>> start / end commit timestamps on a table and ability to resume reading 
>>>>>> from
>>>>>> last read end timestamp. In that regard, we need the timestamps to be
>>>>>> linear in the current active snapshot history (newer versions always have
>>>>>> higher timestamps). Although Iceberg commit flow ensures the versions are
>>>>>> newer, there isn't a check to ensure timestamps are linear.
>>>>>>
>>>>>> Example flow: if two clients (clientA and clientB) whose clocks are
>>>>>> slightly off (say by a couple of seconds) are committing frequently,
>>>>>> clientB might get to commit after clientA even if its new snapshot
>>>>>> timestamp is out of order. I might be wrong, but I haven't found a
>>>>>> check in HadoopTableOperations.commit() that ensures the above case
>>>>>> does not happen.
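>>>>>>
>>>>>> For illustration, the missing guard would be something like this
>>>>>> hypothetical check (not something Iceberg does today):
>>>>>>
>>>>>>     import org.apache.iceberg.Snapshot;
>>>>>>     import org.apache.iceberg.TableMetadata;
>>>>>>
>>>>>>     // Hypothetical: reject a commit whose new snapshot timestamp is
>>>>>>     // older than the current snapshot's timestamp.
>>>>>>     static void checkLinear(TableMetadata base, TableMetadata updated) {
>>>>>>       Snapshot current = base.currentSnapshot();
>>>>>>       Snapshot committed = updated.currentSnapshot();
>>>>>>       if (current != null && committed != null
>>>>>>           && committed.timestampMillis() < current.timestampMillis()) {
>>>>>>         throw new IllegalStateException("out-of-order commit timestamp");
>>>>>>       }
>>>>>>     }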
>>>>>>
>>>>>>
>>>>>> On the other hand, restricting commits due to out-of-order timestamps
>>>>>> can hurt commit throughput, so I can see why this isn't something
>>>>>> Iceberg would want to enforce based on System.currentTimeMillis().
>>>>>> But if clients had a way to define their own globally synchronized
>>>>>> timestamps (using an external service or some monotonically increasing
>>>>>> UUID), then Iceberg could allow an API to set that on the snapshot, or
>>>>>> use it instead of System.currentTimeMillis(). Iceberg exposes something
>>>>>> similar with sequence numbers in the v2 format to track deletes and
>>>>>> appends.
>>>>>> Is this a concern others have? If so, how are folks handling this
>>>>>> today, or are they not exposing such a feature at all due to the
>>>>>> inherent distributed timing problem? I would like to hear how others
>>>>>> are thinking about this. Thoughts?
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> -Gautam.
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best, Jingsong Lee
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

-- 
Ryan Blue
Software Engineer
Netflix
