9AM on Friday works best for me. How about then?

On Wed, May 22, 2019 at 5:05 AM Anton Okolnychyi <aokolnyc...@apple.com>
wrote:

> What about this Friday? One hour slot from 9:00 to 10:00 am or 10:00 to
> 11:00 am PST? Some folks are based in London, so meeting later than this is
> hard. If Friday doesn’t work, we can consider Tuesday or Wednesday next
> week.
>
> On 22 May 2019, at 00:54, Jacques Nadeau <jacq...@dremio.com> wrote:
>
> I agree with Anton that we should probably spend some time on hangouts
> further discussing things. Definitely differing expectations here and we
> seem to be talking a bit past each other.
> --
> Jacques Nadeau
> CTO and Co-Founder, Dremio
>
>
> On Tue, May 21, 2019 at 3:44 PM Cristian Opris <cop...@apple.com.invalid>
> wrote:
>
>> I love a good flame war :P
>>
>> On 21 May 2019, at 22:57, Jacques Nadeau <jacq...@dremio.com> wrote:
>>
>>
>> That's my point, truly independent writers (two Spark jobs, or a Spark
>>> job and Dremio job) means a distributed transaction. It would need yet
>>> another external transaction coordinator on top of both Spark and Dremio,
>>> Iceberg by itself
>>> cannot solve this.
>>>
>>
>> I'm not ready to accept this. Iceberg already supports a set of semantics
>> around multiple writers committing simultaneously and how conflict
>> resolution is done. The same can be done here.
>>
>>
>>
>>
>> MVCC (which is what Iceberg tries to implement) requires a total ordering
>> of snapshots. Also the snapshots need to be non-conflicting. I really don't
>> see how any metadata data structures can solve this without an outside
>> coordinator.
>>
>> Consider this:
>>
>> Snapshot 0: (K,A) = 1
>> Job X: UPDATE K SET A=A+1
>> Job Y: UPDATE K SET A=10
>>
>> What should the final value of A be and who decides ?
>>
>>
>>
>>> By single writer, I don't mean single process, I mean multiple
>>> coordinated processes like Spark executors coordinated by Spark driver. The
>>> coordinator ensures that the data is pre-partitioned on
>>> each executor, and the coordinator commits the snapshot.
>>>
>>> Note however that single writer job/multiple concurrent reader jobs is
>>> perfectly feasible, i.e. it shouldn't be a problem to write from a Spark
>>> job and read from multiple Dremio queries concurrently (for example)
>>>
>>
>> :D This is still "single process" from my perspective. That process may
>> be coordinating other processes to do distributed work but ultimately it is
>> a single process.
>>
>>
>> Fair enough
>>
>>
>>
>>> I'm not sure what you mean exactly. If we can't enforce uniqueness we
>>> shouldn't assume it.
>>>
>>
>> I disagree. We can specify that as a requirement and state that you'll
>> get unintended consequences if you provide your own keys and don't maintain
>> this.
>>
>>
>> There's no need for unintended consequences, we can specify consistent
>> behaviour (and I believe the document says what that is)
>>
>>
>>
>>
>>> We do expect that most of the time the natural key is unique, but the
>>> eager and lazy with natural key designs can handle duplicates
>>> consistently. Basically it's not a problem to have duplicate natural
>>> keys, everything works fine.
>>>
>>
>> That heavily depends on how things are implemented. For example, we may
>> write a bunch of code that generates internal data structures based on this
>> expectation. If we have to support duplicate matches, all of sudden we can
>> no longer size various data structures to improve performance and may be
>> unable to preallocate memory associated with a guaranteed completion.
>>
>>
>> Again we need to operate on the assumption that this is a large scale
>> distributed compute/remote storage scenario. Key matching is done with
>> shuffles with data movement across the network, such optimizations would
>> really have little impact on overall performance. Not to mention that most
>> query engines would already optimize the shuffle already as much as it can
>> be optimized.
>>
>> It is true that if actual duplicate keys would make the key matching join
>> (anti-join) somewhat more expensive, however it can be done in such a way
>> that if the keys are in practice unique the join is as efficient as it can
>> be.
>>
>>
>>
>> Let me try and clarify each point:
>>>
>>> - lookup for query or update on a non-(partition/bucket/sort) key
>>> predicate implies scanning large amounts of data - because these are the
>>> only data structures that can narrow down the lookup, right ? One could
>>> argue that the min/max index (file skipping) can be applied to any column,
>>> but in reality if that column is not sorted the min/max intervals can have
>>> huge overlaps so it may be next to useless.
>>> - remote storage - this is a critical architecture decision -
>>> implementations on local storage imply a vastly different design for the
>>> entire system, storage and compute.
>>> - deleting single records per snapshot is unfeasible in eager but also
>>> particularly in the lazy design: each deletion creates a very small
>>> snapshot. Deleting 1 million records one at a time would create 1 million
>>> small files, and 1 million RPC calls.
>>>
>>
>> Why is this unfeasible? If I have a dataset of 100mm files including 1mm
>> small files, is that a major problem? It seems like your usecase isn't one
>> where you want to support single record deletes but it is definitely
>> something important to many people.
>>
>>
>> 100 mm total files or 1 mm files per dataset is definitely a problem on
>> HDFS, and I believe on S3 too. Single key delete would work just fine, but
>> it's simply not optimal to do that on remote storage. This is a very well
>> known problem with HDFS, and one of the very reasons to have something like
>> Iceberg in the first place.
>>
>> Basically the users would be able to do single key mutation, but it's not
>> the use case we should be optimizing for, but it's really not advisable.
>>
>>
>>
>>
>>> Eager is conceptually just lazy + compaction done, well, eagerly. The
>>> logic for both is exactly the same, the trade-off is just that with eager
>>> you implicitly compact every time so that you don't do any work on read,
>>> while with lazy
>>> you want to amortize the cost of compaction over multiple snapshots.
>>>
>>> Basically there should be no difference between the two conceptually, or
>>> with regard to keys, etc. The only difference is some mechanics in
>>> implementation.
>>>
>>
>> I think you have deconstruct the problem too much to say these are the
>> same (or at least that is what I'm starting to think given this thread). It
>> seems like real world implementation decisions (per our discussion here)
>> are in conflict. For example, you just argued against having a 1mm
>> arbitrary mutations but I think that is because you aren't thinking about
>> things over time with a delta implementation. Having 10,000 mutations a day
>> where we do delta compaction once a week
>>
>> and local file mappings (key to offset sparse bitmaps) seems like it
>> could result in very good performance in a case where we're mutating small
>> amounts of data. In this scenario, you may not do major compaction ever
>> unless you get to a high enough percentage of records that have been
>> deleted in the original dataset. That drives a very different set of
>> implementation decisions from a situation where you're trying to restate an
>> entire partition at once.
>>
>>
>> We operate on 1 billion mutations per day at least. This is the problem
>> Iceberg wants to solve, I believe it's stated upfront. 10000/day is not a
>> big data problem. It can be done fairly trivially and it would be
>> supported, but there's not much point in extra optimizing for this use case
>> I believe.
>>
>>
>

-- 
Ryan Blue
Software Engineer
Netflix

Reply via email to