Hey Druids,

Now that a join operator exists and is well on its way to being useful, I started thinking about some other pie-in-the-sky ideas. In particular, one that seems very useful is supporting updates and deletes.
Of course, we support updates and deletes today, but only on a whole-time-chunk basis: you need to rewrite the data for a time chunk in order to update or delete even a single row. Being able to do it based on streaming data would open up new use cases. One that comes up a lot is financial stuff: imagine a dataset where a row is a transaction (with a transaction ID), and there is a transaction status field that might go through a few updates until it finally settles. Another is session analysis: imagine a dataset where a row is a session (with a session ID), and as new events come in for that session, you want to update things like the number of events for the session, session duration, etc.

What these use cases seem to have in common are the following properties:

1) Updates are done by primary key.
2) The total number of primary keys is very large (could be in the trillions).
3) Each individual key generally gets a small number of updates, relatively soon after it's inserted, and eventually stops receiving updates.

I don't have a real proposal yet, nor the bandwidth to actually work on this right now. But I wanted to ask if anyone has thoughts on this potential approach, which I think would work for streams with those properties. Imagine something like this:

- We could encode an update as a delete followed by an insert. For this to work, both need to be associated with the segment that contained the original record.

- In deep storage: one segment, instead of being one zip file, is multiple zip files, identified by a fileNumber. The first file looks like segments do today. Subsequent files include a normal-looking smoosh file like today's zips, but also a bitmap of deleted row IDs from the previous file. Each zip with a fileNumber > 0 encodes incremental deletes (in the deleted-row bitmap) and new inserts (in the regular smoosh file). Let's call these "delta files" and call the one with fileNumber 0 the "original file".
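To make that layout concrete, here's a minimal Java sketch (not real Druid code; the class and field names are made up, and I'm assuming delete bitmaps address row IDs numbered across all earlier files) of how a reader would resolve a segment's visible rows from its original file plus delta files:

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical model of the multi-file segment idea: fileNumber 0 is the
// original file; each delta file adds rows and carries a bitmap of row IDs
// (numbered across all earlier files) that it deletes. An update is encoded
// as a delete of the old row plus an insert of the full new row.
public class SegmentFilesSketch {
  public static class SegmentFile {
    final int fileNumber;
    final List<String> rows;      // stand-in for the rows in the smoosh file
    final BitSet deletedRowIds;   // empty for the original file

    public SegmentFile(int fileNumber, List<String> rows, BitSet deletedRowIds) {
      this.fileNumber = fileNumber;
      this.rows = rows;
      this.deletedRowIds = deletedRowIds;
    }
  }

  // Replay the files in fileNumber order: accumulate rows, union the delete
  // bitmaps, then keep only rows whose ID was never deleted.
  public static List<String> resolve(List<SegmentFile> files) {
    List<String> all = new ArrayList<>();
    BitSet deleted = new BitSet();
    for (SegmentFile file : files) {
      all.addAll(file.rows);
      deleted.or(file.deletedRowIds);
    }
    List<String> visible = new ArrayList<>();
    for (int i = 0; i < all.size(); i++) {
      if (!deleted.get(i)) {
        visible.add(all.get(i));
      }
    }
    return visible;
  }
}
```

In this model, updating the status of a transaction shows up as a set bit for the old row in a delta file's bitmap plus a full re-inserted row in that same delta file.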
The delta files only contain deletes and inserts for primary keys that are present in the original file.

- In the metadata store: we'll now have multiple rows per segment in the segment table (one for the original file and one per delta file). Same for the pending segment table. The datasource table (where Kafka topic checkpoints are stored) will need to associate checkpoints with the latest file number published for a given segment.

- On real-time indexers: they already have code paths that decide which segments to route incoming records to. We need to add the ability to route updates and deletes to new delta files for existing segments. But unlike regular segments, the indexers can't serve queries on top of the delta files. Queries must be served by the Historicals that are serving those segments (otherwise, the deltas won't get applied properly). So Indexers will be creating and publishing these files but not serving them live.

- On Historicals: they need to fetch all available delta files from deep storage for the segments they're serving. They should probably expose the entire segment (including all delta files) as a single StorageAdapter, so the query stack doesn't have to change.

Some additional notes:

- I left out the part about how indexers figure out which segment to add an update or delete record to. There has to be some kind of primary-key-to-segment mapping to make this work. The most obvious way to do this is to partition segments by primary key. But segments are currently partitioned by time, not primary key, and I suspect we wouldn't want to change that. One reason is that I think we still want to target use cases with a heavy time component, and time partitioning is ideal for that. Another is that we expect older records to eventually stop receiving updates, and if we partition by time, most segments won't be receiving updates, which is good (it limits the performance hit of handling delta files to a small number of segments).
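As a sketch of the metadata-store side mentioned above (table and field names here are hypothetical), the segment table effectively becomes keyed by (segmentId, fileNumber), and a checkpoint would need the latest published fileNumber per segment:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical shape of the new segment-table rows: one row per file of a
// segment rather than one row per segment.
public class SegmentMetadataSketch {
  public record SegmentFileRow(String segmentId, int fileNumber, boolean published) {}

  // What a Kafka checkpoint would need to record: the highest published
  // fileNumber for each segment it covers.
  public static Map<String, Integer> latestPublishedFileNumbers(List<SegmentFileRow> rows) {
    Map<String, Integer> latest = new HashMap<>();
    for (SegmentFileRow row : rows) {
      if (row.published()) {
        latest.merge(row.segmentId(), row.fileNumber(), Math::max);
      }
    }
    return latest;
  }
}
```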
So we'd need another solution for the primary-key-to-segment mapping, and I haven't thought of a perfect one. I think we'll probably have to require that the input stream is partitioned by primary key, and take advantage of that to split up the information that indexers will need in order to map records to the correct segments. I think this area is the biggest unknown.

- Assuming we keep partitioning by time, the update mechanism would be able to update any field of a row except the timestamp (updating the timestamp would invalidate partitioning and sorting assumptions). I'm not sure if this would be generally OK or if it would rule out certain use cases.

- I left out what an update record from the stream would look like. Do we ask people to provide upserts (entire new records), or delta updates (differences from the previous record)? Or we could decide to support both. By the way: no matter how we accept incoming updates, I think we should store them in delta files as entire new records. That way we don't have to cross-check multiple files to see the latest version of a record.

- This sketch of an implementation isn't actually real-time. It is able to accept updates and deletes from a stream, but there is a delay in actually serving them (a delta file needs to be generated and pushed to Historicals). One way to make the serving real-time too is for the indexers to provide an API for Historicals to fetch a stream of delta records relevant to their segments. The Historicals would need to use that stream to achieve the same effect as a delta file. Presumably, whatever they're doing with it would be replaced with the "official" delta files when those become available.

- For correctness, we'll need some mechanism for a Historical to know when it has "enough" delta files to begin serving a segment. Otherwise you'll see mysterious rollbacks when segments move from one Historical to another.

- For efficiency, we'll need to compact delta files into the base segments.
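That compaction step could look roughly like this sketch (not real Druid code, and again assuming delete bitmaps address row IDs numbered across all earlier files): union the delete bitmaps from all delta files, replay the rows in file order, and write the survivors out as a fresh original file with no delete bitmap.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical compaction of a base file plus delta files back into a single
// file, so queries no longer pay the multi-file cost for that segment.
public class DeltaCompactionSketch {
  // rowsPerFile.get(k) holds file k's rows; deletesPerFile.get(k) holds file
  // k's bitmap of deleted row IDs (empty for file 0).
  public static List<String> compact(List<List<String>> rowsPerFile, List<BitSet> deletesPerFile) {
    BitSet deleted = new BitSet();
    for (BitSet deletes : deletesPerFile) {
      deleted.or(deletes);
    }
    List<String> compacted = new ArrayList<>();
    int rowId = 0;
    for (List<String> rows : rowsPerFile) {
      for (String row : rows) {
        if (!deleted.get(rowId)) {
          compacted.add(row);  // survives into the new "original file"
        }
        rowId++;
      }
    }
    return compacted;
  }
}
```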
We could use the existing compaction system for this.

- For performance, assuming we present the multi-file segment as a single StorageAdapter, we'll need to be able to generate selectors that are as efficient as possible. This suggests we'll need flags in the delta files saying whether or not they use the same dictionary as the original segment. If they do, we can generate a normal selector. If they don't, then we'll presumably be allocating new dictionary IDs for new values; those won't be sorted anymore, so we'll need to generate a selector where ColumnCapabilities.areDictionaryValuesSorted returns false.

Would love to hear what people think.

Gian