Hi All hope everyone is doing well,
Wanted to revive the discussion around the Rest Table Scan API work. For a
refresher here is the original proposal:
https://docs.google.com/document/d/1FdjCnFZM1fNtgyb9-v9fU4FwOX4An-pqEwSaJe8RgUg/edit#heading=h.cftjlkb2wh4h
as well as the PR: https://github.com/apache/iceberg/pull/9252
From the last messages on the thread, I believe Ryan and Jack were in favor of
having two distinct api endpoints /plan and /scan, as well as a stricter json
definition for the "shard”, here is an example below from what was discussed.
POST /v1/namespaces/ns/tables/t/plan
{ "filter": { "type": "in", "term": "x", "values": [1, 2, 3] }, "select": ["x",
"a.b"]}
{ "manifest-plan-tasks": [
{ "start": 0, "length": 1000, "manifest": { "path":
"s3://some/manifest.avro", ...}, "delete-manifests": [...] },
{ ... }
]}
POST /v1/namespaces/ns/tables/t/scan
{ "filter": {"type": "in", "term": "x", "values": [1, 2, 3] },
"select": ["x", "a.b"],
"manifest-plan-task": { "start": 0, "length": 1000, "manifest": { "path":
"s3://some/manifest.avro", ...}, "delete-manifests": [...] } }
{ "file-scan-tasks": [...] }
POST /v1/namespaces/ns/tables/t/scan
{ "filter": {"type": "in", "term": "x", "values": [1, 2, 3] }, "select": ["x",
"a.b"]}
{ "file-scan-tasks": [...] }
However IIRC Micah and Renjie had some concerns around this stricter structure
as this can make it harder to evolve in the future, as well as some potential
scalability challenges for larger tables that have many manifest files. (Feel
free to expand further on the concerns if my understanding is incorrect).
Would appreciate if the community can leave any more thoughts/feedback on this
thread, as well as on the google doc, and the PR.
Regards,
Rahil Chertara
From: Renjie Liu <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, December 21, 2023 at 10:35 PM
To: "[email protected]" <[email protected]>
Subject: RE: [EXTERNAL] Proposal for REST APIs for Iceberg table scans
CAUTION: This email originated from outside of the organization. Do not click
links or open attachments unless you can confirm the sender and know the
content is safe.
I share the same concern with Micah. The shard detail should be implementation
details of the server, rather than exposing directly to the client. If the goal
is to make things stateless, we just need to attach a snapshot id + shard id,
then a determined algorithm is supposed to give the same result. Also another
concern is for huge analytics tables, we may have a lot of manifest files,
which may lead to large traffic from the rest server.
On Thu, Dec 21, 2023 at 7:41 AM Micah Kornfield
<[email protected]<mailto:[email protected]>> wrote:
Also +1 for having a more strict definition of the shard. Having arbitrary JSON
was basically what we experimented with a string shard ID, and we ended up with
something very similar to the manifest plan task you describe in the serialized
ID string.
IIUC the proposal correctly, I'd actually be -0.0 on the stricter structure. I
think forcing a contract where it isn't strictly necessary makes it harder to
evolve the system in the future. For example it makes it harder to address
potential scalability problems in a transparent way (e.g. extreme edge cases in
cardinality between manifest files and delete files).
It also seems like it might overly constrain implementations (it is not clear
we should need to compute the mapping between delete file manifests to data
file manifests up front to start planning).
On Tue, Dec 19, 2023 at 2:10 PM Jack Ye
<[email protected]<mailto:[email protected]>> wrote:
+1 for having /plan and /scan, sounds like a good idea to separate those 2
distinct actions.
Also +1 for having a more strict definition of the shard. Having arbitrary JSON
was basically what we experimented with a string shard ID, and we ended up with
something very similar to the manifest plan task you describe in the serialized
ID string.
So sounds like we are converging to the following APIs:
POST /v1/namespaces/ns/tables/t/plan
{ "filter": { "type": "in", "term": "x", "values": [1, 2, 3] }, "select": ["x",
"a.b"]}
{ "manifest-plan-tasks": [
{ "start": 0, "length": 1000, "manifest": { "path":
"s3://some/manifest.avro", ...}, "delete-manifests": [...] },
{ ... }
]}
POST /v1/namespaces/ns/tables/t/scan
{ "filter": {"type": "in", "term": "x", "values": [1, 2, 3] },
"select": ["x", "a.b"],
"manifest-plan-task": { "start": 0, "length": 1000, "manifest": { "path":
"s3://some/manifest.avro", ...}, "delete-manifests": [...] } }
{ "file-scan-tasks": [...] }
POST /v1/namespaces/ns/tables/t/scan
{ "filter": {"type": "in", "term": "x", "values": [1, 2, 3] }, "select": ["x",
"a.b"]}
{ "file-scan-tasks": [...] }
If this sounds good overall, we can update the prototype to have more detailed
discussions in code.
-Jack
On Thu, Dec 14, 2023 at 6:10 PM Ryan Blue
<[email protected]<mailto:[email protected]>> wrote:
The tasks might look something like this:
CombinedPlanTask
- List<ManifestPlanTask>
ManifestPlanTask
- int start
- int length
- ManifestFile dataManifest
- List<ManifestFile> deleteManifests
On Thu, Dec 14, 2023 at 4:07 PM Ryan Blue
<[email protected]<mailto:[email protected]>> wrote:
Seems like that track has expired (This Internet-Draft will expire on 13 May
2022)
Yeah, looks like we should just use POST. That’s too bad. QUERY seems like a
good idea to me.
Distinguish planning using shard or not
I think this was a mistake on my part. I was still thinking that we would have
a different endpoint for first-level planning to produce shards and the route
to actually get files. Since both are POST requests with the same path
(/v1/namespaces/ns/tables/t/scans) that no longer works. What about
/v1/namespaces/ns/tables/t/scan and /v1/namespaces/ns/tables/t/plan? The latter
could use some variant of planFiles since that’s what we are wrapping in the
Java API.
Necessity of scan ID
Yes, I agree. If you have shard IDs then you don’t really need a scan ID. You
could always have one internally but send it as part of the shard ID.
Shape of shard payload
I think we have 2 general options depending on how strict we want to be.
1. Require a standard shard definition
2. Allow arbitrary JSON and leave it to the service
I lean toward the first option, which would be a data manifest and the
associated delete manifests for the partition. We could also extend that to a
group of manifests, each with a list of delete manifests. And we could also
allow splitting to ensure tasks don’t get too large with big files. This all
looks basically like FileScanTask, but with manifests and delete manifests.
On Wed, Dec 13, 2023 at 4:39 PM Jack Ye
<[email protected]<mailto:[email protected]>> wrote:
Seems like that track has expired (This Internet-Draft will expire on 13 May
2022), not sure how these RFCs are managed, but it does not seem hopeful to
have this verb in. I think people are mostly using POST for this use case
already.
But overall I think we are in agreement with the general direction. A few
detail discussions:
Distinguish planning using shard or not
Maybe we should add a query parameter like distributed=true to distinguish your
first and third case, since they are now sharing the same signature. If the
requester wants to use distributed planning, then some sharding strategy is
provided as a response for the requester to send more detailed requests.
Necessity of scan ID
In this approach, is scan ID still required? Because the shard payload already
fully describes the information to retrieve, it seems like we can just drop the
scan-id query parameter in the second case. Seems like it's kept for the case
if we still want to persist some state, but it seems like we can make a
stateless style fully working.
Shape of shard payload
What do you think is necessary information of the shard payload? It seems like
we need at least the location of the manifests, plus the delete manifests or
delete files associated with the manifests. I like the idea of making it a
"shard task" that is similar to a file scan task, and it might allow us to
return a mixture of both types of tasks, so we can have better control of the
response size.
-Jack
On Wed, Dec 13, 2023 at 3:50 PM Ryan Blue
<[email protected]<mailto:[email protected]>> wrote:
I just changed it to POST after looking into support for the QUERY method. It's
a new HTTP method for cases like this where you don't want to pass everything
through query params. Here's the QUERY method
RFC<https://www.ietf.org/archive/id/draft-ietf-httpbis-safe-method-w-body-02.html>,
but I guess it isn't finalized yet?
Just read them like you would a POST request that doesn't actually create
anything.
On Wed, Dec 13, 2023 at 3:45 PM Jack Ye
<[email protected]<mailto:[email protected]>> wrote:
Thanks, the Gist explains a lot of things. This is actually very close to our
way of implementing the shard ID, we were defining the shard ID as a string,
and the string content is actually something similar to the information of the
JSON payload you showed, so we can persist minimum information in storage.
Just one clarification needed for your Gist:
> QUERY /v1/namespaces/ns/tables/t/scans?scan-id=1
> { "shard": { "id": 1, "manifests": ["C"] }, "filter": {"type": "in", "term":
> "x", "values": [1, 2, 3] } }
>
> { "file-scan-tasks": [...] }
Here, what does this QUERY verb mean? Is that a GET? If it's GET, we cannot
have a request body. That's actually why we expressed that as an ID string,
since we can put it as a query parameter.
-Jack
On Wed, Dec 13, 2023 at 3:25 PM Ryan Blue
<[email protected]<mailto:[email protected]>> wrote:
Jack,
It sounds like what I’m proposing isn’t quite clear because your initial
response was arguing for a sharding capability. I agree that sharding is a good
idea. I’m less confident about two points:
1. Requiring that the service is stateful. As Renjie pointed out, that makes
it harder to scale the service.
2. The need for both pagination and sharding as separate things
And I also think that Fokko has a good point about trying to keep things simple
and not requiring the CreateScan endpoint.
For the first point, I’m proposing that we still have a CreateScan endpoint,
but instead of sending only a list of shard IDs it can also send either a
standard shard “task” or an optional JSON definition. Let’s assume we can send
arbitrary JSON for an example. Say I have a table with 4 manifests, A through D
and that C and D match some query filter. When I call the CreateScan endpoint,
the service would send back tasks with that information: {"id": 1, "manifests":
["C"]}, {"id": 2, "manifests": ["D"]}. By sending what the shards mean (the
manifests to read), my service can be stateless: any node can get a request for
shard 1, read manifest C, and send back the resulting data files.
I don’t see much of an argument against doing this in principle. It gives you
the flexibility to store state if you choose or to send state to the client for
it to pass back when calling the GetTasks endpoint. There is a practical
problem, which is that it’s annoying to send a GET request with a JSON payload
because you can’t send a request body. It’s probably obvious, but I’m also not
a REST purist so I’d be fine using POST or QUERY for this. It would look
something like this
Gist<https://gist.github.com/rdblue/d2b65bd2ad20f85ee9d04ccf19ac8aba>.
In your last reply, you also asked whether a stateless service is a goal. I
don’t think that it is, but if we can make simple changes to the spec to allow
more flexibility on the server side, I think that’s a good direction. You also
asked about a reference implementation and I consider
CatalogHandlers<https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java>
to be that reference. It does everything except for the work done by your
choice of web application framework. It isn’t stateless, but it only relies on
a Catalog implementation for persistence.
For the second point, I don’t understand why we need both sharding and
pagination. That is, if we have a protocol that allows sharding, why is
pagination also needed? From my naive perspective on how sharding would work,
we should be able to use metadata from the manifest list to limit the potential
number of data files in a given shard. As long as we can limit the size of a
shard to produce more, pagination seems like unnecessary complication.
Lastly, for Fokko’s point, I think another easy extension to the proposal is to
support a direct call to GetTasks. There’s a trade-off here, but if you’re
already sending the original filter along with the request (in order to filter
records from manifest C for instance) then the request is already something the
protocol can express. There’s an objection concerning resource consumption on
the service and creating responses that are too large or take too long, but we
can get around that by responding with a code that instructs the client to use
the CreateScan API like 413 (Payload too large). I think that would allow
simple clients to function for all but really large tables. The gist above also
shows what this might look like.
Ryan
On Wed, Dec 13, 2023 at 11:53 AM Jack Ye
<[email protected]<mailto:[email protected]>> wrote:
The current proposal definitely makes the server stateful. In our prototype we
used other components like DynamoDB to keep track of states. If keeping it
stateless is a tenant we can definitely make the proposal closer to that
direction. Maybe one thing to make sure is, is this a core tenant of the REST
spec? Today we do not even have an official reference implementation of the
REST server, I feel it is hard to say what are the core tenants. Maybe we
should create one?
Pagination is a common issue in the REST spec. We also see similar limitations
with other APIs like GetTables, GetNamespaces. When a catalog has many
namespaces and tables it suffers from the same issue. It is also not ideal for
use cases like web browsers, since typically you display a small page of
results and do not need the full list immediately. So I feel we cannot really
avoid some state to be kept for those use cases.
Chunked response might be a good way to work around it. We also thought about
using HTTP2. However, these options seem to be not very compatible with
OpenAPI. We can do some further research in this domain, would really
appreciate it if anyone has more insights and experience with OpenAPI that can
provide some suggestions.
-Jack
On Tue, Dec 12, 2023 at 6:21 PM Renjie Liu
<[email protected]<mailto:[email protected]>> wrote:
Hi, Rahi and Jack:
Thanks for raising this.
My question is that the pagination and sharding will make the rest server
stateful, e.g. a sequence of calls is required to go to the same server. In
this case, how do we ensure the scalability of the rest server?
On Wed, Dec 13, 2023 at 4:09 AM Fokko Driesprong
<[email protected]<mailto:[email protected]>> wrote:
Hey Rahil and Jack,
Thanks for bringing this up. Ryan and I also discussed this briefly in the
early days of PyIceberg and it would have helped a lot in the speed of
development. We went for the traditional approach because that would also
support all the other catalogs, but now that the REST catalog is taking off, I
think it still makes a lot of sense to get it in.
I do share the concern raised Ryan around the concepts of shards and
pagination. For PyIceberg (but also for Go, Rust, and DuckDB) that are living
in a single process today the concept of shards doesn't add value. I see your
concern with long-running jobs, but for the non-distributed cases, it will add
additional complexity.
Some suggestions that come to mind:
* Stream the tasks directly back using a chunked response, reducing the
latency to the first task. This would also solve things with the pagination.
The only downside I can think of is having delete files where you first need to
make sure there are deletes relevant to the task, this might increase latency
to the first task.
* Making the sharding optional. If you want to shard you call the
CreateScan first and then call the GetScanTask with the IDs. If you don't want
to shard, you omit the shard parameter and fetch the tasks directly (here we
need also replace the scan string with the full column/expression/snapshot-id
etc).
Looking forward to discussing this tomorrow in the community
sync<https://iceberg.apache.org/community/#iceberg-community-events>!
Kind regards,
Fokko
Op ma 11 dec 2023 om 19:05 schreef Jack Ye
<[email protected]<mailto:[email protected]>>:
Hi Ryan, thanks for the feedback!
I was a part of this design discussion internally and can provide more details.
One reason for separating the CreateScan operation was to make the API
asynchronous and thus keep HTTP communications short. Consider the case where
we only have GetScanTasks API, and there is no shard specified. It might take
tens of seconds, or even minutes to read through all the manifest list and
manifests before being able to return anything. This means the HTTP connection
has to remain open during that period, which is not really a good practice in
general (consider connection failure, load balancer and proxy load, etc.). And
when we shift the API to asynchronous, it basically becomes something like the
proposal, where a stateful ID is generated to be able to immediately return
back to the client, and the client get results by referencing the ID. So in our
current prototype implementation we are actually keeping this ID and the whole
REST service is stateful.
There were some thoughts we had about the possibility to define a "shard ID
generator" protocol: basically the client agrees with the service a way to
deterministically generate shard IDs, and service uses it to create shards.
That sounds like what you are suggesting here, and it pushes the responsibility
to the client side to determine the parallelism. But in some bad cases (e.g.
there are many delete files and we need to read all those in each shard to
apply filters), it seems like there might still be the long open connection
issue above. What is your thought on that?
-Jack
On Sun, Dec 10, 2023 at 10:27 AM Ryan Blue
<[email protected]<mailto:[email protected]>> wrote:
Rahil, thanks for working on this. It has some really good ideas that we hadn't
considered before like a way for the service to plan how to break up the work
of scan planning. I really like that idea because it makes it much easier for
the service to keep memory consumption low across requests.
My primary feedback is that I think it's a little too complicated (with both
sharding and pagination) and could be modified slightly so that the service
doesn't need to be stateful. If the service isn't necessarily stateful then it
should be easier to build implementations.
To make it possible for the service to be stateless, I'm proposing that rather
than creating shard IDs that are tracked by the service, the information for a
shard can be sent to the client. My assumption here is that most
implementations would create shards by reading the manifest list, filtering on
partition ranges, and creating a shard for some reasonable size of manifest
content. For example, if a table has 100MB of metadata in 25 manifests that are
about 4 MB each, then it might create 9 shards with 1-4 manifests each. The
service could send those shards to the client as a list of manifests to read
and the client could send the shard information back to the service to get the
data files in each shard (along with the original filter).
There's a slight trade-off that the protocol needs to define how to break the
work into shards. I'm interested in hearing if that would work with how you
were planning on building the service on your end. Another option is to let the
service send back arbitrary JSON that would get returned for each shard. Either
way, I like that this would make it so the service doesn't need to persist
anything. We could also make it so that small tables don't require multiple
requests. For example, a client could call the route to get file tasks with
just a filter.
What do you think?
Ryan
On Fri, Dec 8, 2023 at 10:41 AM Chertara, Rahil <[email protected]>
wrote:
Hi all,
My name is Rahil Chertara, and I’m a part of the Iceberg team at Amazon EMR and
Athena. I’m reaching out to share a proposal for a new Scan API that will be
utilized by the RESTCatalog. The process for table scan planning is currently
done within client engines such as Apache Spark. By moving scan functionality
to the RESTCatalog, we can integrate Iceberg table scans with external
services, which can lead to several benefits.
For example, we can leverage caching and indexes on the server side to improve
planning performance. Furthermore, by moving this scan logic to the
RESTCatalog, non-JVM engines can integrate more easily. This all can be found
in the detailed proposal below. Feel free to comment, and add your suggestions .
Detailed proposal:
https://docs.google.com/document/d/1FdjCnFZM1fNtgyb9-v9fU4FwOX4An-pqEwSaJe8RgUg/edit#heading=h.cftjlkb2wh4h
Github POC: https://github.com/apache/iceberg/pull/9252
Regards,
Rahil Chertara
Amazon EMR & Athena
[email protected]<mailto:[email protected]>
--
Ryan Blue
Tabular
--
Ryan Blue
Tabular
--
Ryan Blue
Tabular
--
Ryan Blue
Tabular
--
Ryan Blue
Tabular