It wasn't recorded, but I can summarize what we talked about. Sorry I
haven't sent this out earlier.

We talked about the options and some of the background in Iceberg --
basically that it isn't possible to determine the order of commits before
you commit so you can't rely on some monotonically increasing value from a
snapshot to know which deltas to apply to a file. The result is that we
can't apply diffs to data files using a rule like "files older than X"
because we can't identify those files without the snapshot history.

That gives us basically 2 options for scoping delete diffs: either identify
the files to apply a diff to when writing the diff, or log changes applied
to a snapshot and keep the snapshot history around (which is how we know
the order of snapshots). The first option is not good if you want to write
without reading data to determine where the deleted records are. The second
prevents cleaning up snapshot history.

We also talked about whether we should encode IDs in data files. Jacques
pointed out that retrying a commit is easier if you don't need to re-read
the original data to reconcile changes. For example, if a data file was
compacted in a concurrent write, how do we reconcile a delete for it? We
discussed other options, like rolling back the compaction for delete
events. I think that's a promising option.

For action items, Jacques was going to think about whether we need to
encode IDs in data files or if we could use positions to identify rows and
write up a summary/proposal. Erik was going to take on planning how
identifying rows without reading data would work and similarly write up a
summary/proposal.

That's from memory, so if I've missed anything, I hope that other attendees
will fill in the details!

rb

On Wed, May 29, 2019 at 3:34 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu>
wrote:

> Hi Ryan,
>
> I couldn't attend the meeting. Just curious, if this is recorded by any
> chance.
>
> Regards
> Venkata krishnan
>
>
> On Fri, May 24, 2019 at 8:49 AM Ryan Blue <rb...@netflix.com.invalid>
> wrote:
>
>> Yes, I agree. I'll talk a little about a couple of the constraints of
>> this as well.
>>
>> On Fri, May 24, 2019 at 5:52 AM Anton Okolnychyi <aokolnyc...@apple.com>
>> wrote:
>>
>>> The agenda looks good to me. I think it would also make sense to clarify
>>> the responsibilities of query engines and Iceberg. Not only in terms of
>>> uniqueness, but also in terms of applying diffs on read, for example.
>>>
>>> On 23 May 2019, at 01:59, Ryan Blue <rb...@netflix.com.INVALID> wrote:
>>>
>>> Here’s a rough agenda:
>>>
>>>    - Use cases: everyone come with a use case that you’d like to have
>>>    supported. We’ll go around and introduce ourselves and our use cases.
>>>    - Main topic: How should Iceberg identify rows that are deleted?
>>>    - Side topics from my initial email, if we have time: should we use
>>>    insert diffs, should we support dense and sparse formats, etc.
>>>
>>> The main topic I think we should discuss is: *How should Iceberg
>>> identify rows that are deleted?*
>>>
>>> I’m phrasing it this way to avoid where I think we’re talking past one
>>> another because we are making assumptions. The important thing is that
>>> there are two main options:
>>>
>>>    - Filename and position, vs
>>>    - Specific values of (few) columns in the data
>>>
>>> This phrasing also avoids discussing uniqueness constraints. Once we get
>>> down to behavior, I think we agree. For example, I think we all agree that
>>> uniqueness cannot be enforced in Iceberg.
>>>
>>> If uniqueness can’t be enforced in Iceberg, the main choice comes down
>>> to how we identify rows that are deleted. If we use (filename, position)
>>> then we know that there is only one row. On the other hand, if we use data
>>> values to identify rows then a delete may identify more than one row
>>> because there are no uniqueness guarantees. I think we also agree that if
>>> there is more than one row identified, all of them should be deleted.
>>>
>>> At that point, there are trade-offs between the approaches:
>>>
>>>    - When identifying deleted rows by data values, situations like the
>>>    one that Anton pointed out are possible.
>>>    - Jacques also had a good point about concurrency. If at all
>>>    possible, we want to be able to reconcile changes between concurrent
>>>    commits without re-running an operation.
>>>
>>> Sound like a reasonable amount to talk through?
>>>
>>> rb
>>>
>>> On Wed, May 22, 2019 at 1:17 PM Erik Wright <erik.wri...@shopify.com>
>>> wrote:
>>>
>>>>
>>>>
>>>> On Wed, May 22, 2019 at 4:04 PM Cristian Opris <
>>>> cop...@apple.com.invalid> wrote:
>>>>
>>>>> Agreed with Erik here, we're certainly not looking to build the
>>>>> equivalent of a relational database, and for that matter not even that of 
>>>>> a
>>>>> local disk storage analytics database (like Vertica). Those are very
>>>>> different designs with very different trade-offs and optimizations.
>>>>>
>>>>> We're looking to automate and optimize specific types of file
>>>>> manipulation for large files on remote storage, while presenting that to
>>>>> the user under the common SQL API for *bulk* data manipulation (MERGE
>>>>> INTO)
>>>>>
>>>>
>>>> What I would encourage is to decouple the storage model from the
>>>> implementation of that API. If Iceberg has support for merge-on-read of
>>>> upserts and deletes, in addition to its powerful support for partitioning,
>>>> it will be easy for a higher-level application to implement those APIs
>>>> given certain other constraints (that might not be appropriate to all
>>>> applications).
>>>>
>>>> Myself and Miguel are out on Friday, but Anton should be able to handle
>>>>> the discussion on our side.
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Cristian
>>>>>
>>>>>
>>>>> On 22 May 2019, at 17:51, Erik Wright <erik.wri...@shopify.com.INVALID>
>>>>> wrote:
>>>>>
>>>>> We have two rows with the same natural key and we use that natural key
>>>>>> in diff files:
>>>>>> nk | col1 | col2
>>>>>> 1 | 1 | 1
>>>>>> 1 | 2 | 2
>>>>>> Then we have a delete statement:
>>>>>> DELETE FROM t WHERE col1 = 1
>>>>>
>>>>>
>>>>> I think this example cuts to the point of the differences of
>>>>> understanding. Does Iceberg want to be approaching the utility of a
>>>>> relational database, against which I can execute complex update queries?
>>>>> This is not what I would have imagined.
>>>>>
>>>>> I would have, instead, imagined that it was up to the client to
>>>>> identify, through whatever means, that they want to update or delete a row
>>>>> with a given ID. If there are multiple (distinct) rows with the same ID,
>>>>> _too bad_. Any user should _expect_ that they could potentially see any 
>>>>> one
>>>>> or more of those rows at read time. And that an upsert/delete would affect
>>>>> any/all of them (I would argue for all).
>>>>>
>>>>> *In summary:* Instead of trying to come up with a consistent, logical
>>>>> handling for complex queries that are best suited for a relational
>>>>> database, leave such handling up to the client and concentrate on problems
>>>>> that can be solved simply and more generally.
>>>>>
>>>>> On Wed, May 22, 2019 at 12:11 PM Ryan Blue <rb...@netflix.com.invalid>
>>>>> wrote:
>>>>>
>>>>>> Yes, I think we should. I was going to propose one after catching up
>>>>>> on the rest of this thread today.
>>>>>>
>>>>>> On Wed, May 22, 2019 at 9:08 AM Anton Okolnychyi <
>>>>>> aokolnyc...@apple.com> wrote:
>>>>>>
>>>>>>> Thanks! Would it make sense to discuss the agenda in advance?
>>>>>>>
>>>>>>> On 22 May 2019, at 17:04, Ryan Blue <rb...@netflix.com.INVALID>
>>>>>>> wrote:
>>>>>>>
>>>>>>> I sent out an invite and included everyone on this thread. If anyone
>>>>>>> else would like to join, please join the Zoom meeting. If you'd like to 
>>>>>>> be
>>>>>>> added to the calendar invite, just let me know and I'll add you.
>>>>>>>
>>>>>>> On Wed, May 22, 2019 at 8:57 AM Jacques Nadeau <jacq...@dremio.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> works for me.
>>>>>>>>
>>>>>>>> To make things easier, we can use my zoom meeting if people like:
>>>>>>>>
>>>>>>>> Join Zoom Meeting
>>>>>>>> https://zoom.us/j/4157302092
>>>>>>>>
>>>>>>>> One tap mobile
>>>>>>>> +16465588656,,4157302092# US (New York)
>>>>>>>> +16699006833,,4157302092# US (San Jose)
>>>>>>>>
>>>>>>>> Dial by your location
>>>>>>>>         +1 646 558 8656 US (New York)
>>>>>>>>         +1 669 900 6833 US (San Jose)
>>>>>>>>         877 853 5257 US Toll-free
>>>>>>>>         888 475 4499 US Toll-free
>>>>>>>> Meeting ID: 415 730 2092
>>>>>>>> Find your local number: https://zoom.us/u/aH9XYBfm
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Jacques Nadeau
>>>>>>>> CTO and Co-Founder, Dremio
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 22, 2019 at 8:54 AM Ryan Blue <
>>>>>>>> rb...@netflix.com.invalid> wrote:
>>>>>>>>
>>>>>>>>> 9AM on Friday works best for me. How about then?
>>>>>>>>>
>>>>>>>>> On Wed, May 22, 2019 at 5:05 AM Anton Okolnychyi <
>>>>>>>>> aokolnyc...@apple.com> wrote:
>>>>>>>>>
>>>>>>>>>> What about this Friday? One hour slot from 9:00 to 10:00 am or
>>>>>>>>>> 10:00 to 11:00 am PST? Some folks are based in London, so meeting 
>>>>>>>>>> later
>>>>>>>>>> than this is hard. If Friday doesn’t work, we can consider Tuesday or
>>>>>>>>>> Wednesday next week.
>>>>>>>>>>
>>>>>>>>>> On 22 May 2019, at 00:54, Jacques Nadeau <jacq...@dremio.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> I agree with Anton that we should probably spend some time on
>>>>>>>>>> hangouts further discussing things. Definitely differing 
>>>>>>>>>> expectations here
>>>>>>>>>> and we seem to be talking a bit past each other.
>>>>>>>>>> --
>>>>>>>>>> Jacques Nadeau
>>>>>>>>>> CTO and Co-Founder, Dremio
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, May 21, 2019 at 3:44 PM Cristian Opris <
>>>>>>>>>> cop...@apple.com.invalid> wrote:
>>>>>>>>>>
>>>>>>>>>>> I love a good flame war :P
>>>>>>>>>>>
>>>>>>>>>>> On 21 May 2019, at 22:57, Jacques Nadeau <jacq...@dremio.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> That's my point, truly independent writers (two Spark jobs, or a
>>>>>>>>>>>> Spark job and Dremio job) means a distributed transaction. It 
>>>>>>>>>>>> would need
>>>>>>>>>>>> yet another external transaction coordinator on top of both Spark 
>>>>>>>>>>>> and
>>>>>>>>>>>> Dremio, Iceberg by itself
>>>>>>>>>>>> cannot solve this.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I'm not ready to accept this. Iceberg already supports a set of
>>>>>>>>>>> semantics around multiple writers committing simultaneously and how
>>>>>>>>>>> conflict resolution is done. The same can be done here.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> MVCC (which is what Iceberg tries to implement) requires a total
>>>>>>>>>>> ordering of snapshots. Also the snapshots need to be 
>>>>>>>>>>> non-conflicting. I
>>>>>>>>>>> really don't see how any metadata data structures can solve this 
>>>>>>>>>>> without an
>>>>>>>>>>> outside coordinator.
>>>>>>>>>>>
>>>>>>>>>>> Consider this:
>>>>>>>>>>>
>>>>>>>>>>> Snapshot 0: (K,A) = 1
>>>>>>>>>>> Job X: UPDATE K SET A=A+1
>>>>>>>>>>> Job Y: UPDATE K SET A=10
>>>>>>>>>>>
>>>>>>>>>>> What should the final value of A be and who decides ?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> By single writer, I don't mean single process, I mean multiple
>>>>>>>>>>>> coordinated processes like Spark executors coordinated by Spark 
>>>>>>>>>>>> driver. The
>>>>>>>>>>>> coordinator ensures that the data is pre-partitioned on
>>>>>>>>>>>> each executor, and the coordinator commits the snapshot.
>>>>>>>>>>>>
>>>>>>>>>>>> Note however that single writer job/multiple concurrent reader
>>>>>>>>>>>> jobs is perfectly feasible, i.e. it shouldn't be a problem to 
>>>>>>>>>>>> write from a
>>>>>>>>>>>> Spark job and read from multiple Dremio queries concurrently (for 
>>>>>>>>>>>> example)
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> :D This is still "single process" from my perspective. That
>>>>>>>>>>> process may be coordinating other processes to do distributed work 
>>>>>>>>>>> but
>>>>>>>>>>> ultimately it is a single process.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Fair enough
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> I'm not sure what you mean exactly. If we can't enforce
>>>>>>>>>>>> uniqueness we shouldn't assume it.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I disagree. We can specify that as a requirement and state that
>>>>>>>>>>> you'll get unintended consequences if you provide your own keys and 
>>>>>>>>>>> don't
>>>>>>>>>>> maintain this.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> There's no need for unintended consequences, we can specify
>>>>>>>>>>> consistent behaviour (and I believe the document says what that is)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> We do expect that most of the time the natural key is unique,
>>>>>>>>>>>> but the eager and lazy with natural key designs can handle 
>>>>>>>>>>>> duplicates
>>>>>>>>>>>> consistently. Basically it's not a problem to have duplicate
>>>>>>>>>>>> natural keys, everything works fine.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> That heavily depends on how things are implemented. For example,
>>>>>>>>>>> we may write a bunch of code that generates internal data 
>>>>>>>>>>> structures based
>>>>>>>>>>> on this expectation. If we have to support duplicate matches, all 
>>>>>>>>>>> of sudden
>>>>>>>>>>> we can no longer size various data structures to improve 
>>>>>>>>>>> performance and
>>>>>>>>>>> may be unable to preallocate memory associated with a guaranteed 
>>>>>>>>>>> completion.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Again we need to operate on the assumption that this is a large
>>>>>>>>>>> scale distributed compute/remote storage scenario. Key matching is 
>>>>>>>>>>> done
>>>>>>>>>>> with shuffles with data movement across the network, such 
>>>>>>>>>>> optimizations
>>>>>>>>>>> would really have little impact on overall performance. Not to 
>>>>>>>>>>> mention that
>>>>>>>>>>> most query engines would already optimize the shuffle already as 
>>>>>>>>>>> much as it
>>>>>>>>>>> can be optimized.
>>>>>>>>>>>
>>>>>>>>>>> It is true that if actual duplicate keys would make the key
>>>>>>>>>>> matching join (anti-join) somewhat more expensive, however it can 
>>>>>>>>>>> be done
>>>>>>>>>>> in such a way that if the keys are in practice unique the join is as
>>>>>>>>>>> efficient as it can be.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Let me try and clarify each point:
>>>>>>>>>>>>
>>>>>>>>>>>> - lookup for query or update on a non-(partition/bucket/sort)
>>>>>>>>>>>> key predicate implies scanning large amounts of data - because 
>>>>>>>>>>>> these are
>>>>>>>>>>>> the only data structures that can narrow down the lookup, right ? 
>>>>>>>>>>>> One could
>>>>>>>>>>>> argue that the min/max index (file skipping) can be applied to any 
>>>>>>>>>>>> column,
>>>>>>>>>>>> but in reality if that column is not sorted the min/max intervals 
>>>>>>>>>>>> can have
>>>>>>>>>>>> huge overlaps so it may be next to useless.
>>>>>>>>>>>> - remote storage - this is a critical architecture decision -
>>>>>>>>>>>> implementations on local storage imply a vastly different design 
>>>>>>>>>>>> for the
>>>>>>>>>>>> entire system, storage and compute.
>>>>>>>>>>>> - deleting single records per snapshot is unfeasible in eager
>>>>>>>>>>>> but also particularly in the lazy design: each deletion creates a 
>>>>>>>>>>>> very
>>>>>>>>>>>> small snapshot. Deleting 1 million records one at a time would 
>>>>>>>>>>>> create 1
>>>>>>>>>>>> million small files, and 1 million RPC calls.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Why is this unfeasible? If I have a dataset of 100mm files
>>>>>>>>>>> including 1mm small files, is that a major problem? It seems like 
>>>>>>>>>>> your
>>>>>>>>>>> usecase isn't one where you want to support single record deletes 
>>>>>>>>>>> but it is
>>>>>>>>>>> definitely something important to many people.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 100 mm total files or 1 mm files per dataset is definitely a
>>>>>>>>>>> problem on HDFS, and I believe on S3 too. Single key delete would 
>>>>>>>>>>> work just
>>>>>>>>>>> fine, but it's simply not optimal to do that on remote storage. 
>>>>>>>>>>> This is a
>>>>>>>>>>> very well known problem with HDFS, and one of the very reasons to 
>>>>>>>>>>> have
>>>>>>>>>>> something like Iceberg in the first place.
>>>>>>>>>>>
>>>>>>>>>>> Basically the users would be able to do single key mutation, but
>>>>>>>>>>> it's not the use case we should be optimizing for, but it's really 
>>>>>>>>>>> not
>>>>>>>>>>> advisable.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> Eager is conceptually just lazy + compaction done, well,
>>>>>>>>>>>> eagerly. The logic for both is exactly the same, the trade-off is 
>>>>>>>>>>>> just that
>>>>>>>>>>>> with eager you implicitly compact every time so that you don't do 
>>>>>>>>>>>> any work
>>>>>>>>>>>> on read, while with lazy
>>>>>>>>>>>> you want to amortize the cost of compaction over multiple
>>>>>>>>>>>> snapshots.
>>>>>>>>>>>>
>>>>>>>>>>>> Basically there should be no difference between the two
>>>>>>>>>>>> conceptually, or with regard to keys, etc. The only difference is 
>>>>>>>>>>>> some
>>>>>>>>>>>> mechanics in implementation.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I think you have deconstruct the problem too much to say these
>>>>>>>>>>> are the same (or at least that is what I'm starting to think given 
>>>>>>>>>>> this
>>>>>>>>>>> thread). It seems like real world implementation decisions (per our
>>>>>>>>>>> discussion here) are in conflict. For example, you just argued 
>>>>>>>>>>> against
>>>>>>>>>>> having a 1mm arbitrary mutations but I think that is because you 
>>>>>>>>>>> aren't
>>>>>>>>>>> thinking about things over time with a delta implementation. Having 
>>>>>>>>>>> 10,000
>>>>>>>>>>> mutations a day where we do delta compaction once a week
>>>>>>>>>>>
>>>>>>>>>>> and local file mappings (key to offset sparse bitmaps) seems
>>>>>>>>>>> like it could result in very good performance in a case where we're
>>>>>>>>>>> mutating small amounts of data. In this scenario, you may not do 
>>>>>>>>>>> major
>>>>>>>>>>> compaction ever unless you get to a high enough percentage of 
>>>>>>>>>>> records that
>>>>>>>>>>> have been deleted in the original dataset. That drives a very 
>>>>>>>>>>> different set
>>>>>>>>>>> of implementation decisions from a situation where you're trying to 
>>>>>>>>>>> restate
>>>>>>>>>>> an entire partition at once.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> We operate on 1 billion mutations per day at least. This is the
>>>>>>>>>>> problem Iceberg wants to solve, I believe it's stated upfront. 
>>>>>>>>>>> 10000/day is
>>>>>>>>>>> not a big data problem. It can be done fairly trivially and it 
>>>>>>>>>>> would be
>>>>>>>>>>> supported, but there's not much point in extra optimizing for this 
>>>>>>>>>>> use case
>>>>>>>>>>> I believe.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Ryan Blue
>>>>>>>>> Software Engineer
>>>>>>>>> Netflix
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>>>
>>>>>
>>>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>>
>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

-- 
Ryan Blue
Software Engineer
Netflix

Reply via email to