This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch feature/user-partitioned-databases-davisp
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 07da270f8c89e57bf6621815e2e93debd5ba988b
Author: Paul J. Davis <[email protected]>
AuthorDate: Thu Oct 25 17:03:00 2018 -0500

    Implement partitioned dbs
    
    This change introduces the ability for users to place a group of
    documents in a single shard range by specifying a "partition key" in the
    document id. A partition key is denoted by everything preceding a colon
    ':' in the document id.
    
    Every document id (except for design documents) in a partitioned
    database is required to have a partition key.
    
    Co-authored-by: Garren Smith <[email protected]>
    Co-authored-by: Robert Newson <[email protected]>
---
 src/chttpd/src/chttpd_db.erl        | 29 ++++++++++++++++--
 src/couch/src/couch_db.erl          |  5 ++++
 src/couch/src/couch_doc.erl         |  6 +++-
 src/couch/src/couch_partition.erl   | 60 +++++++++++++++++++++++++++++++++++++
 src/couch/src/couch_server.erl      |  3 ++
 src/fabric/src/fabric_db_create.erl |  6 +++-
 src/fabric/src/fabric_util.erl      |  9 ++++++
 src/mem3/src/mem3.erl               | 12 ++++++--
 8 files changed, 124 insertions(+), 6 deletions(-)

diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 49d7b58..3d6c79f 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -13,6 +13,7 @@
 -module(chttpd_db).
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("mem3/include/mem3.hrl").
 
 -export([handle_request/1, handle_compact_req/2, handle_design_req/2,
     db_req/2, couch_doc_open/4,handle_changes_req/2,
@@ -285,10 +286,12 @@ create_db_req(#httpd{}=Req, DbName) ->
     Q = chttpd:qs_value(Req, "q", config:get("cluster", "q", "8")),
     P = chttpd:qs_value(Req, "placement", config:get("cluster", "placement")),
     EngineOpt = parse_engine_opt(Req),
+    DbProps = parse_partitioned_opt(Req),
     Options = [
         {n, N},
         {q, Q},
-        {placement, P}
+        {placement, P},
+        {props, DbProps}
     ] ++ EngineOpt,
     DocUrl = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName)),
     case fabric:create_db(DbName, Options) of
@@ -314,7 +317,12 @@ delete_db_req(#httpd{}=Req, DbName) ->
     end.
 
 do_db_req(#httpd{path_parts=[DbName|_], user_ctx=Ctx}=Req, Fun) ->
-    {ok, Db} = couch_db:clustered_db(DbName, Ctx),
+    Shard = hd(mem3:shards(DbName)),
+    Props = couch_util:get_value(props, Shard#shard.opts, []),
+    {ok, Db} = couch_db:clustered_db(DbName, [
+            {user_ctx, Ctx},
+            {props, Props}
+        ]),
     Fun(Req, Db).
 
 db_req(#httpd{method='GET',path_parts=[DbName]}=Req, _Db) ->
@@ -1453,6 +1461,23 @@ parse_engine_opt(Req) ->
             end
     end.
 
+
+parse_partitioned_opt(Req) ->
+    case chttpd:qs_value(Req, "partitioned") of
+        undefined ->
+            [];
+        "false" ->
+            [];
+        "true" ->
+            [
+                {partitioned, true},
+                {hash, [couch_partition, hash, []]}
+            ];
+        _ ->
+            throw({bad_request, <<"invalid `partitioned` parameter">>})
+    end.
+
+
 parse_doc_query({Key, Value}, Args) ->
     case {Key, Value} of
         {"attachments", "true"} ->
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 1293f0c..577753a 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -56,6 +56,7 @@
     is_db/1,
     is_system_db/1,
     is_clustered/1,
+    is_partitioned/1,
 
     set_revs_limit/2,
     set_purge_infos_limit/2,
@@ -214,6 +215,10 @@ is_clustered(#db{}) ->
 is_clustered(?OLD_DB_REC = Db) ->
     ?OLD_DB_MAIN_PID(Db) == undefined.
 
+is_partitioned(#db{options = Options}) ->
+    Props = couch_util:get_value(props, Options, []),
+    couch_util:get_value(partitioned, Props, false).
+
 ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) ->
     ok = gen_server:call(Pid, full_commit, infinity),
     {ok, StartTime}.
diff --git a/src/couch/src/couch_doc.erl b/src/couch/src/couch_doc.erl
index f960ec5..22f899f 100644
--- a/src/couch/src/couch_doc.erl
+++ b/src/couch/src/couch_doc.erl
@@ -16,7 +16,7 @@
 -export([from_json_obj/1, from_json_obj_validate/1]).
 -export([from_json_obj/2, from_json_obj_validate/2]).
 -export([to_json_obj/2, has_stubs/1, merge_stubs/2]).
--export([validate_docid/1, validate_docid/2, get_validate_doc_fun/1]).
+-export([validate_docid/1, validate_docid/2, validate_docid/3, get_validate_doc_fun/1]).
 -export([doc_from_multi_part_stream/2, doc_from_multi_part_stream/3]).
 -export([doc_from_multi_part_stream/4]).
 -export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]).
@@ -199,11 +199,15 @@ parse_revs(_) ->
 
 
 validate_docid(DocId, DbName) ->
+    validate_docid(DocId, DbName, fun(_) -> ok end).
+
+validate_docid(DocId, DbName, Extra) ->
     case DbName =:= ?l2b(config:get("mem3", "shards_db", "_dbs")) andalso
         lists:member(DocId, ?SYSTEM_DATABASES) of
         true ->
             ok;
         false ->
+            Extra(DocId),
             validate_docid(DocId)
     end.
 
diff --git a/src/couch/src/couch_partition.erl b/src/couch/src/couch_partition.erl
new file mode 100644
index 0000000..7347f0f
--- /dev/null
+++ b/src/couch/src/couch_partition.erl
@@ -0,0 +1,60 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_partition).
+
+
+-export([
+    extract/1,
+    from_docid/1,
+    is_member/2,
+
+    hash/1
+]).
+
+
+extract(Value) when is_binary(Value) ->
+    case binary:split(Value, <<":">>) of
+        [Partition, Rest] ->
+            {Partition, Rest};
+        _ ->
+            undefined
+    end;
+
+extract(_) ->
+    undefined.
+
+
+from_docid(DocId) ->
+    case extract(DocId) of
+        undefined ->
+            throw({illegal_docid, <<"doc id must be of form partition:id">>});
+        {Partition, _} ->
+            Partition
+    end.
+
+
+hash(<<"_design/", _/binary>> = DocId) ->
+    erlang:crc32(DocId);
+
+hash(DocId) when is_binary(DocId) ->
+    erlang:crc32(from_docid(DocId)).
+
+
+is_member(DocId, Partition) ->
+    case extract(DocId) of
+        {Partition, _} ->
+            true;
+        _ ->
+            false
+    end.
+
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index c4b7bf1..95892fc 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -221,6 +221,9 @@ init([]) ->
     % Mark pluggable storage engines as a supported feature
     config:enable_feature('pluggable-storage-engines'),
 
+    % Mark partitioned databases as a supported feature
+    config:enable_feature(partitions),
+
     % read config and register for configuration changes
 
     % just stop if one of the config settings change. couch_server_sup
diff --git a/src/fabric/src/fabric_db_create.erl b/src/fabric/src/fabric_db_create.erl
index 94ffd56..2ea3d7b 100644
--- a/src/fabric/src/fabric_db_create.erl
+++ b/src/fabric/src/fabric_db_create.erl
@@ -168,6 +168,10 @@ make_document([#shard{dbname=DbName}|_] = Shards, Suffix, Options) ->
         E when is_binary(E) -> [{<<"engine">>, E}];
         _ -> []
     end,
+    DbProps = case couch_util:get_value(props, Options) of
+        Props when is_list(Props) -> [{<<"props">>, {Props}}];
+        _ -> []
+    end,
     #doc{
         id = DbName,
         body = {[
@@ -175,7 +179,7 @@ make_document([#shard{dbname=DbName}|_] = Shards, Suffix, Options) ->
             {<<"changelog">>, lists:sort(RawOut)},
             {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}},
             {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}}
-        ] ++ EngineProp}
+        ] ++ EngineProp ++ DbProps}
     }.
 
 db_exists(DbName) -> is_list(catch mem3:shards(DbName)).
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index 44446b8..298921b 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -20,6 +20,7 @@
 -export([log_timeout/2, remove_done_workers/2]).
 -export([is_users_db/1, is_replicator_db/1]).
 -export([make_cluster_db/1, make_cluster_db/2]).
+-export([is_partitioned/1]).
 -export([upgrade_mrargs/1]).
 
 -compile({inline, [{doc_id_and_rev,1}]}).
@@ -327,6 +328,14 @@ doc_id_and_rev(#doc{id=DocId, revs={RevNum, [RevHash|_]}}) ->
     {DocId, {RevNum, RevHash}}.
 
 
+is_partitioned(DbName0) when is_binary(DbName0) ->
+    Shards = mem3:shards(fabric:dbname(DbName0)),
+    is_partitioned(make_cluster_db(hd(Shards)));
+
+is_partitioned(Db) ->
+    couch_db:is_partitioned(Db).
+
+
 upgrade_mrargs(#mrargs{} = Args) ->
     Args;
 
diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl
index ae52104..263b532 100644
--- a/src/mem3/src/mem3.erl
+++ b/src/mem3/src/mem3.erl
@@ -13,7 +13,7 @@
 -module(mem3).
 
 -export([start/0, stop/0, restart/0, nodes/0, node_info/2, shards/1, shards/2,
-    choose_shards/2, n/1, n/2, dbname/1, ushards/1]).
+    choose_shards/2, n/1, n/2, dbname/1, ushards/1, ushards/2]).
 -export([get_shard/3, local_shards/1, shard_suffix/1, fold_shards/2]).
 -export([sync_security/0, sync_security/1]).
 -export([compare_nodelists/0, compare_shards/1]).
@@ -71,7 +71,9 @@ compare_shards(DbName) ->
 
 -spec n(DbName::iodata()) -> integer().
 n(DbName) ->
-    n(DbName, <<"foo">>).
+    % Use _design to avoid issues with
+    % partition validation
+    n(DbName, <<"_design/foo">>).
 
 n(DbName, DocId) ->
     length(mem3:shards(DbName, DocId)).
@@ -136,6 +138,12 @@ ushards(DbName) ->
     Shards = ushards(DbName, live_shards(DbName, Nodes, [ordered]), ZoneMap),
     mem3_util:downcast(Shards).
 
+-spec ushards(DbName::iodata(), DocId::binary()) -> [#shard{}].
+ushards(DbName, DocId) ->
+    Shards = shards_int(DbName, DocId, [ordered]),
+    Shard = hd(Shards),
+    mem3_util:downcast([Shard]).
+
 ushards(DbName, Shards0, ZoneMap) ->
     {L,S,D} = group_by_proximity(Shards0, ZoneMap),
     % Prefer shards in the local zone over shards in a different zone,

Reply via email to