Shard Splitting API Proposal

I'd like to thank everyone who contributed to the API discussion. As a result
we have a much better and more consistent API than what we started with.

Before continuing, I wanted to summarize what we ended up with. The
main changes since the initial proposal were switching to using /_reshard
as the main endpoint and having a detailed state transition history for
jobs.

* GET /_reshard

Top level summary. Besides the new _reshard endpoint, there is now a
`state_reason` and the stats are more detailed.

Returns

{
    "completed": 3,
    "failed": 4,
    "running": 0,
    "state": "stopped",
    "state_reason": "Manual rebalancing",
    "stopped": 0,
    "total": 7
}
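
As a usage sketch, fetching this summary from the shell could look like the
call below (the local address, port 5984 and the adm:pass credentials are my
own assumptions, not part of the proposal):

# Top-level resharding summary for the whole cluster
curl -s -u adm:pass http://127.0.0.1:5984/_reshard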

* PUT /_reshard/state

Start or stop resharding globally.

Body

{
    "state": "stopped",
    "reason": "Manual rebalancing"
}

Returns

{
    "ok": true
}
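
A sketch of what stopping and later restarting resharding might look like from
the shell, assuming the same local node and credentials as above:

# Stop all resharding activity on the cluster, recording a reason
curl -s -u adm:pass -X PUT http://127.0.0.1:5984/_reshard/state \
    -H "Content-Type: application/json" \
    -d '{"state": "stopped", "reason": "Manual rebalancing"}'

# Start it again later
curl -s -u adm:pass -X PUT http://127.0.0.1:5984/_reshard/state \
    -H "Content-Type: application/json" \
    -d '{"state": "running", "reason": "Rebalancing done"}'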

* GET /_reshard/state

Return global resharding state and reason.

{
    "reason": "Manual rebalancing",
    "state": "stopped"
}
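
To verify, the state can be read back (same assumed host and credentials):

# Check the current global resharding state and reason
curl -s -u adm:pass http://127.0.0.1:5984/_reshard/state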


* GET /_reshard/jobs

Get the state of all the resharding jobs on the cluster. Now we have a
detailed state transition history which looks similar to what _scheduler/jobs
has.

{
    "jobs": [
        {
            "history": [
                {
                    "detail": null,
                    "timestamp": "2019-02-06T22:28:06Z",
                    "type": "new"
                },
                ...
                {
                    "detail": null,
                    "timestamp": "2019-02-06T22:28:10Z",
                    "type": "completed"
                }
            ],
            "id":
"001-0a308ef9f7bd24bd4887d6e619682a6d3bb3d0fd94625866c5216ec1167b4e23",
            "job_state": "completed",
            "node": "node1@127.0.0.1",
            "source": "shards/00000000-ffffffff/db1.1549492084",
            "split_state": "completed",
            "start_time": "2019-02-06T22:28:06Z",
            "state_info": {},
            "targets": [
                "shards/00000000-7fffffff/db1.1549492084",
                "shards/80000000-ffffffff/db1.1549492084"
            ],
            "type": "split",
            "update_time": "2019-02-06T22:28:10Z"
        },
        {
           ....
        },
   ],
   "offset": 0,
   "total_rows": 7
}
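
To skim the job list from the shell, something like the following could work
(jq is just a convenience I'm assuming here; it is not part of the API):

# List all resharding jobs, showing only the id and overall state of each
curl -s -u adm:pass http://127.0.0.1:5984/_reshard/jobs |
    jq '.jobs[] | {id, job_state}'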

* POST /_reshard/jobs

Create a new resharding job. The request can now take additional parameters
and can split multiple ranges at once.

To split one shard on a particular node:

{
    "type": "split",
    "shard": "shards/80000000-bfffffff/db1.1549492084"
    "node": "node1@127.0.0.1"
}

To split a particular range on all nodes:

{
     "type": "split",
     "db" : "db1",
     "range" : "80000000-bfffffff"
}

To split a range on just one node:

{
     "type": "split",
     "db" : "db1",
     "range" : "80000000-bfffffff",
     "node": "node1@127.0.0.1"
}

To split all ranges of a db on one node:

{
     "type": "split",
     "db" : "db1",
     "node": "node1@127.0.0.1"
}

The result may now contain multiple job IDs:

[
    {
        "id":
"001-d457a4ea82877a26abbcbcc0e01c4b0070027e72b5bf0c4ff9c89eec2da9e790",
        "node": "node1@127.0.0.1",
        "ok": true,
        "shard": "shards/80000000-bfffffff/db1.1549986514"
    },
    {
        "id":
"001-7c1d20d2f7ef89f6416448379696a2cc98420e3e7855fdb21537d394dbc9b35f",
        "node": "node1@127.0.0.1",
        "ok": true,
        "shard": "shards/c0000000-ffffffff/db1.1549986514"
    }
]
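
As an end-to-end sketch, creating split jobs for one range of db1 on every
node that has a copy might look like this (host and credentials are
assumptions as before):

# Split the 80000000-bfffffff range of db1 on all nodes holding a copy;
# the response is an array with one job id per shard copy
curl -s -u adm:pass -X POST http://127.0.0.1:5984/_reshard/jobs \
    -H "Content-Type: application/json" \
    -d '{"type": "split", "db": "db1", "range": "80000000-bfffffff"}'

The returned job ids can then be fed into the per-job endpoints below.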

* GET /_reshard/jobs/$jobid

Get just one job by ID.

Returns something like this (notice it was stopped since resharding is
stopped on the cluster).

{
    "history": [
        {
            "detail": null,
            "timestamp": "2019-02-12T16:55:41Z",
            "type": "new"
        },
        {
            "detail": "Shard splitting disabled",
            "timestamp": "2019-02-12T16:55:41Z",
            "type": "stopped"
        }
    ],
    "id":
"001-d457a4ea82877a26abbcbcc0e01c4b0070027e72b5bf0c4ff9c89eec2da9e790",
    "job_state": "stopped",
    "node": "node1@127.0.0.1",
    "source": "shards/80000000-bfffffff/db1.1549986514",
    "split_state": "new",
    "start_time": "1970-01-01T00:00:00Z",
    "state_info": {
        "reason": "Shard splitting disabled"
    },
    "targets": [
        "shards/80000000-9fffffff/db1.1549986514",
        "shards/a0000000-bfffffff/db1.1549986514"
    ],
    "type": "split",
    "update_time": "2019-02-12T16:55:41Z"
}
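
To pull up just the transition history of a single job from the shell (the
job id is the one from the example above; jq again assumed):

# Show the state transition history of one job
JOB=001-d457a4ea82877a26abbcbcc0e01c4b0070027e72b5bf0c4ff9c89eec2da9e790
curl -s -u adm:pass http://127.0.0.1:5984/_reshard/jobs/$JOB | jq .history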


* GET /_reshard/jobs/$jobid/state

Get the running state of a particular job only.

{
    "reason": "Shard splitting disabled",
    "state": "stopped"
}
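
This endpoint is handy for polling; a rough wait-for-completion loop might
look like the sketch below (the sleep interval and the choice of terminal
states are my own assumptions):

# Poll a job until it reaches a terminal state
JOB=001-d457a4ea82877a26abbcbcc0e01c4b0070027e72b5bf0c4ff9c89eec2da9e790
while true; do
    STATE=$(curl -s -u adm:pass \
        http://127.0.0.1:5984/_reshard/jobs/$JOB/state | jq -r .state)
    echo "job $JOB is $STATE"
    case "$STATE" in
        completed|failed) break ;;
    esac
    sleep 5
done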

* PUT /_reshard/jobs/$jobid/state

Stop or resume a particular job.

Body

{
     "state": "stopped",
     "reason": "Pause this job for now"
}
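
For example, pausing and later resuming a single job without touching the
others (same assumed host and credentials; the job id is the one from above):

# Pause one specific job
JOB=001-d457a4ea82877a26abbcbcc0e01c4b0070027e72b5bf0c4ff9c89eec2da9e790
curl -s -u adm:pass -X PUT http://127.0.0.1:5984/_reshard/jobs/$JOB/state \
    -H "Content-Type: application/json" \
    -d '{"state": "stopped", "reason": "Pause this job for now"}'

# Resume it later
curl -s -u adm:pass -X PUT http://127.0.0.1:5984/_reshard/jobs/$JOB/state \
    -H "Content-Type: application/json" \
    -d '{"state": "running", "reason": "Resuming"}'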


What do we think? Hopefully I captured everyone's input.

I've started testing most of this behavior in API tests:

https://github.com/apache/couchdb/blob/shard-split/src/mem3/test/mem3_reshard_api_test.erl

Cheers,

-Nick


On Thu, Jan 31, 2019 at 3:43 PM Nick Vatamaniuc <vatam...@gmail.com> wrote:

> Hi Joan,
>
> Thank you for taking a look!
>
>
>
>> > * `GET /_shard_splits`
>>
>> As a result I'm concerned: would we then have duplicate endpoints
>> for /_shard_merges? Or would a unified /_reshard endpoint make
>> more sense here?
>>
>
> Good idea. Let's go with _reshard; it's more general and allows for adding
> shard merging later.
>
>
>> I presume that if you've disabled shard manipulation on the
>> cluster, the status changes to "disabled" and the value is the
>> reason provided by the operator?
>>
>>
> Currently it's PUT /_reshard/state with body {"state": "running"|"stopped",
> "reason": ...}. This will be shown at the top level in the GET /_reshard
> response.
>
>
>> > Get a summary of shard splitting for the whole cluster.
>>
>> What happens if every node in the cluster is restarted while a shard
>> split operation is occurring? Is the job persisted somewhere, i.e. in
>> special docs in _dbs, or would this kill the entire operation? I'm
>> considering rolling cluster upgrades here.
>>
>>
> The job will checkpoint as it goes through various steps; that state is saved
> in a _local document in the shard dbs. So if a node is restarted, the job
> will resume from the last checkpoint it reached.
>
>
>>
>> > * `PUT /_shard_splits`
>>
>> Same comment as above about whether this is /_shard_splits or something
>> that could expand to shard merging in the future as well.
>>
>> If you persist the state of the shard splitting operation when disabling,
>> this could be used as a prerequisite to a rolling cluster upgrade
>> (i.e., an important documentation update).
>>
>>
> I think after discussing with other participants this became PUT
> /_shard_splits/state (now PUT _reshard/state). The disable state is also
> persisted on a per-node basis.
>
> An interesting thing to think about is that if a node is down when shard
> splitting is stopped or started, it won't find out about it. So I think we
> might have to do some kind of querying of neighboring nodes to detect whether
> a node that just rejoined missed a recent change to the global state.
>
>
>> > * `POST /_shard_splits/jobs`
>> >
>> > Start a shard splitting job.
>> >
>> > Request body:
>> >
>> > {
>> >     "node": "dbc...@db1.sandbox001.cloudant.net",
>> >     "shard": "shards/00000000-FFFFFFFF/username/dbname.$timestamp"
>> > }
>>
>>
>> 1. Agree with earlier comments that having to specify this per-node is
>> a nice to have, but really an end user wants to specify a *database*,
>> and have the API create the q*n jobs needed. It would then return an
>> array of jobs in the format you describe.
>>
>
> OK, I think that's doable if we switch the response to be an array of
> job IDs. Then we might also have to think about various failure modes, such
> as what happens if one of the nodes where a copy lives is not up. Should that
> be a failure, or do we continue splitting just 2 copies?
>
>>
>> 2. Same comment as above; why not add a new field for "type":"split" or
>> "merge" to make this expandable in the future?
>>
> That makes sense, I can add a type field if we have _reshard as the top
> level endpoint.
>
>
>> -Joan
>>
>
