Hi
   I am new to erlang and riak.  I started to use riak as a kv store couple
of months ago. Now i want to implement a commit hook to riak so that riak
could help me to make some statistics.
i read some docs and write a pre-hook scripts, which will fetch the object
key and store it into a set.
   This hook works fine if there is only one client write to riak, but if i
increase the connection to riak writing, i found it lost some elements in
the set. Looks like the crdt_op did not do the merge operation.And there is
no obvious error in the log files.

   Could someone help me to finger out what happened or what i has missed.

i am using the riak 2.1.3

Thanks all!


Here is the hook scripts:

------------------------------------------------------------
------------------------------------------

-module(myhook).
-export([pretest/1]).

now_to_local_string({MegaSecs, Secs, MicroSecs}) ->
    LocalTime = calendar:now_to_local_time({MegaSecs, Secs, MicroSecs}),
    {{Year, Month, Day}, {Hour, Minute, _}} = LocalTime,
    TimeStr = lists:flatten(io_lib:format("~4..0w~2..0w~2..0w~2..0w~2..0w",
                [Year, Month, Day, Hour, Minute])),
    TimeStr.

is_deleted(Object)->
    case dict:find(<<"X-Riak-Deleted">>,riak_object:get_metadata(Object)) of
        {ok,_} ->
            true;
        _ ->
            false
    end.

pretest(Object) ->
    % timer:sleep(10000),
    try
    ObjBucket = riak_object:bucket(Object),
  %   riak_object:bucket(Obj).
% {<<"cn-archive">>,<<"local-test">>}

Bucket = element(2, ObjBucket),
BucketType = element(1, ObjBucket),

ObjKey = riak_object:key(Object),
% Key = binary_to_list(ObjKey),
% ObjData = riak_object:get_value(Object),
% Msg = binary_to_list(ObjData),
   CommitItem = iolist_to_binary(mochijson2:encode({struct, [{b, Bucket},
{k, ObjKey}, {t, BucketType}]})),

   case is_deleted(Object) of
       true ->
        KeyPrefix = "delete";
_ ->
KeyPrefix = "update"
end,

CurMin = now_to_local_string(os:timestamp()),
   IndexKey = binary:list_to_bin(io_lib:format("~s-~s", [CurMin,
KeyPrefix])),

   %% Get a riak client
    {ok, C} = riak:local_client(),
    % get node obj
ThisNode = atom_to_binary(node(), latin1),

% get index obj and set context
BType = <<"archive">>,
B = <<"local-test">>,
{SetObj, Context} = case C:get({BType, B}, IndexKey) of
   {error, notfound} ->
       ThisSetObj = riak_kv_crdt:new({BType, B}, IndexKey, riak_dt_orswot),
       {ThisSetObj, undefined};
   {ok, ThisSetObj} ->
       % The datatype update requires the context if the value exists
       {{Ctx, _}, _} = riak_kv_crdt:value(ThisSetObj, riak_dt_orswot),
       {ThisSetObj, Ctx}
end,

UpdateIndex = [{add, CommitItem}],
% UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, Context},
UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, undefined},
NewObj = riak_kv_crdt:update(SetObj, ThisNode, UpdateOp),

error_logger:info_msg("Updating index for ~s,to set ~s~n",
[binary:bin_to_list(CommitItem), IndexKey]),

C:put(NewObj),
Object
    catch
    error:Error ->
{fail, lists:flatten(io_lib:format("[PREHOOKEXCEPTION]~p",[Error]))}
end.

------------------------------------------------------------
------------------------------------------


This is the set bucket props
------------------------------------------------------------
------------------------------------------

active: true
allow_mult: true
basic_quorum: false
big_vclock: 50
chash_keyfun: {riak_core_util,chash_std_keyfun}
claimant: 'riak@192.168.100.2'
datatype: set
dvv_enabled: true
dw: quorum
last_write_wins: false
linkfun: {modfun,riak_kv_wm_link_walker,mapreduce_linkfun}
n_val: 3
notfound_ok: true
old_vclock: 86400
postcommit: []
pr: 0
precommit: []
pw: 0
r: quorum
rw: quorum
small_vclock: 50
w: quorum
young_vclock: 20
_______________________________________________
riak-users mailing list
riak-users@lists.basho.com
http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com

Reply via email to