The agenda looks good to me. I think it would also make sense to clarify the 
responsibilities of query engines and Iceberg. Not only in terms of uniqueness, 
but also in terms of applying diffs on read, for example. 

> On 23 May 2019, at 01:59, Ryan Blue <rb...@netflix.com.INVALID> wrote:
> 
> Here’s a rough agenda:
> 
> Use cases: everyone come with a use case that you’d like to have supported. 
> We’ll go around and introduce ourselves and our use cases.
> Main topic: How should Iceberg identify rows that are deleted?
> Side topics from my initial email, if we have time: should we use insert 
> diffs, should we support dense and sparse formats, etc.
> The main topic I think we should discuss is: How should Iceberg identify rows 
> that are deleted?
> 
> I’m phrasing it this way to avoid where I think we’re talking past one 
> another because we are making assumptions. The important thing is that there 
> are two main options:
> 
> Filename and position, vs
> Specific values of (few) columns in the data
> This phrasing also avoids discussing uniqueness constraints. Once we get down 
> to behavior, I think we agree. For example, I think we all agree that 
> uniqueness cannot be enforced in Iceberg.
> 
> If uniqueness can’t be enforced in Iceberg, the main choice comes down to how 
> we identify rows that are deleted. If we use (filename, position) then we 
> know that there is only one row. On the other hand, if we use data values to 
> identify rows then a delete may identify more than one row because there are 
> no uniqueness guarantees. I think we also agree that if there is more than 
> one row identified, all of them should be deleted.
> 
> At that point, there are trade-offs between the approaches:
> 
> When identifying deleted rows by data values, situations like the one that 
> Anton pointed out are possible.
> Jacques also had a good point about concurrency. If at all possible, we want 
> to be able to reconcile changes between concurrent commits without re-running 
> an operation.
> Sound like a reasonable amount to talk through?
> 
> rb
> 
> 
> On Wed, May 22, 2019 at 1:17 PM Erik Wright <erik.wri...@shopify.com 
> <mailto:erik.wri...@shopify.com>> wrote:
> 
> 
> On Wed, May 22, 2019 at 4:04 PM Cristian Opris <cop...@apple.com.invalid> 
> wrote:
> Agreed with Erik here, we're certainly not looking to build the equivalent of 
> a relational database, and for that matter not even that of a local disk 
> storage analytics database (like Vertica). Those are very 
> different designs with very different trade-offs and optimizations.
> 
> We're looking to automate and optimize specific types of file manipulation 
> for large files on remote storage, while presenting that to the user under 
> the common SQL API for bulk data manipulation (MERGE INTO)
> 
> What I would encourage is to decouple the storage model from the 
> implementation of that API. If Iceberg has support for merge-on-read of 
> upserts and deletes, in addition to its powerful support for partitioning, it 
> will be easy for a higher-level application to implement those APIs given 
> certain other constraints (that might not be appropriate to all applications).
> 
> Myself and Miguel are out on Friday, but Anton should be able to handle the 
> discussion on our side.
> 
> 
> Thanks,
> Cristian
> 
> 
>> On 22 May 2019, at 17:51, Erik Wright <erik.wri...@shopify.com.INVALID 
>> <mailto:erik.wri...@shopify.com.INVALID>> wrote:
>> 
>> We have two rows with the same natural key and we use that natural key in 
>> diff files:
>> nk | col1 | col2
>> 1 | 1 | 1
>> 1 | 2 | 2
>> Then we have a delete statement:
>> DELETE FROM t WHERE col1 = 1
>> 
>> I think this example cuts to the point of the differences of understanding. 
>> Does Iceberg want to be approaching the utility of a relational database, 
>> against which I can execute complex update queries? This is not what I would 
>> have imagined.
>> 
>> I would have, instead, imagined that it was up to the client to identify, 
>> through whatever means, that they want to update or delete a row with a 
>> given ID. If there are multiple (distinct) rows with the same ID, _too bad_. 
>> Any user should _expect_ that they could potentially see any one or more of 
>> those rows at read time. And that an upsert/delete would affect any/all of 
>> them (I would argue for all).
>> 
>> In summary: Instead of trying to come up with a consistent, logical handling 
>> for complex queries that are best suited for a relational database, leave 
>> such handling up to the client and concentrate on problems that can be 
>> solved simply and more generally.
>> 
>> On Wed, May 22, 2019 at 12:11 PM Ryan Blue <rb...@netflix.com.invalid 
>> <mailto:rb...@netflix.com.invalid>> wrote:
>> Yes, I think we should. I was going to propose one after catching up on the 
>> rest of this thread today.
>> 
>> On Wed, May 22, 2019 at 9:08 AM Anton Okolnychyi <aokolnyc...@apple.com 
>> <mailto:aokolnyc...@apple.com>> wrote:
>> Thanks! Would it make sense to discuss the agenda in advance?
>> 
>>> On 22 May 2019, at 17:04, Ryan Blue <rb...@netflix.com.INVALID 
>>> <mailto:rb...@netflix.com.INVALID>> wrote:
>>> 
>>> I sent out an invite and included everyone on this thread. If anyone else 
>>> would like to join, please join the Zoom meeting. If you'd like to be added 
>>> to the calendar invite, just let me know and I'll add you.
>>> 
>>> On Wed, May 22, 2019 at 8:57 AM Jacques Nadeau <jacq...@dremio.com 
>>> <mailto:jacq...@dremio.com>> wrote:
>>> works for me.
>>> 
>>> To make things easier, we can use my zoom meeting if people like:
>>> 
>>> Join Zoom Meeting
>>> https://zoom.us/j/4157302092 <https://zoom.us/j/4157302092>
>>> 
>>> One tap mobile
>>> +16465588656,,4157302092# US (New York)
>>> +16699006833,,4157302092# US (San Jose)
>>> 
>>> Dial by your location
>>>         +1 646 558 8656 US (New York)
>>>         +1 669 900 6833 US (San Jose)
>>>         877 853 5257 US Toll-free
>>>         888 475 4499 US Toll-free
>>> Meeting ID: 415 730 2092
>>> Find your local number: https://zoom.us/u/aH9XYBfm 
>>> <https://zoom.us/u/aH9XYBfm>
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> --
>>> Jacques Nadeau
>>> CTO and Co-Founder, Dremio
>>> 
>>> 
>>> On Wed, May 22, 2019 at 8:54 AM Ryan Blue <rb...@netflix.com.invalid 
>>> <mailto:rb...@netflix.com.invalid>> wrote:
>>> 9AM on Friday works best for me. How about then?
>>> 
>>> On Wed, May 22, 2019 at 5:05 AM Anton Okolnychyi <aokolnyc...@apple.com 
>>> <mailto:aokolnyc...@apple.com>> wrote:
>>> What about this Friday? One hour slot from 9:00 to 10:00 am or 10:00 to 
>>> 11:00 am PST? Some folks are based in London, so meeting later than this is 
>>> hard. If Friday doesn’t work, we can consider Tuesday or Wednesday next 
>>> week.
>>> 
>>>> On 22 May 2019, at 00:54, Jacques Nadeau <jacq...@dremio.com 
>>>> <mailto:jacq...@dremio.com>> wrote:
>>>> 
>>>> I agree with Anton that we should probably spend some time on hangouts 
>>>> further discussing things. Definitely differing expectations here and we 
>>>> seem to be talking a bit past each other.
>>>> --
>>>> Jacques Nadeau
>>>> CTO and Co-Founder, Dremio
>>>> 
>>>> 
>>>> On Tue, May 21, 2019 at 3:44 PM Cristian Opris <cop...@apple.com.invalid 
>>>> <mailto:cop...@apple.com.invalid>> wrote:
>>>> I love a good flame war :P
>>>> 
>>>>> On 21 May 2019, at 22:57, Jacques Nadeau <jacq...@dremio.com 
>>>>> <mailto:jacq...@dremio.com>> wrote:
>>>>> 
>>>>> 
>>>>> That's my point, truly independent writers (two Spark jobs, or a Spark 
>>>>> job and Dremio job) means a distributed transaction. It would need yet 
>>>>> another external transaction coordinator on top of both Spark and Dremio, 
>>>>> Iceberg by itself 
>>>>> cannot solve this.
>>>>>  
>>>>> I'm not ready to accept this. Iceberg already supports a set of semantics 
>>>>> around multiple writers committing simultaneously and how conflict 
>>>>> resolution is done. The same can be done here.
>>>> 
>>>> 
>>>> 
>>>> MVCC (which is what Iceberg tries to implement) requires a total ordering 
>>>> of snapshots. Also the snapshots need to be non-conflicting. I really 
>>>> don't see how any metadata data structures can solve this without an 
>>>> outside coordinator.
>>>> 
>>>> Consider this:
>>>> 
>>>> Snapshot 0: (K,A) = 1
>>>> Job X: UPDATE K SET A=A+1
>>>> Job Y: UPDATE K SET A=10
>>>> 
>>>> What should the final value of A be and who decides ?
>>>> 
>>>>>  
>>>>> By single writer, I don't mean single process, I mean multiple 
>>>>> coordinated processes like Spark executors coordinated by Spark driver. 
>>>>> The coordinator ensures that the data is pre-partitioned on 
>>>>> each executor, and the coordinator commits the snapshot. 
>>>>> 
>>>>> Note however that single writer job/multiple concurrent reader jobs is 
>>>>> perfectly feasible, i.e. it shouldn't be a problem to write from a Spark 
>>>>> job and read from multiple Dremio queries concurrently (for example)
>>>>> 
>>>>> :D This is still "single process" from my perspective. That process may 
>>>>> be coordinating other processes to do distributed work but ultimately it 
>>>>> is a single process. 
>>>> 
>>>> Fair enough
>>>> 
>>>>>  
>>>>> I'm not sure what you mean exactly. If we can't enforce uniqueness we 
>>>>> shouldn't assume it.
>>>>>  
>>>>> I disagree. We can specify that as a requirement and state that you'll 
>>>>> get unintended consequences if you provide your own keys and don't 
>>>>> maintain this.
>>>> 
>>>> There's no need for unintended consequences, we can specify consistent 
>>>> behaviour (and I believe the document says what that is)
>>>> 
>>>> 
>>>>>  
>>>>> We do expect that most of the time the natural key is unique, but the 
>>>>> eager and lazy with natural key designs can handle duplicates 
>>>>> consistently. Basically it's not a problem to have duplicate natural 
>>>>> keys, everything works fine.
>>>>> 
>>>>> That heavily depends on how things are implemented. For example, we may 
>>>>> write a bunch of code that generates internal data structures based on 
>>>>> this expectation. If we have to support duplicate matches, all of sudden 
>>>>> we can no longer size various data structures to improve performance and 
>>>>> may be unable to preallocate memory associated with a guaranteed 
>>>>> completion.
>>>> 
>>>> Again we need to operate on the assumption that this is a large scale 
>>>> distributed compute/remote storage scenario. Key matching is done with 
>>>> shuffles with data movement across the network, such optimizations would 
>>>> really have little impact on overall performance. Not to mention that most 
>>>> query engines would already optimize the shuffle already as much as it can 
>>>> be optimized.
>>>> 
>>>> It is true that if actual duplicate keys would make the key matching join 
>>>> (anti-join) somewhat more expensive, however it can be done in such a way 
>>>> that if the keys are in practice unique the join is as efficient as it can 
>>>> be.
>>>> 
>>>> 
>>>>> 
>>>>> Let me try and clarify each point:
>>>>> 
>>>>> - lookup for query or update on a non-(partition/bucket/sort) key 
>>>>> predicate implies scanning large amounts of data - because these are the 
>>>>> only data structures that can narrow down the lookup, right ? One could 
>>>>> argue that the min/max index (file skipping) can be applied to any 
>>>>> column, but in reality if that column is not sorted the min/max intervals 
>>>>> can have huge overlaps so it may be next to useless.
>>>>> - remote storage - this is a critical architecture decision - 
>>>>> implementations on local storage imply a vastly different design for the 
>>>>> entire system, storage and compute. 
>>>>> - deleting single records per snapshot is unfeasible in eager but also 
>>>>> particularly in the lazy design: each deletion creates a very small 
>>>>> snapshot. Deleting 1 million records one at a time would create 1 million 
>>>>> small files, and 1 million RPC calls.
>>>>> 
>>>>> Why is this unfeasible? If I have a dataset of 100mm files including 1mm 
>>>>> small files, is that a major problem? It seems like your usecase isn't 
>>>>> one where you want to support single record deletes but it is definitely 
>>>>> something important to many people.
>>>> 
>>>> 100 mm total files or 1 mm files per dataset is definitely a problem on 
>>>> HDFS, and I believe on S3 too. Single key delete would work just fine, but 
>>>> it's simply not optimal to do that on remote storage. This is a very well 
>>>> known problem with HDFS, and one of the very reasons to have something 
>>>> like Iceberg in the first place.
>>>> 
>>>> Basically the users would be able to do single key mutation, but it's not 
>>>> the use case we should be optimizing for, but it's really not advisable.
>>>> 
>>>> 
>>>>>  
>>>>> Eager is conceptually just lazy + compaction done, well, eagerly. The 
>>>>> logic for both is exactly the same, the trade-off is just that with eager 
>>>>> you implicitly compact every time so that you don't do any work on read, 
>>>>> while with lazy 
>>>>> you want to amortize the cost of compaction over multiple snapshots.
>>>>> 
>>>>> Basically there should be no difference between the two conceptually, or 
>>>>> with regard to keys, etc. The only difference is some mechanics in 
>>>>> implementation.
>>>>> 
>>>>> I think you have deconstruct the problem too much to say these are the 
>>>>> same (or at least that is what I'm starting to think given this thread). 
>>>>> It seems like real world implementation decisions (per our discussion 
>>>>> here) are in conflict. For example, you just argued against having a 1mm 
>>>>> arbitrary mutations but I think that is because you aren't thinking about 
>>>>> things over time with a delta implementation. Having 10,000 mutations a 
>>>>> day where we do delta compaction once a week
>>>>> and local file mappings (key to offset sparse bitmaps) seems like it 
>>>>> could result in very good performance in a case where we're mutating 
>>>>> small amounts of data. In this scenario, you may not do major compaction 
>>>>> ever unless you get to a high enough percentage of records that have been 
>>>>> deleted in the original dataset. That drives a very different set of 
>>>>> implementation decisions from a situation where you're trying to restate 
>>>>> an entire partition at once.
>>>> 
>>>> 
>>>> We operate on 1 billion mutations per day at least. This is the problem 
>>>> Iceberg wants to solve, I believe it's stated upfront. 10000/day is not a 
>>>> big data problem. It can be done fairly trivially and it would be 
>>>> supported, but there's not much point in extra optimizing for this use 
>>>> case I believe.
>>>> 
>>> 
>>> 
>>> 
>>> -- 
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>> 
>>> 
>>> -- 
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>> 
>> 
>> 
>> -- 
>> Ryan Blue
>> Software Engineer
>> Netflix
> 
> 
> 
> -- 
> Ryan Blue
> Software Engineer
> Netflix

Reply via email to