This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch feature/user-partitioned-databases-davisp
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 4a92766c31418033194c463cf1730c69ec1d915c
Author: Paul J. Davis <[email protected]>
AuthorDate: Thu Oct 25 17:03:00 2018 -0500

    Implement partitioned dbs
    
    This change introduces the ability for users to place a group of
    documents in a single shard range by specifying a "partition key" in the
    document id. The partition key is everything preceding the first colon
    ':' in the document id.
    
    Every document id (except for design and local documents) in a
    partitioned database is required to have a partition key.
    
    Co-authored-by: Garren Smith <[email protected]>
    Co-authored-by: Robert Newson <[email protected]>
---
 src/chttpd/src/chttpd.erl                  |   2 +
 src/chttpd/src/chttpd_db.erl               |  62 ++++++++++-------
 src/chttpd/src/chttpd_show.erl             |   2 +-
 src/couch/src/couch_db.erl                 |  18 +++++
 src/couch/src/couch_httpd.erl              |   2 +
 src/couch/src/couch_httpd_db.erl           |   6 +-
 src/couch/src/couch_partition.erl          | 107 +++++++++++++++++++++++++++++
 src/couch/src/couch_server.erl             |   3 +
 src/couch_mrview/src/couch_mrview_show.erl |   2 +-
 src/fabric/src/fabric_db_create.erl        |   6 +-
 src/fabric/src/fabric_util.erl             |   9 +++
 src/mem3/src/mem3.erl                      |  12 +++-
 12 files changed, 200 insertions(+), 31 deletions(-)

diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index a562839..f14c061 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -902,6 +902,8 @@ error_info({error, {illegal_database_name, Name}}) ->
     {400, <<"illegal_database_name">>, Message};
 error_info({illegal_docid, Reason}) ->
     {400, <<"illegal_docid">>, Reason};
+error_info({illegal_partition, Reason}) ->
+    {400, <<"illegal_partition">>, Reason};
 error_info({_DocID,{illegal_docid,DocID}}) ->
     {400, <<"illegal_docid">>,DocID};
 error_info({error, {database_name_too_long, DbName}}) ->
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 49d7b58..e680b34 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -13,6 +13,7 @@
 -module(chttpd_db).
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("mem3/include/mem3.hrl").
 
 -export([handle_request/1, handle_compact_req/2, handle_design_req/2,
     db_req/2, couch_doc_open/4,handle_changes_req/2,
@@ -285,10 +286,12 @@ create_db_req(#httpd{}=Req, DbName) ->
     Q = chttpd:qs_value(Req, "q", config:get("cluster", "q", "8")),
     P = chttpd:qs_value(Req, "placement", config:get("cluster", "placement")),
     EngineOpt = parse_engine_opt(Req),
+    DbProps = parse_partitioned_opt(Req),
     Options = [
         {n, N},
         {q, Q},
-        {placement, P}
+        {placement, P},
+        {props, DbProps}
     ] ++ EngineOpt,
     DocUrl = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName)),
     case fabric:create_db(DbName, Options) of
@@ -314,7 +317,12 @@ delete_db_req(#httpd{}=Req, DbName) ->
     end.
 
 do_db_req(#httpd{path_parts=[DbName|_], user_ctx=Ctx}=Req, Fun) ->
-    {ok, Db} = couch_db:clustered_db(DbName, Ctx),
+    Shard = hd(mem3:shards(DbName)),
+    Props = couch_util:get_value(props, Shard#shard.opts, []),
+    {ok, Db} = couch_db:clustered_db(DbName, [
+            {usr_ctx, Ctx},
+            {props, Props}
+        ]),
     Fun(Req, Db).
 
 db_req(#httpd{method='GET',path_parts=[DbName]}=Req, _Db) ->
@@ -783,7 +791,7 @@ db_doc_req(#httpd{method='GET', mochi_req=MochiReq}=Req, Db, DocId) ->
 
 db_doc_req(#httpd{method='POST', user_ctx=Ctx}=Req, Db, DocId) ->
     couch_httpd:validate_referer(Req),
-    couch_doc:validate_docid(DocId, couch_db:name(Db)),
+    couch_db:validate_docid(Db, DocId),
     chttpd:validate_ctype(Req, "multipart/form-data"),
 
     W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
@@ -839,7 +847,7 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) ->
         update_type = UpdateType
     } = parse_doc_query(Req),
     DbName = couch_db:name(Db),
-    couch_doc:validate_docid(DocId, DbName),
+    couch_db:validate_docid(Db, DocId),
 
     W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
     Options = [{user_ctx,Ctx}, {w,W}],
@@ -1330,7 +1338,7 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa
                 % check for the existence of the doc to handle the 404 case.
                 couch_doc_open(Db, DocId, nil, [])
             end,
-            couch_doc:validate_docid(DocId, couch_db:name(Db)),
+            couch_db:validate_docid(Db, DocId),
             #doc{id=DocId};
         Rev ->
             case fabric:open_revs(Db, DocId, [Rev], [{user_ctx,Ctx}]) of
@@ -1453,6 +1461,23 @@ parse_engine_opt(Req) ->
             end
     end.
 
+
+parse_partitioned_opt(Req) ->
+    case chttpd:qs_value(Req, "partitioned") of
+        undefined ->
+            [];
+        "false" ->
+            [];
+        "true" ->
+            [
+                {partitioned, true},
+                {hash, [couch_partition, hash, []]}
+            ];
+        _ ->
+            throw({bad_request, <<"invalid `partitioned` parameter">>})
+    end.
+
+
 parse_doc_query({Key, Value}, Args) ->
     case {Key, Value} of
         {"attachments", "true"} ->
@@ -1672,16 +1697,17 @@ bulk_get_open_doc_revs(Db, {Props}, Options) ->
 
 
 bulk_get_open_doc_revs1(Db, Props, Options, {}) ->
-    case parse_field(<<"id">>, couch_util:get_value(<<"id">>, Props)) of
-        {error, {DocId, Error, Reason}} ->
-            {DocId, {error, {null, Error, Reason}}, Options};
-
-        {ok, undefined} ->
+    case couch_util:get_value(<<"id">>, Props) of
+        undefined ->
             Error = {null, bad_request, <<"document id missed">>},
             {null, {error, Error}, Options};
-
-        {ok, DocId} ->
-            bulk_get_open_doc_revs1(Db, Props, Options, {DocId})
+        DocId ->
+            try
+                couch_db:validate_docid(Db, DocId),
+                bulk_get_open_doc_revs1(Db, Props, Options, {DocId})
+            catch throw:{Error, Reason} ->
+                {DocId, {error, {null, Error, Reason}}, Options}
+            end
     end;
 bulk_get_open_doc_revs1(Db, Props, Options, {DocId}) ->
     RevStr = couch_util:get_value(<<"rev">>, Props),
@@ -1721,16 +1747,6 @@ bulk_get_open_doc_revs1(Db, Props, _, {DocId, Revs, Options}) ->
     end.
 
 
-parse_field(<<"id">>, undefined) ->
-    {ok, undefined};
-parse_field(<<"id">>, Value) ->
-    try
-        ok = couch_doc:validate_docid(Value),
-        {ok, Value}
-    catch
-        throw:{Error, Reason} ->
-            {error, {Value, Error, Reason}}
-    end;
 parse_field(<<"rev">>, undefined) ->
     {ok, undefined};
 parse_field(<<"rev">>, Value) ->
diff --git a/src/chttpd/src/chttpd_show.erl b/src/chttpd/src/chttpd_show.erl
index c6d232c..8a361fa 100644
--- a/src/chttpd/src/chttpd_show.erl
+++ b/src/chttpd/src/chttpd_show.erl
@@ -133,7 +133,7 @@ send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId) ->
                 Options = [{user_ctx, Req#httpd.user_ctx}, {w, W}]
             end,
             NewDoc = couch_doc:from_json_obj_validate({NewJsonDoc}),
-            couch_doc:validate_docid(NewDoc#doc.id),
+            couch_db:validate_docid(Db, NewDoc#doc.id),
             {UpdateResult, NewRev} = fabric:update_doc(Db, NewDoc, Options),
             NewRevStr = couch_doc:rev_to_str(NewRev),
             case {UpdateResult, NewRev} of
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 1293f0c..42ae130 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -56,6 +56,7 @@
     is_db/1,
     is_system_db/1,
     is_clustered/1,
+    is_partitioned/1,
 
     set_revs_limit/2,
     set_purge_infos_limit/2,
@@ -82,6 +83,8 @@
     get_minimum_purge_seq/1,
     purge_client_exists/3,
 
+    validate_docid/2,
+
     update_doc/3,
     update_doc/4,
     update_docs/4,
@@ -214,6 +217,10 @@ is_clustered(#db{}) ->
 is_clustered(?OLD_DB_REC = Db) ->
     ?OLD_DB_MAIN_PID(Db) == undefined.
 
+is_partitioned(#db{options = Options}) ->
+    Props = couch_util:get_value(props, Options, []),
+    couch_util:get_value(partitioned, Props, false).
+
 ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) ->
     ok = gen_server:call(Pid, full_commit, infinity),
     {ok, StartTime}.
@@ -772,6 +779,17 @@ name(#db{name=Name}) ->
 name(?OLD_DB_REC = Db) ->
     ?OLD_DB_NAME(Db).
 
+
+validate_docid(#db{} = Db, DocId) when is_binary(DocId) ->
+    couch_doc:validate_docid(DocId, name(Db)),
+    case is_partitioned(Db) of
+        true ->
+            couch_partition:validate_docid(DocId);
+        false ->
+            ok
+    end.
+
+
 update_doc(Db, Doc, Options) ->
     update_doc(Db, Doc, Options, interactive_edit).
 
diff --git a/src/couch/src/couch_httpd.erl b/src/couch/src/couch_httpd.erl
index a8cfca6..ebe0642 100644
--- a/src/couch/src/couch_httpd.erl
+++ b/src/couch/src/couch_httpd.erl
@@ -870,6 +870,8 @@ error_info(md5_mismatch) ->
     {400, <<"content_md5_mismatch">>, <<"Possible message corruption.">>};
 error_info({illegal_docid, Reason}) ->
     {400, <<"illegal_docid">>, Reason};
+error_info({illegal_partition, Reason}) ->
+    {400, <<"illegal_partition">>, Reason};
 error_info(not_found) ->
     {404, <<"not_found">>, <<"missing">>};
 error_info({not_found, Reason}) ->
diff --git a/src/couch/src/couch_httpd_db.erl b/src/couch/src/couch_httpd_db.erl
index 81209d9..870aaf1 100644
--- a/src/couch/src/couch_httpd_db.erl
+++ b/src/couch/src/couch_httpd_db.erl
@@ -563,7 +563,7 @@ db_doc_req(#httpd{method = 'GET', mochi_req = MochiReq} = Req, Db, DocId) ->
 db_doc_req(#httpd{method='POST'}=Req, Db, DocId) ->
     couch_httpd:validate_referer(Req),
     DbName = couch_db:name(Db),
-    couch_doc:validate_docid(DocId, DbName),
+    couch_db:validate_docid(Db, DocId),
     couch_httpd:validate_ctype(Req, "multipart/form-data"),
     Form = couch_httpd:parse_form(Req),
     case couch_util:get_value("_doc", Form) of
@@ -598,7 +598,7 @@ db_doc_req(#httpd{method='POST'}=Req, Db, DocId) ->
 
 db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) ->
     DbName = couch_db:name(Db),
-    couch_doc:validate_docid(DocId, DbName),
+    couch_db:validate_docid(Db, DocId),
 
     case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of
     ("multipart/related;" ++ _) = ContentType ->
@@ -1039,7 +1039,7 @@ db_attachment_req(#httpd{method=Method,mochi_req=MochiReq}=Req, Db, DocId, FileN
                 % check for the existence of the doc to handle the 404 case.
                 couch_doc_open(Db, DocId, nil, [])
             end,
-            couch_doc:validate_docid(DocId, couch_db:name(Db)),
+            couch_db:validate_docid(Db, DocId),
             #doc{id=DocId};
         Rev ->
             case couch_db:open_doc_revs(Db, DocId, [Rev], []) of
diff --git a/src/couch/src/couch_partition.erl b/src/couch/src/couch_partition.erl
new file mode 100644
index 0000000..021064b
--- /dev/null
+++ b/src/couch/src/couch_partition.erl
@@ -0,0 +1,107 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_partition).
+
+
+-export([
+    extract/1,
+    from_docid/1,
+    is_member/2,
+
+    validate_docid/1,
+    validate_partition/1,
+
+    hash/1
+]).
+
+
+extract(Value) when is_binary(Value) ->
+    case binary:split(Value, <<":">>) of
+        [Partition, Rest] ->
+            {Partition, Rest};
+        _ ->
+            undefined
+    end;
+
+extract(_) ->
+    undefined.
+
+
+from_docid(DocId) ->
+    case extract(DocId) of
+        undefined ->
+            throw({illegal_docid, <<"doc id must be of form partition:id">>});
+        {Partition, _} ->
+            Partition
+    end.
+
+
+is_member(DocId, Partition) ->
+    case extract(DocId) of
+        {Partition, _} ->
+            true;
+        _ ->
+            false
+    end.
+
+
+validate_docid(<<"_design/", _/binary>>) ->
+    ok;
+validate_docid(<<"_local/", _/binary>>) ->
+    ok;
+validate_docid(DocId) when is_binary(DocId) ->
+    % By the time this function is called DocId has already
+    % passed general doc id validation, so we only need to
+    % check that it carries a well-formed partition.
+    case extract(DocId) of
+        undefined ->
+            Msg1 = <<"Document id must contain a partition">>,
+            throw({illegal_docid, Msg1});
+        {Partition, _} ->
+            validate_partition(Partition)
+    end.
+
+
+validate_partition(<<>>) ->
+    throw({illegal_partition, <<"Partition must not be empty">>});
+validate_partition(Partition) when is_binary(Partition) ->
+    case Partition of
+        <<"_", _/binary>> ->
+            Msg1 = <<"Partition must not start with an underscore">>,
+            throw({illegal_partition, Msg1});
+        _ ->
+            ok
+    end,
+    case couch_util:validate_utf8(Partition) of
+        true ->
+            ok;
+        false ->
+            Msg2 = <<"Partition must be valid UTF-8">>,
+            throw({illegal_partition, Msg2})
+    end,
+    case extract(Partition) of
+        {_, _} ->
+            Msg3 = <<"Partition must not contain a colon">>,
+            throw({illegal_partition, Msg3});
+        undefined ->
+            ok
+    end;
+validate_partition(_) ->
+    throw({illegal_partition, <<"Partition must be a string">>}).
+
+
+hash(<<"_", _/binary>> = DocId) ->
+    erlang:crc32(DocId);
+
+hash(DocId) when is_binary(DocId) ->
+    erlang:crc32(from_docid(DocId)).
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index c4b7bf1..95892fc 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -221,6 +221,9 @@ init([]) ->
     % Mark pluggable storage engines as a supported feature
     config:enable_feature('pluggable-storage-engines'),
 
+    % Mark partitioned databases as a supported feature
+    config:enable_feature(partitions),
+
     % read config and register for configuration changes
 
     % just stop if one of the config settings change. couch_server_sup
diff --git a/src/couch_mrview/src/couch_mrview_show.erl b/src/couch_mrview/src/couch_mrview_show.erl
index e2c94ba..61e4c9b 100644
--- a/src/couch_mrview/src/couch_mrview_show.erl
+++ b/src/couch_mrview/src/couch_mrview_show.erl
@@ -133,7 +133,7 @@ send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId) ->
                     Options = [{user_ctx, Req#httpd.user_ctx}]
             end,
             NewDoc = couch_doc:from_json_obj_validate({NewJsonDoc}),
-            couch_doc:validate_docid(NewDoc#doc.id),
+            couch_db:validate_docid(Db, NewDoc#doc.id),
             {ok, NewRev} = couch_db:update_doc(Db, NewDoc, Options),
             NewRevStr = couch_doc:rev_to_str(NewRev),
             {JsonResp1} = apply_headers(JsonResp0, [
diff --git a/src/fabric/src/fabric_db_create.erl b/src/fabric/src/fabric_db_create.erl
index 94ffd56..2ea3d7b 100644
--- a/src/fabric/src/fabric_db_create.erl
+++ b/src/fabric/src/fabric_db_create.erl
@@ -168,6 +168,10 @@ make_document([#shard{dbname=DbName}|_] = Shards, Suffix, Options) ->
         E when is_binary(E) -> [{<<"engine">>, E}];
         _ -> []
     end,
+    DbProps = case couch_util:get_value(props, Options) of
+        Props when is_list(Props) -> [{<<"props">>, {Props}}];
+        _ -> []
+    end,
     #doc{
         id = DbName,
         body = {[
@@ -175,7 +179,7 @@ make_document([#shard{dbname=DbName}|_] = Shards, Suffix, Options) ->
             {<<"changelog">>, lists:sort(RawOut)},
             {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}},
             {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}}
-        ] ++ EngineProp}
+        ] ++ EngineProp ++ DbProps}
     }.
 
 db_exists(DbName) -> is_list(catch mem3:shards(DbName)).
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index 44446b8..298921b 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -20,6 +20,7 @@
 -export([log_timeout/2, remove_done_workers/2]).
 -export([is_users_db/1, is_replicator_db/1]).
 -export([make_cluster_db/1, make_cluster_db/2]).
+-export([is_partitioned/1]).
 -export([upgrade_mrargs/1]).
 
 -compile({inline, [{doc_id_and_rev,1}]}).
@@ -327,6 +328,14 @@ doc_id_and_rev(#doc{id=DocId, revs={RevNum, [RevHash|_]}}) ->
     {DocId, {RevNum, RevHash}}.
 
 
+is_partitioned(DbName0) when is_binary(DbName0) ->
+    Shards = mem3:shards(fabric:dbname(DbName0)),
+    is_partitioned(make_cluster_db(hd(Shards)));
+
+is_partitioned(Db) ->
+    couch_db:is_partitioned(Db).
+
+
 upgrade_mrargs(#mrargs{} = Args) ->
     Args;
 
diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl
index ae52104..263b532 100644
--- a/src/mem3/src/mem3.erl
+++ b/src/mem3/src/mem3.erl
@@ -13,7 +13,7 @@
 -module(mem3).
 
 -export([start/0, stop/0, restart/0, nodes/0, node_info/2, shards/1, shards/2,
-    choose_shards/2, n/1, n/2, dbname/1, ushards/1]).
+    choose_shards/2, n/1, n/2, dbname/1, ushards/1, ushards/2]).
 -export([get_shard/3, local_shards/1, shard_suffix/1, fold_shards/2]).
 -export([sync_security/0, sync_security/1]).
 -export([compare_nodelists/0, compare_shards/1]).
@@ -71,7 +71,9 @@ compare_shards(DbName) ->
 
 -spec n(DbName::iodata()) -> integer().
 n(DbName) ->
-    n(DbName, <<"foo">>).
+    % Use _design to avoid issues with
+    % partition validation
+    n(DbName, <<"_design/foo">>).
 
 n(DbName, DocId) ->
     length(mem3:shards(DbName, DocId)).
@@ -136,6 +138,12 @@ ushards(DbName) ->
     Shards = ushards(DbName, live_shards(DbName, Nodes, [ordered]), ZoneMap),
     mem3_util:downcast(Shards).
 
+-spec ushards(DbName::iodata(), DocId::binary()) -> [#shard{}].
+ushards(DbName, DocId) ->
+    Shards = shards_int(DbName, DocId, [ordered]),
+    Shard = hd(Shards),
+    mem3_util:downcast([Shard]).
+
 ushards(DbName, Shards0, ZoneMap) ->
     {L,S,D} = group_by_proximity(Shards0, ZoneMap),
     % Prefer shards in the local zone over shards in a different zone,

Reply via email to