The tasks might look something like this:

CombinedPlanTask
- List<ManifestPlanTask>
ManifestPlanTask
- int start
- int length
- ManifestFile dataManifest
- List<ManifestFile> deleteManifests

On Thu, Dec 14, 2023 at 4:07 PM Ryan Blue <b...@tabular.io> wrote:

> Seems like that track has expired (This Internet-Draft will expire on 13 May 2022)
>
> Yeah, looks like we should just use POST. That’s too bad. QUERY seems like a good idea to me.
>
> Distinguish planning using shard or not
>
> I think this was a mistake on my part. I was still thinking that we would have a different endpoint for first-level planning to produce shards and the route to actually get files. Since both are POST requests with the same path (/v1/namespaces/ns/tables/t/scans) that no longer works. What about /v1/namespaces/ns/tables/t/scan and /v1/namespaces/ns/tables/t/plan? The latter could use some variant of planFiles since that’s what we are wrapping in the Java API.
>
> Necessity of scan ID
>
> Yes, I agree. If you have shard IDs then you don’t really need a scan ID. You could always have one internally but send it as part of the shard ID.
>
> Shape of shard payload
>
> I think we have 2 general options depending on how strict we want to be.
>
> 1. Require a standard shard definition
> 2. Allow arbitrary JSON and leave it to the service
>
> I lean toward the first option, which would be a data manifest and the associated delete manifests for the partition. We could also extend that to a group of manifests, each with a list of delete manifests. And we could also allow splitting to ensure tasks don’t get too large with big files. This all looks basically like FileScanTask, but with manifests and delete manifests.
>
> On Wed, Dec 13, 2023 at 4:39 PM Jack Ye <yezhao...@gmail.com> wrote:
>
>> Seems like that track has expired (This Internet-Draft will expire on 13 May 2022), not sure how these RFCs are managed, but it does not seem likely that this verb will make it in. I think people are mostly using POST for this use case already.
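A rough sketch of how the ManifestPlanTask / CombinedPlanTask shapes above might look when serialized; the JSON field names are illustrative assumptions, not part of any agreed spec:

```python
import json
from dataclasses import dataclass, field
from typing import List

@dataclass
class ManifestPlanTask:
    # One unit of planning work: a slice of a data manifest plus the
    # delete manifests that may apply to it.
    start: int
    length: int
    data_manifest: str                      # manifest file location
    delete_manifests: List[str] = field(default_factory=list)

@dataclass
class CombinedPlanTask:
    tasks: List[ManifestPlanTask] = field(default_factory=list)

    def to_json(self) -> str:
        # Serialize with dash-separated keys, mirroring the REST spec style.
        return json.dumps({
            "tasks": [
                {
                    "start": t.start,
                    "length": t.length,
                    "data-manifest": t.data_manifest,
                    "delete-manifests": t.delete_manifests,
                }
                for t in self.tasks
            ]
        })

plan = CombinedPlanTask([
    ManifestPlanTask(0, 4194304, "s3://bucket/metadata/C.avro",
                     ["s3://bucket/metadata/C-deletes.avro"]),
])
print(plan.to_json())
```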
>> But overall I think we are in agreement with the general direction. A few detail discussions:
>>
>> *Distinguish planning using shard or not*
>> Maybe we should add a query parameter like *distributed=true* to distinguish your first and third case, since they are now sharing the same signature. If the requester wants to use distributed planning, then some sharding strategy is provided as a response for the requester to send more detailed requests.
>>
>> *Necessity of scan ID*
>> In this approach, is a scan ID still required? Because the shard payload already fully describes the information to retrieve, it seems like we can just drop the *scan-id* query parameter in the second case. It seems like it's kept for the case where we still want to persist some state, but it seems like we can make a fully stateless style work.
>>
>> *Shape of shard payload*
>> What do you think is the necessary information for the shard payload? It seems like we need at least the location of the manifests, plus the delete manifests or delete files associated with the manifests. I like the idea of making it a "shard task" that is similar to a file scan task, and it might allow us to return a mixture of both types of tasks, so we can have better control of the response size.
>>
>> -Jack
>>
>> On Wed, Dec 13, 2023 at 3:50 PM Ryan Blue <b...@tabular.io> wrote:
>>
>>> I just changed it to POST after looking into support for the QUERY method. It's a new HTTP method for cases like this where you don't want to pass everything through query params. Here's the QUERY method RFC <https://www.ietf.org/archive/id/draft-ietf-httpbis-safe-method-w-body-02.html>, but I guess it isn't finalized yet?
>>>
>>> Just read them like you would a POST request that doesn't actually create anything.
>>>
>>> On Wed, Dec 13, 2023 at 3:45 PM Jack Ye <yezhao...@gmail.com> wrote:
>>>
>>>> Thanks, the Gist explains a lot of things.
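One way to picture Jack's *distributed=true* idea and the shard tasks discussed above is as a hypothetical two-phase exchange; the paths, the query parameter, and every field name below are assumptions for illustration, not the agreed spec:

```python
import json

# Hypothetical two-phase exchange: with distributed=true the service
# answers with shard tasks instead of file scan tasks.
request = {
    "path": "/v1/namespaces/ns/tables/t/scans?distributed=true",
    "body": {"filter": {"type": "in", "term": "x", "values": [1, 2, 3]}},
}

# A planning response carrying self-describing shard tasks; because each
# task names its manifests, any service node can later serve it.
response = {
    "shard-tasks": [
        {"id": 1, "manifests": ["C"]},
        {"id": 2, "manifests": ["D"]},
    ]
}

# The client then sends each shard task back, along with the original
# filter, to fetch the concrete file scan tasks for that shard.
followups = [
    {"shard": task, "filter": request["body"]["filter"]}
    for task in response["shard-tasks"]
]
print(json.dumps(followups[0]))
```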
>>>> This is actually very close to our way of implementing the shard ID: we were defining the shard ID as a string, and the string content is actually something similar to the information of the JSON payload you showed, so we can persist minimum information in storage.
>>>>
>>>> Just one clarification needed for your Gist:
>>>>
>>>> > QUERY /v1/namespaces/ns/tables/t/scans?scan-id=1
>>>> > { "shard": { "id": 1, "manifests": ["C"] }, "filter": {"type": "in", "term": "x", "values": [1, 2, 3] } }
>>>> >
>>>> > { "file-scan-tasks": [...] }
>>>>
>>>> Here, what does this QUERY verb mean? Is that a GET? If it's GET, we cannot have a request body. That's actually why we expressed that as an ID string, since we can put it as a query parameter.
>>>>
>>>> -Jack
>>>>
>>>> On Wed, Dec 13, 2023 at 3:25 PM Ryan Blue <b...@tabular.io> wrote:
>>>>
>>>>> Jack,
>>>>>
>>>>> It sounds like what I’m proposing isn’t quite clear because your initial response was arguing for a sharding capability. I agree that sharding is a good idea. I’m less confident about two points:
>>>>>
>>>>> 1. Requiring that the service is stateful. As Renjie pointed out, that makes it harder to scale the service.
>>>>> 2. The need for both pagination *and* sharding as separate things
>>>>>
>>>>> And I also think that Fokko has a good point about trying to keep things simple and not requiring the CreateScan endpoint.
>>>>>
>>>>> For the first point, I’m proposing that we still have a CreateScan endpoint, but instead of sending only a list of shard IDs it can also send either a standard shard “task” or an optional JSON definition. Let’s assume we can send arbitrary JSON for an example. Say I have a table with 4 manifests, A through D, and that C and D match some query filter.
>>>>> When I call the CreateScan endpoint, the service would send back tasks with that information: {"id": 1, "manifests": ["C"]}, {"id": 2, "manifests": ["D"]}. By sending what the shards mean (the manifests to read), my service can be stateless: any node can get a request for shard 1, read manifest C, and send back the resulting data files.
>>>>>
>>>>> I don’t see much of an argument against doing this *in principle*. It gives you the flexibility to store state if you choose or to send state to the client for it to pass back when calling the GetTasks endpoint. There is a practical problem, which is that it’s annoying to send a GET request with a JSON payload because you can’t send a request body. It’s probably obvious, but I’m also not a REST purist so I’d be fine using POST or QUERY for this. It would look something like this Gist <https://gist.github.com/rdblue/d2b65bd2ad20f85ee9d04ccf19ac8aba>.
>>>>>
>>>>> In your last reply, you also asked whether a stateless service is a goal. I don’t think that it is, but if we can make simple changes to the spec to allow more flexibility on the server side, I think that’s a good direction. You also asked about a reference implementation and I consider CatalogHandlers <https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java> to be that reference. It does everything except for the work done by your choice of web application framework. It isn’t stateless, but it only relies on a Catalog implementation for persistence.
>>>>>
>>>>> For the second point, I don’t understand why we need both sharding and pagination. That is, if we have a protocol that allows sharding, why is pagination also needed?
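The statelessness argument above can be sketched as a GetTasks handler whose only inputs arrive in the request body; the manifest contents are stubs standing in for real Iceberg metadata reads, and all names are illustrative:

```python
# Sketch of a stateless GetTasks handler: everything it needs arrives in
# the request body, so any node can serve any shard. MANIFESTS is a stub
# standing in for real manifest reads; all names are illustrative.
MANIFESTS = {
    "C": [{"file": "s3://bucket/data/c-1.parquet", "x": 2}],
    "D": [{"file": "s3://bucket/data/d-1.parquet", "x": 9}],
}

def matches(filter_, entry):
    # Supports only the "in" filter used in the example above.
    return entry[filter_["term"]] in filter_["values"]

def get_tasks(request):
    tasks = []
    for manifest in request["shard"]["manifests"]:
        for entry in MANIFESTS[manifest]:
            if matches(request["filter"], entry):
                tasks.append({"file-scan-task": entry["file"]})
    return {"file-scan-tasks": tasks}

resp = get_tasks({
    "shard": {"id": 1, "manifests": ["C"]},
    "filter": {"type": "in", "term": "x", "values": [1, 2, 3]},
})
print(resp)
```

Because the shard payload names its manifests, no server-side lookup table is needed between the two calls.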
>>>>> From my naive perspective on how sharding would work, we should be able to use metadata from the manifest list to limit the potential number of data files in a given shard. As long as we can limit the size of a shard to produce more, pagination seems like an unnecessary complication.
>>>>>
>>>>> Lastly, for Fokko’s point, I think another easy extension to the proposal is to support a direct call to GetTasks. There’s a trade-off here, but if you’re already sending the original filter along with the request (in order to filter records from manifest C for instance) then the request is already something the protocol can express. There’s an objection concerning resource consumption on the service and creating responses that are too large or take too long, but we can get around that by responding with a code that instructs the client to use the CreateScan API like 413 (Payload Too Large). I think that would allow simple clients to function for all but really large tables. The gist above also shows what this might look like.
>>>>>
>>>>> Ryan
>>>>>
>>>>> On Wed, Dec 13, 2023 at 11:53 AM Jack Ye <yezhao...@gmail.com> wrote:
>>>>>
>>>>>> The current proposal definitely makes the server stateful. In our prototype we used other components like DynamoDB to keep track of states. If keeping it stateless is a tenet we can definitely make the proposal closer to that direction. Maybe one thing to make sure is: is this a core tenet of the REST spec? Today we do not even have an official reference implementation of the REST server, so I feel it is hard to say what the core tenets are. Maybe we should create one?
>>>>>>
>>>>>> Pagination is a common issue in the REST spec. We also see similar limitations with other APIs like GetTables, GetNamespaces. When a catalog has many namespaces and tables it suffers from the same issue.
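Ryan's 413 fallback above could look like this on a simple client; the routes, payload shapes, and FakeService below are stand-ins for illustration, not a real API:

```python
# Sketch of the suggested fallback: try the direct task route first and
# only fall back to two-step planning when the service answers 413.
# The paths, payloads, and FakeService are all hypothetical.

def fetch_tasks(service, table, filter_):
    status, body = service.post(f"/v1/{table}/tasks", {"filter": filter_})
    if status == 200:
        return body["file-scan-tasks"]
    if status == 413:  # Payload Too Large: plan shard by shard instead
        _, plan = service.post(f"/v1/{table}/plan", {"filter": filter_})
        tasks = []
        for shard in plan["shard-tasks"]:
            _, part = service.post(f"/v1/{table}/tasks",
                                   {"shard": shard, "filter": filter_})
            tasks.extend(part["file-scan-tasks"])
        return tasks
    raise RuntimeError(f"unexpected status {status}")

class FakeService:
    # Stand-in service that refuses direct planning for the "big" table.
    def post(self, path, body):
        if "shard" in body:
            return 200, {"file-scan-tasks":
                         [f"task-for-{m}" for m in body["shard"]["manifests"]]}
        if path.endswith("/plan"):
            return 200, {"shard-tasks": [{"id": 1, "manifests": ["C"]},
                                         {"id": 2, "manifests": ["D"]}]}
        if "big" in path:
            return 413, {}
        return 200, {"file-scan-tasks": ["task-for-A"]}

print(fetch_tasks(FakeService(), "namespaces/ns/tables/big", {"type": "true"}))
# → ['task-for-C', 'task-for-D']
```

Small tables take the one-request path; only a 413 forces the client into distributed planning.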
>>>>>> It is also not ideal for use cases like web browsers, since typically you display a small page of results and do not need the full list immediately. So I feel we cannot really avoid some state being kept for those use cases.
>>>>>>
>>>>>> A chunked response might be a good way to work around it. We also thought about using HTTP2. However, these options do not seem very compatible with OpenAPI. We can do some further research in this domain; we would really appreciate it if anyone with more insights and experience with OpenAPI can provide some suggestions.
>>>>>>
>>>>>> -Jack
>>>>>>
>>>>>> On Tue, Dec 12, 2023 at 6:21 PM Renjie Liu <liurenjie2...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi, Rahil and Jack:
>>>>>>>
>>>>>>> Thanks for raising this.
>>>>>>>
>>>>>>> My question is that the pagination and sharding will make the REST server stateful, e.g. a sequence of calls is required to go to the same server. In this case, how do we ensure the scalability of the REST server?
>>>>>>>
>>>>>>> On Wed, Dec 13, 2023 at 4:09 AM Fokko Driesprong <fo...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hey Rahil and Jack,
>>>>>>>>
>>>>>>>> Thanks for bringing this up. Ryan and I also discussed this briefly in the early days of PyIceberg and it would have helped a lot in the speed of development. We went for the traditional approach because that would also support all the other catalogs, but now that the REST catalog is taking off, I think it still makes a lot of sense to get it in.
>>>>>>>>
>>>>>>>> I do share the concern raised by Ryan around the concepts of shards and pagination. For PyIceberg (but also for Go, Rust, and DuckDB), which live in a single process today, the concept of shards doesn't add value.
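The chunked-response idea Jack mentions could be modeled as newline-delimited JSON, one task per line, so a client starts work before planning finishes; this is purely illustrative, with a generator standing in for the HTTP chunk stream:

```python
import json

# Illustrative chunked/streaming response: tasks are emitted one per line
# (newline-delimited JSON) as planning discovers them, so a client can
# start consuming before the full plan exists. Names are hypothetical.
def stream_tasks(manifests):
    for manifest in manifests:
        # Real planning would read the manifest here; we fake one task each.
        yield json.dumps({"file-scan-task": f"{manifest}-0.parquet"}) + "\n"

def read_stream(chunks):
    # The client parses each chunk as soon as it arrives.
    return [json.loads(line) for line in chunks]

tasks = read_stream(stream_tasks(["C", "D"]))
print(tasks)
```

This sidesteps pagination state entirely, at the cost of holding the connection open while planning runs.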
>>>>>>>> I see your concern with long-running jobs, but for the non-distributed cases, it will add additional complexity.
>>>>>>>>
>>>>>>>> Some suggestions that come to mind:
>>>>>>>>
>>>>>>>> - Stream the tasks directly back using a chunked response, reducing the latency to the first task. This would also solve things with the pagination. The only downside I can think of is having delete files, where you first need to make sure there are deletes relevant to the task; this might increase latency to the first task.
>>>>>>>> - Making the sharding optional. If you want to shard, you call the CreateScan first and then call the GetScanTask with the IDs. If you don't want to shard, you omit the shard parameter and fetch the tasks directly (here we also need to replace the scan string with the full column/expression/snapshot-id etc).
>>>>>>>>
>>>>>>>> Looking forward to discussing this tomorrow in the community sync <https://iceberg.apache.org/community/#iceberg-community-events>!
>>>>>>>>
>>>>>>>> Kind regards,
>>>>>>>> Fokko
>>>>>>>>
>>>>>>>> On Mon, Dec 11, 2023 at 19:05, Jack Ye <yezhao...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Ryan, thanks for the feedback!
>>>>>>>>>
>>>>>>>>> I was a part of this design discussion internally and can provide more details. One reason for separating the CreateScan operation was to make the API asynchronous and thus keep HTTP communications short. Consider the case where we only have the GetScanTasks API, and there is no shard specified. It might take tens of seconds, or even minutes, to read through the manifest list and all the manifests before being able to return anything.
>>>>>>>>> This means the HTTP connection has to remain open during that period, which is not really a good practice in general (consider connection failures, load balancer and proxy load, etc.). And when we shift the API to asynchronous, it basically becomes something like the proposal, where a stateful ID is generated so the service can return to the client immediately, and the client gets results by referencing the ID. So in our current prototype implementation we are actually keeping this ID, and the whole REST service is stateful.
>>>>>>>>>
>>>>>>>>> We also had some thoughts about the possibility of defining a "shard ID generator" protocol: basically the client agrees with the service on a way to deterministically generate shard IDs, and the service uses it to create shards. That sounds like what you are suggesting here, and it pushes the responsibility to the client side to determine the parallelism. But in some bad cases (e.g. there are many delete files and we need to read all of those in each shard to apply filters), it seems like there might still be the long open connection issue above. What are your thoughts on that?
>>>>>>>>>
>>>>>>>>> -Jack
>>>>>>>>>
>>>>>>>>> On Sun, Dec 10, 2023 at 10:27 AM Ryan Blue <b...@tabular.io> wrote:
>>>>>>>>>
>>>>>>>>>> Rahil, thanks for working on this. It has some really good ideas that we hadn't considered before, like a way for the service to plan how to break up the work of scan planning. I really like that idea because it makes it much easier for the service to keep memory consumption low across requests.
>>>>>>>>>> My primary feedback is that I think it's a little too complicated (with both sharding and pagination) and could be modified slightly so that the service doesn't need to be stateful. If the service isn't necessarily stateful then it should be easier to build implementations.
>>>>>>>>>>
>>>>>>>>>> To make it possible for the service to be stateless, I'm proposing that rather than creating shard IDs that are tracked by the service, the information for a shard can be sent to the client. My assumption here is that most implementations would create shards by reading the manifest list, filtering on partition ranges, and creating a shard for some reasonable size of manifest content. For example, if a table has 100 MB of metadata in 25 manifests that are about 4 MB each, then it might create 9 shards with 1-4 manifests each. The service could send those shards to the client as a list of manifests to read, and the client could send the shard information back to the service to get the data files in each shard (along with the original filter).
>>>>>>>>>>
>>>>>>>>>> There's a slight trade-off that the protocol needs to define how to break the work into shards. I'm interested in hearing if that would work with how you were planning on building the service on your end. Another option is to let the service send back arbitrary JSON that would get returned for each shard. Either way, I like that this would make it so the service doesn't need to persist anything. We could also make it so that small tables don't require multiple requests.
>>>>>>>>>> For example, a client could call the route to get file tasks with just a filter.
>>>>>>>>>>
>>>>>>>>>> What do you think?
>>>>>>>>>>
>>>>>>>>>> Ryan
>>>>>>>>>>
>>>>>>>>>> On Fri, Dec 8, 2023 at 10:41 AM Chertara, Rahil <rcher...@amazon.com.invalid> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> My name is Rahil Chertara, and I’m a part of the Iceberg team at Amazon EMR and Athena. I’m reaching out to share a proposal for a new Scan API that will be utilized by the RESTCatalog. The process for table scan planning is currently done within client engines such as Apache Spark. By moving scan functionality to the RESTCatalog, we can integrate Iceberg table scans with external services, which can lead to several benefits.
>>>>>>>>>>>
>>>>>>>>>>> For example, we can leverage caching and indexes on the server side to improve planning performance. Furthermore, by moving this scan logic to the RESTCatalog, non-JVM engines can integrate more easily. This all can be found in the detailed proposal below. Feel free to comment and add your suggestions.
>>>>>>>>>>>
>>>>>>>>>>> Detailed proposal: https://docs.google.com/document/d/1FdjCnFZM1fNtgyb9-v9fU4FwOX4An-pqEwSaJe8RgUg/edit#heading=h.cftjlkb2wh4h
>>>>>>>>>>>
>>>>>>>>>>> GitHub POC: https://github.com/apache/iceberg/pull/9252
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>>
>>>>>>>>>>> Rahil Chertara
>>>>>>>>>>> Amazon EMR & Athena
>>>>>>>>>>> rcher...@amazon.com
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Ryan Blue
>>>>>>>>>> Tabular
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Tabular
>>>
>>> --
>>> Ryan Blue
>>> Tabular
>
> --
> Ryan Blue
> Tabular

--
Ryan Blue
Tabular