http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/if/Snapshot.proto ---------------------------------------------------------------------- diff --git a/hbase-native-client/if/Snapshot.proto b/hbase-native-client/if/Snapshot.proto deleted file mode 100644 index ae1a1e6..0000000 --- a/hbase-native-client/if/Snapshot.proto +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package hbase.pb; - -option java_package = "org.apache.hadoop.hbase.protobuf.generated"; -option java_outer_classname = "SnapshotProtos"; -option java_generic_services = true; -option java_generate_equals_and_hash = true; -option optimize_for = SPEED; - -import "FS.proto"; -import "HBase.proto"; - -message SnapshotFileInfo { - enum Type { - HFILE = 1; - WAL = 2; - } - - required Type type = 1; - - optional string hfile = 3; - - optional string wal_server = 4; - optional string wal_name = 5; -} - -message SnapshotRegionManifest { - optional int32 version = 1; - - required RegionInfo region_info = 2; - repeated FamilyFiles family_files = 3; - - message StoreFile { - required string name = 1; - optional Reference reference = 2; - - // TODO: Add checksums or other fields to verify the file - optional uint64 file_size = 3; - } - - message FamilyFiles { - required bytes family_name = 1; - repeated StoreFile store_files = 2; - } -} - -message SnapshotDataManifest { - required TableSchema table_schema = 1; - repeated SnapshotRegionManifest region_manifests = 2; -}
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/if/Tracing.proto ---------------------------------------------------------------------- diff --git a/hbase-native-client/if/Tracing.proto b/hbase-native-client/if/Tracing.proto deleted file mode 100644 index 5a64cfc..0000000 --- a/hbase-native-client/if/Tracing.proto +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package hbase.pb; - -option java_package = "org.apache.hadoop.hbase.protobuf.generated"; -option java_outer_classname = "TracingProtos"; -option java_generate_equals_and_hash = true; -option optimize_for = SPEED; - -//Used to pass through the information necessary to continue -//a trace after an RPC is made. All we need is the traceid -//(so we know the overarching trace this message is a part of), and -//the id of the current span when this message was sent, so we know -//what span caused the new span we will create when this message is received. 
-message RPCTInfo { - optional int64 trace_id = 1; - optional int64 parent_id = 2; -} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/if/VisibilityLabels.proto ---------------------------------------------------------------------- diff --git a/hbase-native-client/if/VisibilityLabels.proto b/hbase-native-client/if/VisibilityLabels.proto deleted file mode 100644 index d2dc44d..0000000 --- a/hbase-native-client/if/VisibilityLabels.proto +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package hbase.pb; - -option java_package = "org.apache.hadoop.hbase.protobuf.generated"; -option java_outer_classname = "VisibilityLabelsProtos"; -option java_generic_services = true; -option java_generate_equals_and_hash = true; -option optimize_for = SPEED; - -import "Client.proto"; - -message VisibilityLabelsRequest { - repeated VisibilityLabel visLabel = 1; -} - -message VisibilityLabel { - required bytes label = 1; - optional uint32 ordinal = 2; -} - -message VisibilityLabelsResponse { - repeated RegionActionResult result = 1; -} - -message SetAuthsRequest { - required bytes user = 1; - repeated bytes auth = 2; -} - -message UserAuthorizations { - required bytes user = 1; - repeated uint32 auth = 2; -} - -message MultiUserAuthorizations { - repeated UserAuthorizations userAuths = 1; -} - -message GetAuthsRequest { - required bytes user = 1; -} - -message GetAuthsResponse { - required bytes user = 1; - repeated bytes auth = 2; -} - -message ListLabelsRequest { - optional string regex = 1; -} - -message ListLabelsResponse { - repeated bytes label = 1; -} - -service VisibilityLabelsService { - rpc addLabels(VisibilityLabelsRequest) - returns (VisibilityLabelsResponse); - rpc setAuths(SetAuthsRequest) - returns (VisibilityLabelsResponse); - rpc clearAuths(SetAuthsRequest) - returns (VisibilityLabelsResponse); - rpc getAuths(GetAuthsRequest) - returns (GetAuthsResponse); - rpc listLabels(ListLabelsRequest) - returns (ListLabelsResponse); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/if/WAL.proto ---------------------------------------------------------------------- diff --git a/hbase-native-client/if/WAL.proto b/hbase-native-client/if/WAL.proto deleted file mode 100644 index 2061b22..0000000 --- a/hbase-native-client/if/WAL.proto +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package hbase.pb; - -option java_package = "org.apache.hadoop.hbase.protobuf.generated"; -option java_outer_classname = "WALProtos"; -option java_generic_services = false; -option java_generate_equals_and_hash = true; -option optimize_for = SPEED; - -import "HBase.proto"; -import "Client.proto"; - -message WALHeader { - optional bool has_compression = 1; - optional bytes encryption_key = 2; - optional bool has_tag_compression = 3; - optional string writer_cls_name = 4; - optional string cell_codec_cls_name = 5; -} - -/* - * Protocol buffer version of WALKey; see WALKey comment, not really a key but WALEdit header - * for some KVs - */ -message WALKey { - required bytes encoded_region_name = 1; - required bytes table_name = 2; - required uint64 log_sequence_number = 3; - required uint64 write_time = 4; - /* - This parameter is deprecated in favor of clusters which - contains the list of clusters that have consumed the change. - It is retained so that the log created by earlier releases (0.94) - can be read by the newer releases. 
- */ - optional UUID cluster_id = 5 [deprecated=true]; - - repeated FamilyScope scopes = 6; - optional uint32 following_kv_count = 7; - - /* - This field contains the list of clusters that have - consumed the change - */ - repeated UUID cluster_ids = 8; - - optional uint64 nonceGroup = 9; - optional uint64 nonce = 10; - optional uint64 orig_sequence_number = 11; - -/* - optional CustomEntryType custom_entry_type = 9; - - enum CustomEntryType { - COMPACTION = 0; - } -*/ -} - -enum ScopeType { - REPLICATION_SCOPE_LOCAL = 0; - REPLICATION_SCOPE_GLOBAL = 1; -} - -message FamilyScope { - required bytes family = 1; - required ScopeType scope_type = 2; -} - -/** - * Custom WAL entries - */ - -/** - * Special WAL entry to hold all related to a compaction. - * Written to WAL before completing compaction. There is - * sufficient info in the below message to complete later - * the * compaction should we fail the WAL write. - */ -message CompactionDescriptor { - required bytes table_name = 1; // TODO: WALKey already stores these, might remove - required bytes encoded_region_name = 2; - required bytes family_name = 3; - repeated string compaction_input = 4; // relative to store dir - repeated string compaction_output = 5; - required string store_home_dir = 6; // relative to region dir - optional bytes region_name = 7; // full region name -} - -/** - * Special WAL entry to hold all related to a flush. 
- */ -message FlushDescriptor { - enum FlushAction { - START_FLUSH = 0; - COMMIT_FLUSH = 1; - ABORT_FLUSH = 2; - CANNOT_FLUSH = 3; // marker for indicating that a flush has been requested but cannot complete - } - - message StoreFlushDescriptor { - required bytes family_name = 1; - required string store_home_dir = 2; //relative to region dir - repeated string flush_output = 3; // relative to store dir (if this is a COMMIT_FLUSH) - } - - required FlushAction action = 1; - required bytes table_name = 2; - required bytes encoded_region_name = 3; - optional uint64 flush_sequence_number = 4; - repeated StoreFlushDescriptor store_flushes = 5; - optional bytes region_name = 6; // full region name -} - -message StoreDescriptor { - required bytes family_name = 1; - required string store_home_dir = 2; //relative to region dir - repeated string store_file = 3; // relative to store dir - optional uint64 store_file_size_bytes = 4; // size of store file -} - -/** - * Special WAL entry used for writing bulk load events to WAL - */ -message BulkLoadDescriptor { - required TableName table_name = 1; - required bytes encoded_region_name = 2; - repeated StoreDescriptor stores = 3; - required int64 bulkload_seq_num = 4; -} - -/** - * Special WAL entry to hold all related to a region event (open/close). - */ -message RegionEventDescriptor { - enum EventType { - REGION_OPEN = 0; - REGION_CLOSE = 1; - } - - required EventType event_type = 1; - required bytes table_name = 2; - required bytes encoded_region_name = 3; - optional uint64 log_sequence_number = 4; - repeated StoreDescriptor stores = 5; - optional ServerName server = 6; // Server who opened the region - optional bytes region_name = 7; // full region name -} - -/** - * A trailer that is appended to the end of a properly closed WAL file. - * If missing, this is either a legacy or a corrupted WAL file. - * N.B. This trailer currently doesn't contain any information and we - * purposefully don't expose it in the WAL APIs. 
It's for future growth. - */ -message WALTrailer { -} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/if/ZooKeeper.proto ---------------------------------------------------------------------- diff --git a/hbase-native-client/if/ZooKeeper.proto b/hbase-native-client/if/ZooKeeper.proto deleted file mode 100644 index 41c0e0e..0000000 --- a/hbase-native-client/if/ZooKeeper.proto +++ /dev/null @@ -1,176 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// ZNode data in hbase are serialized protobufs with a four byte -// 'magic' 'PBUF' prefix. -package hbase.pb; - -option java_package = "org.apache.hadoop.hbase.protobuf.generated"; -option java_outer_classname = "ZooKeeperProtos"; -option java_generic_services = true; -option java_generate_equals_and_hash = true; -option optimize_for = SPEED; - -import "HBase.proto"; -import "ClusterStatus.proto"; - -/** - * Content of the meta-region-server znode. - */ -message MetaRegionServer { - // The ServerName hosting the meta region currently, or destination server, - // if meta region is in transition. - required ServerName server = 1; - // The major version of the rpc the server speaks. 
This is used so that - // clients connecting to the cluster can have prior knowledge of what version - // to send to a RegionServer. AsyncHBase will use this to detect versions. - optional uint32 rpc_version = 2; - - // State of the region transition. OPEN means fully operational 'hbase:meta' - optional RegionState.State state = 3; -} - -/** - * Content of the master znode. - */ -message Master { - // The ServerName of the current Master - required ServerName master = 1; - // Major RPC version so that clients can know what version the master can accept. - optional uint32 rpc_version = 2; - optional uint32 info_port = 3; -} - -/** - * Content of the '/hbase/running', cluster state, znode. - */ -message ClusterUp { - // If this znode is present, cluster is up. Currently - // the data is cluster start_date. - required string start_date = 1; -} - -/** - * WAL SplitLog directory znodes have this for content. Used doing distributed - * WAL splitting. Holds current state and name of server that originated split. - */ -message SplitLogTask { - enum State { - UNASSIGNED = 0; - OWNED = 1; - RESIGNED = 2; - DONE = 3; - ERR = 4; - } - enum RecoveryMode { - UNKNOWN = 0; - LOG_SPLITTING = 1; - LOG_REPLAY = 2; - } - required State state = 1; - required ServerName server_name = 2; - optional RecoveryMode mode = 3 [default = UNKNOWN]; -} - -/** - * The znode that holds state of table. - * Deprected, table state is stored in table descriptor on HDFS. - */ -message DeprecatedTableState { - // Table's current state - enum State { - ENABLED = 0; - DISABLED = 1; - DISABLING = 2; - ENABLING = 3; - } - // This is the table's state. If no znode for a table, - // its state is presumed enabled. See o.a.h.h.zookeeper.ZKTable class - // for more. - required State state = 1 [default = ENABLED]; -} - -message TableCF { - optional TableName table_name = 1; - repeated bytes families = 2; -} - -/** - * Used by replication. Holds a replication peer key. 
- */ -message ReplicationPeer { - // clusterkey is the concatenation of the slave cluster's - // hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent - required string clusterkey = 1; - optional string replicationEndpointImpl = 2; - repeated BytesBytesPair data = 3; - repeated NameStringPair configuration = 4; - repeated TableCF table_cfs = 5; -} - -/** - * Used by replication. Holds whether enabled or disabled - */ -message ReplicationState { - enum State { - ENABLED = 0; - DISABLED = 1; - } - required State state = 1; -} - -/** - * Used by replication. Holds the current position in an WAL file. - */ -message ReplicationHLogPosition { - required int64 position = 1; -} - -/** - * Used by replication. Used to lock a region server during failover. - */ -message ReplicationLock { - required string lock_owner = 1; -} - -/** - * Metadata associated with a table lock in zookeeper - */ -message TableLock { - optional TableName table_name = 1; - optional ServerName lock_owner = 2; - optional int64 thread_id = 3; - optional bool is_shared = 4; - optional string purpose = 5; - optional int64 create_time = 6; -} - -/** - * State of the switch. - */ -message SwitchState { - optional bool enabled = 1; -} - -/** - * State for split and merge, used in hbck - */ -message SplitAndMergeState { - optional bool split_enabled = 1; - optional bool merge_enabled = 2; -} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/if/test.proto ---------------------------------------------------------------------- diff --git a/hbase-native-client/if/test.proto b/hbase-native-client/if/test.proto deleted file mode 100644 index 72b68e9..0000000 --- a/hbase-native-client/if/test.proto +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated"; -option java_outer_classname = "TestProtos"; -option java_generate_equals_and_hash = true; - -message EmptyRequestProto { -} - -message EmptyResponseProto { -} - -message EchoRequestProto { - required string message = 1; -} - -message EchoResponseProto { - required string message = 1; -} - -message PauseRequestProto { - required uint32 ms = 1; -} - -message AddrResponseProto { - required string addr = 1; -} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/if/test_rpc_service.proto ---------------------------------------------------------------------- diff --git a/hbase-native-client/if/test_rpc_service.proto b/hbase-native-client/if/test_rpc_service.proto deleted file mode 100644 index 2730403..0000000 --- a/hbase-native-client/if/test_rpc_service.proto +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated"; -option java_outer_classname = "TestRpcServiceProtos"; -option java_generic_services = true; -option java_generate_equals_and_hash = true; - -import "test.proto"; - - -/** - * A protobuf service for use in tests - */ -service TestProtobufRpcProto { - rpc ping(EmptyRequestProto) returns (EmptyResponseProto); - rpc echo(EchoRequestProto) returns (EchoResponseProto); - rpc error(EmptyRequestProto) returns (EmptyResponseProto); - rpc pause(PauseRequestProto) returns (EmptyResponseProto); - rpc addr(EmptyRequestProto) returns (AddrResponseProto); - rpc socketNotOpen(EmptyRequestProto) returns (EmptyResponseProto); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/BUCK b/hbase-native-client/include/hbase/client/BUCK new file mode 100644 index 0000000..66d6896 --- /dev/null +++ b/hbase-native-client/include/hbase/client/BUCK @@ -0,0 +1,93 @@ +## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This is the main library. +cxx_library( + name="client", + header_namespace="hbase/client", + exported_headers=[ + "async-client-scanner.h", + "async-connection.h", + "async-region-locator.h", + "async-rpc-retrying-caller-factory.h", + "async-rpc-retrying-caller.h", + "async-table-result-scanner.h", + "client.h", + "cell.h", + "filter.h", + "query.h", + "keyvalue-codec.h", + "region-location.h", + "location-cache.h", + "connection-configuration.h", + # TODO: move this out of exported + # Once meta lookup works + "meta-utils.h", + "get.h", + "increment.h", + "mutation.h", + "put.h", + "delete.h", + "scan.h", + "append.h", + "result.h", + "result-scanner.h", + "request-converter.h", + "response-converter.h", + "table.h", + "async-scan-rpc-retrying-caller.h", + "raw-async-table.h", + "raw-scan-result-consumer.h", + "scan-result-cache.h", + "hbase-rpc-controller.h", + "time-range.h", + "zk-util.h", + "action.h", + "multi-response.h", + "region-request.h", + "region-result.h", + "row.h", + "server-request.h", + "async-batch-rpc-retrying-caller.h", + ], + deps=[ + "//include/hbase/exceptions:exceptions", + "//include/hbase/utils:utils", + "//include/hbase/connection:connection", + "//include/hbase/client:conf", + "//src/hbase/if:if", + "//include/hbase/serde:serde", + "//third-party:folly", + "//third-party:wangle", + "//third-party:zookeeper_mt", + ], + compiler_flags=['-Weffc++', '-ggdb'], + visibility=[ + 'PUBLIC', + ],) +cxx_library( + name="conf", + header_namespace="hbase/client", + exported_headers=[ + "configuration.h", + "hbase-configuration-loader.h", 
+ ], + deps=["//src/hbase/utils:utils", "//third-party:folly"], + compiler_flags=['-Weffc++', '-ggdb'], + visibility=[ + 'PUBLIC', + ],) http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/action.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/action.h b/hbase-native-client/include/hbase/client/action.h new file mode 100644 index 0000000..2288f12 --- /dev/null +++ b/hbase-native-client/include/hbase/client/action.h @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#pragma once + +#include <memory> +#include "hbase/client/row.h" + +namespace hbase { +class Action { + public: + Action(std::shared_ptr<hbase::Row> action, int32_t original_index) + : action_(action), original_index_(original_index) {} + ~Action() {} + + int32_t original_index() const { return original_index_; } + + std::shared_ptr<hbase::Row> action() const { return action_; } + + private: + std::shared_ptr<hbase::Row> action_; + int32_t original_index_; + int64_t nonce_ = -1; + int32_t replica_id_ = -1; +}; + +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/append.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/append.h b/hbase-native-client/include/hbase/client/append.h new file mode 100644 index 0000000..e7f9a6d --- /dev/null +++ b/hbase-native-client/include/hbase/client/append.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#pragma once + +#include <cstdint> +#include <map> +#include <memory> +#include <string> +#include <vector> +#include "hbase/client/cell.h" +#include "hbase/client/mutation.h" + +namespace hbase { + +class Append : public Mutation { + public: + /** + * Constructors + */ + explicit Append(const std::string& row) : Mutation(row) {} + Append(const Append& cappend) : Mutation(cappend) {} + Append& operator=(const Append& cappend) { + Mutation::operator=(cappend); + return *this; + } + + ~Append() = default; + + /** + * @brief Add the specified column and value to this Append operation. + * @param family family name + * @param qualifier column qualifier + * @param value value to append + */ + Append& Add(const std::string& family, const std::string& qualifier, const std::string& value); + Append& Add(std::unique_ptr<Cell> cell); +}; + +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/async-batch-rpc-retrying-caller.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/async-batch-rpc-retrying-caller.h b/hbase-native-client/include/hbase/client/async-batch-rpc-retrying-caller.h new file mode 100644 index 0000000..39b7aa9 --- /dev/null +++ b/hbase-native-client/include/hbase/client/async-batch-rpc-retrying-caller.h @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include <folly/ExceptionWrapper.h> +#include <folly/Format.h> +#include <folly/Try.h> +#include <folly/futures/Future.h> +#include <folly/futures/Promise.h> +#include <folly/io/IOBuf.h> +#include <folly/io/async/HHWheelTimer.h> +#include <wangle/concurrent/CPUThreadPoolExecutor.h> + +#include <algorithm> +#include <chrono> +#include <functional> +#include <map> +#include <memory> +#include <mutex> +#include <stdexcept> +#include <string> +#include <tuple> +#include <type_traits> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "hbase/connection/rpc-client.h" +#include "hbase/client/action.h" +#include "hbase/client/async-connection.h" +#include "hbase/client/location-cache.h" +#include "hbase/client/multi-response.h" +#include "hbase/client/region-location.h" +#include "hbase/client/region-request.h" +#include "hbase/client/region-result.h" +#include "hbase/client/request-converter.h" +#include "hbase/client/response-converter.h" +#include "hbase/client/result.h" +#include "hbase/client/row.h" +#include "hbase/client/server-request.h" +#include "hbase/exceptions/exception.h" +#include "hbase/if/Client.pb.h" +#include "hbase/if/HBase.pb.h" +#include "hbase/security/user.h" +#include "hbase/utils/connection-util.h" +#include "hbase/utils/sys-util.h" +#include "hbase/utils/time-util.h" + +namespace hbase { +/* Equals function for ServerName */ +struct ServerNameEquals { + bool operator()(const std::shared_ptr<pb::ServerName> &lhs, + const std::shared_ptr<pb::ServerName> &rhs) const { + return 
(lhs->start_code() == rhs->start_code() && lhs->host_name() == rhs->host_name() && + lhs->port() == rhs->port()); + } +}; + +struct ServerNameHash { + /** hash */ + std::size_t operator()(const std::shared_ptr<pb::ServerName> &sn) const { + std::size_t h = 0; + boost::hash_combine(h, sn->start_code()); + boost::hash_combine(h, sn->host_name()); + boost::hash_combine(h, sn->port()); + return h; + } +}; + +template <typename REQ, typename RESP> +class AsyncBatchRpcRetryingCaller { + public: + using ActionsByServer = + std::unordered_map<std::shared_ptr<pb::ServerName>, std::shared_ptr<ServerRequest>, + ServerNameHash, ServerNameEquals>; + using ActionsByRegion = ServerRequest::ActionsByRegion; + + AsyncBatchRpcRetryingCaller(std::shared_ptr<AsyncConnection> conn, + std::shared_ptr<folly::HHWheelTimer> retry_timer, + std::shared_ptr<pb::TableName> table_name, + const std::vector<REQ> &actions, std::chrono::nanoseconds pause_ns, + int32_t max_attempts, std::chrono::nanoseconds operation_timeout_ns, + std::chrono::nanoseconds rpc_timeout_ns, + int32_t start_log_errors_count); + + ~AsyncBatchRpcRetryingCaller(); + + folly::Future<std::vector<folly::Try<RESP>>> Call(); + + private: + int64_t RemainingTimeNs(); + + void LogException(int32_t tries, std::shared_ptr<RegionRequest> region_request, + const folly::exception_wrapper &ew, + std::shared_ptr<pb::ServerName> server_name); + + void LogException(int32_t tries, + const std::vector<std::shared_ptr<RegionRequest>> ®ion_requests, + const folly::exception_wrapper &ew, + std::shared_ptr<pb::ServerName> server_name); + + const std::string GetExtraContextForError(std::shared_ptr<pb::ServerName> server_name); + + void AddError(const std::shared_ptr<Action> &action, const folly::exception_wrapper &ew, + std::shared_ptr<pb::ServerName> server_name); + + void AddError(const std::vector<std::shared_ptr<Action>> &actions, + const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name); + + void FailOne(const 
std::shared_ptr<Action> &action, int32_t tries, + const folly::exception_wrapper &ew, int64_t current_time, const std::string extras); + + void FailAll(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries, + const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name); + + void FailAll(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries); + + void AddAction2Error(uint64_t action_index, const ThrowableWithExtraContext &twec); + + void OnError(const ActionsByRegion &actions_by_region, int32_t tries, + const folly::exception_wrapper &ew, std::shared_ptr<pb::ServerName> server_name); + + void TryResubmit(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries); + + folly::Future<std::vector<folly::Try<std::shared_ptr<RegionLocation>>>> GetRegionLocations( + const std::vector<std::shared_ptr<Action>> &actions, int64_t locate_timeout_ns); + + void GroupAndSend(const std::vector<std::shared_ptr<Action>> &actions, int32_t tries); + + folly::Future<std::vector<folly::Try<std::unique_ptr<Response>>>> GetMultiResponse( + const ActionsByServer &actions_by_server); + + void Send(const ActionsByServer &actions_by_server, int32_t tries); + + void OnComplete(const ActionsByRegion &actions_by_region, int32_t tries, + const std::shared_ptr<pb::ServerName> server_name, + const std::unique_ptr<MultiResponse> multi_results); + + void OnComplete(const std::shared_ptr<Action> &action, + const std::shared_ptr<RegionRequest> ®ion_request, int32_t tries, + const std::shared_ptr<pb::ServerName> &server_name, + const std::shared_ptr<RegionResult> ®ion_result, + std::vector<std::shared_ptr<Action>> &failed_actions); + + private: + std::shared_ptr<folly::HHWheelTimer> retry_timer_; + std::shared_ptr<hbase::AsyncConnection> conn_; + std::shared_ptr<pb::TableName> table_name_; + std::vector<std::shared_ptr<Action>> actions_; + std::chrono::nanoseconds pause_ns_; + int32_t max_attempts_ = 0; + std::chrono::nanoseconds 
operation_timeout_ns_; + std::chrono::nanoseconds rpc_timeout_ns_; + int32_t start_log_errors_count_ = 0; + + int64_t start_ns_ = TimeUtil::GetNowNanos(); + int32_t tries_ = 1; + std::map<uint64_t, folly::Promise<RESP>> action2promises_; + std::vector<folly::Future<RESP>> action2futures_; + std::map<uint64_t, std::shared_ptr<std::vector<ThrowableWithExtraContext>>> action2errors_; + + std::shared_ptr<AsyncRegionLocator> location_cache_ = nullptr; + std::shared_ptr<RpcClient> rpc_client_ = nullptr; + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_pool_ = nullptr; + + std::recursive_mutex multi_mutex_; +}; +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/async-client-scanner.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/async-client-scanner.h b/hbase-native-client/include/hbase/client/async-client-scanner.h new file mode 100644 index 0000000..cccf50b --- /dev/null +++ b/hbase-native-client/include/hbase/client/async-client-scanner.h @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +#pragma once + +#include <folly/Format.h> +#include <folly/Logging.h> +#include <folly/futures/Future.h> +#include <folly/io/async/EventBase.h> +#include <folly/io/async/HHWheelTimer.h> + +#include <algorithm> +#include <chrono> +#include <functional> +#include <memory> +#include <string> +#include <type_traits> +#include <utility> +#include <vector> + +#include "hbase/connection/rpc-client.h" +#include "hbase/client/async-connection.h" +#include "hbase/client/async-rpc-retrying-caller-factory.h" +#include "hbase/client/async-rpc-retrying-caller.h" +#include "hbase/client/hbase-rpc-controller.h" +#include "hbase/client/raw-scan-result-consumer.h" +#include "hbase/client/region-location.h" +#include "hbase/client/request-converter.h" +#include "hbase/client/response-converter.h" +#include "hbase/client/result.h" +#include "hbase/client/scan-result-cache.h" +#include "hbase/client/scan.h" +#include "hbase/exceptions/exception.h" +#include "hbase/if/Client.pb.h" +#include "hbase/if/HBase.pb.h" +#include "hbase/utils/connection-util.h" +#include "hbase/utils/sys-util.h" +#include "hbase/utils/time-util.h" + +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { +/** Value holder built from a Response: keeps the response message cast to pb::ScanResponse and the response's cell scanner, next to the RPC client, region location and controller passed to the constructor. All members are public. */ class OpenScannerResponse { + public: + /** resp is dereferenced unconditionally, so it must not be null. */ OpenScannerResponse(std::shared_ptr<hbase::RpcClient> rpc_client, + const std::unique_ptr<Response>& resp, + std::shared_ptr<RegionLocation> region_location, + std::shared_ptr<hbase::HBaseRpcController> controller) + : rpc_client_(rpc_client), region_location_(region_location), controller_(controller) { + scan_resp_ = std::static_pointer_cast<pb::ScanResponse>(resp->resp_msg()); + cell_scanner_ = resp->cell_scanner(); + } + std::shared_ptr<hbase::RpcClient> rpc_client_; + std::shared_ptr<pb::ScanResponse> scan_resp_; + std::shared_ptr<RegionLocation> region_location_; + std::shared_ptr<hbase::HBaseRpcController> controller_; + std::shared_ptr<CellScanner> cell_scanner_; +}; + +/** Scanner whose constructor is private: instances are obtained through Create(), which always returns them owned by a std::shared_ptr. Member functions other than Create() are only declared here. */ class AsyncClientScanner : public
std::enable_shared_from_this<AsyncClientScanner> { + public: + /** Forwards every argument to the private constructor and wraps the new object in a shared_ptr. */ template <typename... T> + static std::shared_ptr<AsyncClientScanner> Create(T&&... all) { + return std::shared_ptr<AsyncClientScanner>(new AsyncClientScanner(std::forward<T>(all)...)); + } + + void Start(); + + private: + // methods + AsyncClientScanner(std::shared_ptr<AsyncConnection> conn, std::shared_ptr<Scan> scan, + std::shared_ptr<pb::TableName> table_name, + std::shared_ptr<RawScanResultConsumer> consumer, nanoseconds pause, + uint32_t max_retries, nanoseconds scan_timeout_nanos, + nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count); + + folly::Future<std::shared_ptr<OpenScannerResponse>> CallOpenScanner( + std::shared_ptr<hbase::RpcClient> rpc_client, + std::shared_ptr<hbase::HBaseRpcController> controller, + std::shared_ptr<hbase::RegionLocation> loc); + + void OpenScanner(); + + void StartScan(std::shared_ptr<OpenScannerResponse> resp); + + RegionLocateType GetLocateType(const Scan& scan); + + private: + // data + std::shared_ptr<AsyncConnection> conn_; + std::shared_ptr<Scan> scan_; + std::shared_ptr<pb::TableName> table_name_; + std::shared_ptr<ScanResultCache> results_cache_; + std::shared_ptr<RawScanResultConsumer> consumer_; + nanoseconds pause_; + uint32_t max_retries_; + nanoseconds scan_timeout_nanos_; + nanoseconds rpc_timeout_nanos_; + uint32_t start_log_errors_count_; + uint32_t max_attempts_; + uint32_t open_scanner_tries_ = 0; +}; +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/async-connection.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/async-connection.h b/hbase-native-client/include/hbase/client/async-connection.h new file mode 100644 index 0000000..547ac76 --- /dev/null +++ b/hbase-native-client/include/hbase/client/async-connection.h @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF)
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include <folly/futures/Future.h> +#include <folly/io/IOBuf.h> +#include <wangle/concurrent/CPUThreadPoolExecutor.h> +#include <wangle/concurrent/IOThreadPoolExecutor.h> + +#include <memory> +#include <string> +#include <utility> + +#include "hbase/connection/rpc-client.h" +#include "hbase/client/async-region-locator.h" +#include "hbase/client/configuration.h" +#include "hbase/client/connection-configuration.h" +#include "hbase/client/hbase-configuration-loader.h" +#include "hbase/client/hbase-rpc-controller.h" +#include "hbase/client/keyvalue-codec.h" +#include "hbase/client/location-cache.h" +#include "hbase/if/Cell.pb.h" +#include "hbase/serde/table-name.h" + +namespace hbase { + +class AsyncRpcRetryingCallerFactory; + +/** Pure-virtual interface giving access to the objects shared through a connection: configuration, connection configuration, caller factory, RPC client, region locator, RPC-controller creation, the CPU / IO / retry executors, and Close(). */ class AsyncConnection { + public: + AsyncConnection() {} + virtual ~AsyncConnection() {} + virtual std::shared_ptr<Configuration> conf() = 0; + virtual std::shared_ptr<ConnectionConfiguration> connection_conf() = 0; + virtual std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() = 0; + virtual std::shared_ptr<RpcClient> rpc_client() = 0; + virtual std::shared_ptr<AsyncRegionLocator> region_locator() = 0; + virtual std::shared_ptr<HBaseRpcController>
CreateRpcController() = 0; + virtual std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() = 0; + virtual std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() = 0; + virtual std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() = 0; + virtual void Close() = 0; +}; + +/** AsyncConnection implementation whose accessors return the members stored in this object. Construct it with Create(), which calls Init() on the freshly built shared_ptr before returning it. */ class AsyncConnectionImpl : public AsyncConnection, + public std::enable_shared_from_this<AsyncConnectionImpl> { + public: + virtual ~AsyncConnectionImpl(); + + // See https://mortoray.com/2013/08/02/safely-using-enable_shared_from_this/ + template <typename... T> + static std::shared_ptr<AsyncConnectionImpl> Create(T&&... all) { + auto conn = + std::shared_ptr<AsyncConnectionImpl>(new AsyncConnectionImpl(std::forward<T>(all)...)); + conn->Init(); + return conn; + } + + std::shared_ptr<Configuration> conf() override { return conf_; } + std::shared_ptr<ConnectionConfiguration> connection_conf() override { return connection_conf_; } + std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory() override { + return caller_factory_; + } + std::shared_ptr<RpcClient> rpc_client() override { return rpc_client_; } + std::shared_ptr<LocationCache> location_cache() { return location_cache_; } + /** Returns the same object as location_cache(), viewed through the AsyncRegionLocator interface. */ std::shared_ptr<AsyncRegionLocator> region_locator() override { return location_cache_; } + /** Returns a new, default-constructed controller on every call. */ std::shared_ptr<HBaseRpcController> CreateRpcController() override { + return std::make_shared<HBaseRpcController>(); + } + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor() override { return cpu_executor_; } + std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor() override { return io_executor_; } + std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor() override { + return retry_executor_; + } + + void Close() override; + + protected: + AsyncConnectionImpl() {} + + private: + /** Parameter name for HBase client IO thread pool size.
Defaults to num cpus */ + static constexpr const char* kClientIoThreadPoolSize = "hbase.client.io.thread.pool.size"; + /** Parameter name for HBase client CPU thread pool size. Defaults to (2 * num cpus) */ + static constexpr const char* kClientCpuThreadPoolSize = "hbase.client.cpu.thread.pool.size"; + /** The RPC codec to encode cells. For now it is KeyValueCodec */ + static constexpr const char* kRpcCodec = "hbase.client.rpc.codec"; + + std::shared_ptr<Configuration> conf_; + std::shared_ptr<ConnectionConfiguration> connection_conf_; + std::shared_ptr<AsyncRpcRetryingCallerFactory> caller_factory_; + std::shared_ptr<folly::HHWheelTimer> retry_timer_; + std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_; + std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_; + std::shared_ptr<wangle::IOThreadPoolExecutor> retry_executor_; + std::shared_ptr<LocationCache> location_cache_; + std::shared_ptr<RpcClient> rpc_client_; + bool is_closed_ = false; + + private: + /** Stores conf only; the remaining members are left default-constructed here (Create() calls Init() afterwards). */ explicit AsyncConnectionImpl(std::shared_ptr<Configuration> conf) : conf_(conf) {} + void Init(); +}; +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/async-region-locator.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/async-region-locator.h b/hbase-native-client/include/hbase/client/async-region-locator.h new file mode 100644 index 0000000..d42038e --- /dev/null +++ b/hbase-native-client/include/hbase/client/async-region-locator.h @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include <folly/ExceptionWrapper.h> +#include <folly/futures/Future.h> +#include <memory> +#include <string> + +#include "hbase/client/region-location.h" +#include "hbase/if/Client.pb.h" +#include "hbase/serde/region-info.h" +#include "hbase/serde/server-name.h" +#include "hbase/serde/table-name.h" + +namespace hbase { + +/** Pure-virtual interface for looking up region locations asynchronously and for updating a cached location. */ class AsyncRegionLocator { + public: + AsyncRegionLocator() {} + virtual ~AsyncRegionLocator() = default; + + /** + * The only method clients should use for meta lookups. If the corresponding + * location is cached, it is returned from the cache; otherwise a lookup + * in the meta table is done, and the location is cached and then returned. + * It is expected that only a tiny fraction of invocations incur a meta scan. + * This method is to look up non-meta regions; use LocateMeta() to get the + * location of hbase:meta region. + * + * @param tn Table name of the table to look up. This object must live until + * after the future is returned + * + * @param row of the table to look up. This object must live until after the + * future is returned + * + * @param locate_type defaults to RegionLocateType::kCurrent + * + * @param locate_ns defaults to 0; presumably a nanosecond budget for the + * lookup - TODO confirm against the implementations + */ + virtual folly::Future<std::shared_ptr<RegionLocation>> LocateRegion( + const hbase::pb::TableName &tn, const std::string &row, + const RegionLocateType locate_type = RegionLocateType::kCurrent, + const int64_t locate_ns = 0) = 0; + /** + * Update cached region location, possibly using the information from exception.
+ */ + virtual void UpdateCachedLocation(const RegionLocation &loc, + const folly::exception_wrapper &error) = 0; +}; + +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/async-rpc-retrying-caller-factory.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/async-rpc-retrying-caller-factory.h b/hbase-native-client/include/hbase/client/async-rpc-retrying-caller-factory.h new file mode 100644 index 0000000..a580896 --- /dev/null +++ b/hbase-native-client/include/hbase/client/async-rpc-retrying-caller-factory.h @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ * + */ +#pragma once + +#include <folly/Logging.h> +#include <folly/io/async/EventBase.h> +#include <chrono> +#include <memory> +#include <string> +#include <vector> + +#include "hbase/connection/rpc-client.h" +#include "hbase/client/async-batch-rpc-retrying-caller.h" +#include "hbase/client/async-rpc-retrying-caller.h" +#include "hbase/client/async-scan-rpc-retrying-caller.h" +#include "hbase/client/raw-scan-result-consumer.h" +#include "hbase/client/region-location.h" +#include "hbase/client/row.h" +#include "hbase/client/scan-result-cache.h" +#include "hbase/client/scan.h" + +#include "hbase/if/Client.pb.h" +#include "hbase/if/HBase.pb.h" + +namespace hbase { + +class AsyncConnection; + +/** Fluent builder for AsyncSingleRequestRpcRetryingCaller<RESP>. Timeouts, pause, retry count and log threshold start from the connection configuration; every setter returns shared_from_this(), so the builder must be owned by a std::shared_ptr. */ template <typename RESP> +class SingleRequestCallerBuilder + : public std::enable_shared_from_this<SingleRequestCallerBuilder<RESP>> { + public: + /** conn is dereferenced here to read the defaults. The initializers are listed in member declaration order, which is the order they actually run in. */ explicit SingleRequestCallerBuilder(std::shared_ptr<AsyncConnection> conn, + std::shared_ptr<folly::HHWheelTimer> retry_timer) + : conn_(conn), + retry_timer_(retry_timer), + table_name_(nullptr), + rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()), + operation_timeout_nanos_(conn->connection_conf()->operation_timeout()), + pause_(conn->connection_conf()->pause()), + max_retries_(conn->connection_conf()->max_retries()), + start_log_errors_count_(conn->connection_conf()->start_log_errors_count()), + locate_type_(RegionLocateType::kCurrent) {} + + virtual ~SingleRequestCallerBuilder() = default; + + typedef SingleRequestCallerBuilder<RESP> GenericThisType; + typedef std::shared_ptr<GenericThisType> SharedThisPtr; + + SharedThisPtr table(std::shared_ptr<pb::TableName> table_name) { + table_name_ = table_name; + return shared_this(); + } + + SharedThisPtr rpc_timeout(std::chrono::nanoseconds rpc_timeout_nanos) { + rpc_timeout_nanos_ = rpc_timeout_nanos; + return shared_this(); + } + + SharedThisPtr operation_timeout(std::chrono::nanoseconds operation_timeout_nanos) { + operation_timeout_nanos_ = operation_timeout_nanos; +
return shared_this(); + } + + SharedThisPtr pause(std::chrono::nanoseconds pause) { + pause_ = pause; + return shared_this(); + } + + SharedThisPtr max_retries(uint32_t max_retries) { + max_retries_ = max_retries; + return shared_this(); + } + + SharedThisPtr start_log_errors_count(uint32_t start_log_errors_count) { + start_log_errors_count_ = start_log_errors_count; + return shared_this(); + } + + SharedThisPtr row(const std::string& row) { + row_ = row; + return shared_this(); + } + + SharedThisPtr locate_type(RegionLocateType locate_type) { + locate_type_ = locate_type; + return shared_this(); + } + + SharedThisPtr action(Callable<RESP> callable) { + callable_ = callable; + return shared_this(); + } + + /** Builds a caller from the current settings and immediately invokes its Call(). */ folly::Future<RESP> Call() { return Build()->Call(); } + + std::shared_ptr<AsyncSingleRequestRpcRetryingCaller<RESP>> Build() { + return std::make_shared<AsyncSingleRequestRpcRetryingCaller<RESP>>( + conn_, retry_timer_, table_name_, row_, locate_type_, callable_, pause_, max_retries_, + operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_); + } + + private: + SharedThisPtr shared_this() { + return std::enable_shared_from_this<GenericThisType>::shared_from_this(); + } + + private: + std::shared_ptr<AsyncConnection> conn_; + std::shared_ptr<folly::HHWheelTimer> retry_timer_; + std::shared_ptr<pb::TableName> table_name_; + std::chrono::nanoseconds rpc_timeout_nanos_; + std::chrono::nanoseconds operation_timeout_nanos_; + std::chrono::nanoseconds pause_; + uint32_t max_retries_; + uint32_t start_log_errors_count_; + std::string row_; + RegionLocateType locate_type_; + Callable<RESP> callable_; +}; // end of SingleRequestCallerBuilder + +/** Fluent builder for AsyncBatchRpcRetryingCaller<REQ, RESP>. Unlike SingleRequestCallerBuilder it reads nothing from the connection configuration: durations default to zero and counters to 0 until the setters are called. */ template <typename REQ, typename RESP> +class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilder<REQ, RESP>> { + public: + explicit BatchCallerBuilder(std::shared_ptr<AsyncConnection> conn, + std::shared_ptr<folly::HHWheelTimer> retry_timer) + : conn_(conn), retry_timer_(retry_timer) {} + +
virtual ~BatchCallerBuilder() = default; + + typedef std::shared_ptr<BatchCallerBuilder<REQ, RESP>> SharedThisPtr; + + SharedThisPtr table(std::shared_ptr<pb::TableName> table_name) { + table_name_ = table_name; + return shared_this(); + } + + SharedThisPtr actions(std::shared_ptr<std::vector<REQ>> actions) { + actions_ = actions; + return shared_this(); + } + + SharedThisPtr operation_timeout(std::chrono::nanoseconds operation_timeout_nanos) { + operation_timeout_nanos_ = operation_timeout_nanos; + return shared_this(); + } + + SharedThisPtr rpc_timeout(std::chrono::nanoseconds rpc_timeout_nanos) { + rpc_timeout_nanos_ = rpc_timeout_nanos; + return shared_this(); + } + + SharedThisPtr pause(std::chrono::nanoseconds pause_ns) { + pause_ns_ = pause_ns; + return shared_this(); + } + + SharedThisPtr max_attempts(int32_t max_attempts) { + max_attempts_ = max_attempts; + return shared_this(); + } + + SharedThisPtr start_log_errors_count(int32_t start_log_errors_count) { + start_log_errors_count_ = start_log_errors_count; + return shared_this(); + } + + folly::Future<std::vector<folly::Try<RESP>>> Call() { return Build()->Call(); } + + /** actions() must have been called with a non-null vector first: actions_ is dereferenced unconditionally. */ std::shared_ptr<AsyncBatchRpcRetryingCaller<REQ, RESP>> Build() { + return std::make_shared<AsyncBatchRpcRetryingCaller<REQ, RESP>>( + conn_, retry_timer_, table_name_, *actions_, pause_ns_, max_attempts_, + operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_); + } + + private: + SharedThisPtr shared_this() { + return std::enable_shared_from_this<BatchCallerBuilder>::shared_from_this(); + } + + private: + std::shared_ptr<AsyncConnection> conn_; + std::shared_ptr<folly::HHWheelTimer> retry_timer_; + std::shared_ptr<hbase::pb::TableName> table_name_ = nullptr; + std::shared_ptr<std::vector<REQ>> actions_ = nullptr; + /* Durations are zero-initialized so Build() never forwards an indeterminate value when a setter was skipped. */ std::chrono::nanoseconds pause_ns_{0}; + int32_t max_attempts_ = 0; + std::chrono::nanoseconds operation_timeout_nanos_{0}; + std::chrono::nanoseconds rpc_timeout_nanos_{0}; + int32_t start_log_errors_count_ = 0; +}; +
+/** Fluent builder for AsyncScanRpcRetryingCaller. RPC timeout, scan timeout, pause, retry count and log threshold start from the connection configuration; scanner_id starts at -1. Every setter returns shared_from_this(), so the builder must be owned by a std::shared_ptr. */ class ScanCallerBuilder : public std::enable_shared_from_this<ScanCallerBuilder> { + public: + /** conn is dereferenced here to read the defaults. The initializers are listed in member declaration order, which is the order they actually run in. */ explicit ScanCallerBuilder(std::shared_ptr<AsyncConnection> conn, + std::shared_ptr<folly::HHWheelTimer> retry_timer) + : conn_(conn), + retry_timer_(retry_timer), + rpc_timeout_nanos_(conn->connection_conf()->rpc_timeout()), + scan_timeout_nanos_(conn->connection_conf()->scan_timeout()), + pause_(conn->connection_conf()->pause()), + max_retries_(conn->connection_conf()->max_retries()), + start_log_errors_count_(conn->connection_conf()->start_log_errors_count()), + scanner_id_(-1) {} + + virtual ~ScanCallerBuilder() = default; + + typedef ScanCallerBuilder GenericThisType; + typedef std::shared_ptr<ScanCallerBuilder> SharedThisPtr; + + SharedThisPtr rpc_client(std::shared_ptr<hbase::RpcClient> rpc_client) { + rpc_client_ = rpc_client; + return shared_this(); + } + + SharedThisPtr rpc_timeout(nanoseconds rpc_timeout_nanos) { + rpc_timeout_nanos_ = rpc_timeout_nanos; + return shared_this(); + } + + SharedThisPtr scan_timeout(nanoseconds scan_timeout_nanos) { + scan_timeout_nanos_ = scan_timeout_nanos; + return shared_this(); + } + + SharedThisPtr scanner_lease_timeout(nanoseconds scanner_lease_timeout_nanos) { + scanner_lease_timeout_nanos_ = scanner_lease_timeout_nanos; + return shared_this(); + } + + SharedThisPtr pause(nanoseconds pause) { + pause_ = pause; + return shared_this(); + } + + SharedThisPtr max_retries(uint32_t max_retries) { + max_retries_ = max_retries; + return shared_this(); + } + + SharedThisPtr start_log_errors_count(uint32_t start_log_errors_count) { + start_log_errors_count_ = start_log_errors_count; + return shared_this(); + } + + SharedThisPtr region_location(std::shared_ptr<RegionLocation> region_location) { + region_location_ = region_location; + return shared_this(); + } + + SharedThisPtr scanner_id(int64_t scanner_id) { + scanner_id_ = scanner_id; + return shared_this(); + } + + SharedThisPtr scan(std::shared_ptr<Scan> scan) { + scan_ = scan; +
return shared_this(); + } + + SharedThisPtr results_cache(std::shared_ptr<ScanResultCache> results_cache) { + results_cache_ = results_cache; + return shared_this(); + } + + SharedThisPtr consumer(std::shared_ptr<RawScanResultConsumer> consumer) { + consumer_ = consumer; + return shared_this(); + } + + std::shared_ptr<AsyncScanRpcRetryingCaller> Build() { + return std::make_shared<AsyncScanRpcRetryingCaller>( + conn_, retry_timer_, rpc_client_, scan_, scanner_id_, results_cache_, consumer_, + region_location_, scanner_lease_timeout_nanos_, pause_, max_retries_, scan_timeout_nanos_, + rpc_timeout_nanos_, start_log_errors_count_); + } + + private: + SharedThisPtr shared_this() { + return std::enable_shared_from_this<GenericThisType>::shared_from_this(); + } + + private: + std::shared_ptr<AsyncConnection> conn_; + std::shared_ptr<folly::HHWheelTimer> retry_timer_; + std::shared_ptr<hbase::RpcClient> rpc_client_; + std::shared_ptr<Scan> scan_; + nanoseconds rpc_timeout_nanos_; + nanoseconds scan_timeout_nanos_; + /* Not set by the constructor; zero-initialized so Build() never forwards an indeterminate value when scanner_lease_timeout() was not called. */ nanoseconds scanner_lease_timeout_nanos_{0}; + nanoseconds pause_; + uint32_t max_retries_; + uint32_t start_log_errors_count_; + std::shared_ptr<RegionLocation> region_location_; + int64_t scanner_id_; + std::shared_ptr<RawScanResultConsumer> consumer_; + std::shared_ptr<ScanResultCache> results_cache_; +}; // end of ScanCallerBuilder + +/** Hands out builders (Single / Batch / Scan), each constructed with this factory's connection and retry timer. */ class AsyncRpcRetryingCallerFactory { + private: + std::shared_ptr<AsyncConnection> conn_; + std::shared_ptr<folly::HHWheelTimer> retry_timer_; + + public: + explicit AsyncRpcRetryingCallerFactory(std::shared_ptr<AsyncConnection> conn, + std::shared_ptr<folly::HHWheelTimer> retry_timer) + : conn_(conn), retry_timer_(retry_timer) {} + + virtual ~AsyncRpcRetryingCallerFactory() = default; + + template <typename RESP> + std::shared_ptr<SingleRequestCallerBuilder<RESP>> Single() { + return std::make_shared<SingleRequestCallerBuilder<RESP>>(conn_, retry_timer_); + } + + template <typename REQ, typename RESP> +
std::shared_ptr<BatchCallerBuilder<REQ, RESP>> Batch() { + return std::make_shared<BatchCallerBuilder<REQ, RESP>>(conn_, retry_timer_); + } + + std::shared_ptr<ScanCallerBuilder> Scan() { + return std::make_shared<ScanCallerBuilder>(conn_, retry_timer_); + } +}; + +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/async-rpc-retrying-caller.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/async-rpc-retrying-caller.h b/hbase-native-client/include/hbase/client/async-rpc-retrying-caller.h new file mode 100644 index 0000000..b5f4ad2 --- /dev/null +++ b/hbase-native-client/include/hbase/client/async-rpc-retrying-caller.h @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ * + */ +#pragma once + +#include <folly/ExceptionWrapper.h> +#include <folly/futures/Future.h> +#include <folly/io/async/EventBase.h> +#include <folly/io/async/HHWheelTimer.h> + +#include <algorithm> +#include <chrono> +#include <functional> +#include <memory> +#include <string> +#include <type_traits> +#include <utility> +#include <vector> +#include "hbase/client/async-connection.h" +#include "hbase/client/hbase-rpc-controller.h" +#include "hbase/client/region-location.h" +#include "hbase/exceptions/exception.h" +#include "hbase/if/HBase.pb.h" + +namespace hbase { + +/** Nullary function producing a T. */ template <typename T> +using Supplier = std::function<T()>; + +/** Function taking a T by value and returning nothing. */ template <typename T> +using Consumer = std::function<void(T)>; + +/** Function building an R from a source S plus any number of extra inputs I, all passed by const reference. */ template <typename R, typename S, typename... I> +using ReqConverter = std::function<R(const S&, const I&...)>; + +/** Function building an R from a source S passed by const reference. */ template <typename R, typename S> +using RespConverter = std::function<R(const S&)>; + +template <typename RESP> +using RpcCallback = std::function<void(const RESP&)>; + +/** Function taking an RPC client, a region location, a controller and an owned request, returning a future of an owned RESP. */ template <typename REQ, typename RESP> +using RpcCall = std::function<folly::Future<std::unique_ptr<RESP>>( + std::shared_ptr<RpcClient>, std::shared_ptr<RegionLocation>, + std::shared_ptr<HBaseRpcController>, std::unique_ptr<REQ>)>; + +/** Function taking a controller, a region location and an RPC client (in that order), returning a future of RESP. */ template <typename RESP> +using Callable = + std::function<folly::Future<RESP>(std::shared_ptr<HBaseRpcController>, + std::shared_ptr<RegionLocation>, std::shared_ptr<RpcClient>)>; + +/** Declares the retrying caller for one request, constructed with a table name, a row, a locate type and a Callable<RESP>; the public Call() returns a future of RESP. Member functions are only declared in this header. */ template <typename RESP> +class AsyncSingleRequestRpcRetryingCaller { + public: + AsyncSingleRequestRpcRetryingCaller( + std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer, + std::shared_ptr<hbase::pb::TableName> table_name, const std::string& row, + RegionLocateType locate_type, Callable<RESP> callable, std::chrono::nanoseconds pause, + uint32_t max_retries, std::chrono::nanoseconds operation_timeout_nanos, + std::chrono::nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count); + + virtual ~AsyncSingleRequestRpcRetryingCaller(); + +
folly::Future<RESP> Call(); + + private: + void LocateThenCall(); + + void OnError(const folly::exception_wrapper& error, Supplier<std::string> err_msg, + Consumer<folly::exception_wrapper> update_cached_location); + + void Call(const RegionLocation& loc); + + void CompleteExceptionally(); + + int64_t RemainingTimeNs(); + + static void ResetController(std::shared_ptr<HBaseRpcController> controller, + const int64_t& timeout_ns); + + private: + std::shared_ptr<AsyncConnection> conn_; + std::shared_ptr<folly::HHWheelTimer> retry_timer_; + std::shared_ptr<hbase::pb::TableName> table_name_; + std::string row_; + RegionLocateType locate_type_; + Callable<RESP> callable_; + std::chrono::nanoseconds pause_; + uint32_t max_retries_; + std::chrono::nanoseconds operation_timeout_nanos_; + std::chrono::nanoseconds rpc_timeout_nanos_; + uint32_t start_log_errors_count_; + std::shared_ptr<folly::Promise<RESP>> promise_; + std::shared_ptr<HBaseRpcController> controller_; + uint64_t start_ns_; + uint32_t tries_; + std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions_; + uint32_t max_attempts_; +}; +} /* namespace hbase */ http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/async-scan-rpc-retrying-caller.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/include/hbase/client/async-scan-rpc-retrying-caller.h b/hbase-native-client/include/hbase/client/async-scan-rpc-retrying-caller.h new file mode 100644 index 0000000..68382e6 --- /dev/null +++ b/hbase-native-client/include/hbase/client/async-scan-rpc-retrying-caller.h @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <folly/Conv.h> +#include <folly/ExceptionWrapper.h> +#include <folly/Format.h> +#include <folly/Logging.h> +#include <folly/futures/Future.h> +#include <folly/io/async/EventBase.h> +#include <folly/io/async/HHWheelTimer.h> + +#include <algorithm> +#include <chrono> +#include <functional> +#include <memory> +#include <string> +#include <type_traits> +#include <utility> +#include <vector> + +#include "hbase/connection/rpc-client.h" +#include "hbase/client/async-connection.h" +#include "hbase/client/hbase-rpc-controller.h" +#include "hbase/client/raw-scan-result-consumer.h" +#include "hbase/client/region-location.h" +#include "hbase/client/request-converter.h" +#include "hbase/client/response-converter.h" +#include "hbase/client/result.h" +#include "hbase/client/scan-result-cache.h" +#include "hbase/client/scan.h" +#include "hbase/exceptions/exception.h" +#include "hbase/if/Client.pb.h" +#include "hbase/if/HBase.pb.h" +#include "hbase/utils/bytes-util.h" +#include "hbase/utils/connection-util.h" +#include "hbase/utils/optional.h" +#include "hbase/utils/sys-util.h" +#include "hbase/utils/time-util.h" + +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { + +class AsyncScanRpcRetryingCaller; + +// The resume method is allowed to be called in another thread so here we also use the +// ResumerState to prevent race. 
The initial state is INITIALIZED, and in most cases, when back
+// from OnNext or OnHeartbeat, we will call the Prepare method to change the state to SUSPENDED,
+// and when the user calls the Resume method, we will change the state to RESUMED. But the
+// Resume method could be called in another thread, and in fact, the user could just do this:
+// controller.Suspend()->Resume()
+// This is strange but valid. This means the scan could be resumed before we call the Prepare
+// method to do the actual suspend work. So in the Resume method, we will check if the state is
+// INITIALIZED, if it is, then we will just set the state to RESUMED and return. And in the
+// Prepare method, if the state is RESUMED already, we will just return and let the scan go on.
+// Notice that the public methods of this class are supposed to be called by the upper layer
+// only, and the remaining methods can only be called within the implementation of
+// AsyncScanRpcRetryingCaller.
+// TODO: Unlike the Java counterpart, we do not do scan lease renewals in a background thread.
+// Since there is also no async scan API exposed to the users, the only ScanResultConsumer is
+// the AsyncTableResultScanner, which will only pause the scanner if the result cache is maxed.
+// The application is expected to consume the scan results before the scanner lease timeout.
+class ScanResumerImpl : public ScanResumer {
+ public:
+  explicit ScanResumerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller);
+
+  virtual ~ScanResumerImpl() = default;
+
+  /**
+   * Resume the scan. You are free to call it multiple times but only the first call will take
+   * effect.
+   */
+  void Resume() override;
+
+  // Returns false if the scan has already been resumed. See the comment above ScanResumerImpl
+  // for more details.
+  // NOTE(review): num_complete_rows is an int here while the num_complete_rows_ member below
+  // is an int64_t.
+  bool Prepare(std::shared_ptr<pb::ScanResponse> resp, int num_complete_rows);
+
+ private:
+  // INITIALIZED -> SUSPENDED -> RESUMED
+  // INITIALIZED -> RESUMED
+  ScanResumerState state_ = ScanResumerState::kInitialized;
+  // NOTE(review): presumably guards state_ (and the fields below) against the cross-thread
+  // Resume() described above -- confirm in the implementation file.
+  std::mutex mutex_;
+  std::shared_ptr<pb::ScanResponse> resp_ = nullptr;
+  int64_t num_complete_rows_ = 0;
+  std::shared_ptr<AsyncScanRpcRetryingCaller> caller_;
+};
+
+// Controller with Suspend(), Terminate() and Destroy() operations on a scan. It records the id
+// of the thread that constructed it (see caller_thread_id_).
+class ScanControllerImpl : public ScanController {
+ public:
+  virtual ~ScanControllerImpl() = default;
+
+  explicit ScanControllerImpl(std::shared_ptr<AsyncScanRpcRetryingCaller> caller);
+
+  /**
+   * Suspend the scan.
+   * <p>
+   * This means we will stop fetching data in background, i.e., will not call OnNext any more
+   * before you resume the scan.
+   * @return A resumer used to resume the scan later.
+   */
+  std::shared_ptr<ScanResumer> Suspend();
+
+  /**
+   * Terminate the scan.
+   * <p>
+   * This is useful when you have got enough results and want to stop the scan in the OnNext
+   * method, or you want to stop the scan in the OnHeartbeat method because it has spent too
+   * much time.
+   */
+  void Terminate();
+
+  // Returns the current state, and sets the state to DESTROYED.
+  ScanControllerState Destroy();
+
+  // Returns the stored resumer; it is nullptr by default (see resumer_ below).
+  std::shared_ptr<ScanResumerImpl> resumer() { return resumer_; }
+
+ private:
+  // NOTE(review): presumably validates the calling thread and the current state before
+  // Suspend()/Terminate() -- confirm in the implementation file.
+  void PreCheck();
+
+  std::string DebugString(ScanControllerState state);
+
+  std::string DebugString(ScanResumerState state);
+
+ private:
+  // Make sure the methods are only called in this thread.
+  std::thread::id caller_thread_id_ = std::this_thread::get_id();
+  // INITIALIZED -> SUSPENDED -> DESTROYED
+  // INITIALIZED -> TERMINATED -> DESTROYED
+  // INITIALIZED -> DESTROYED
+  // If the state is incorrect we will throw IllegalStateException.
+  ScanControllerState state_ = ScanControllerState::kInitialized;
+  std::shared_ptr<ScanResumerImpl> resumer_ = nullptr;
+  std::shared_ptr<AsyncScanRpcRetryingCaller> caller_;
+};
+
+// Declares the caller that drives a scan. The only public entry point besides the constructor
+// is Start(), which returns a folly::Future<bool>. ScanResumerImpl and ScanControllerImpl are
+// friends and can therefore reach the private members.
+// NOTE(review): the meaning of the bool result is not visible in this header -- confirm in the
+// implementation file.
+class AsyncScanRpcRetryingCaller : public std::enable_shared_from_this<AsyncScanRpcRetryingCaller> {
+ public:
+  // Stores the connection, timer, RPC client, scan, scanner id, result cache, consumer, region
+  // location and the lease / pause / retry / timeout settings. The unqualified `nanoseconds`
+  // is std::chrono::nanoseconds.
+  AsyncScanRpcRetryingCaller(std::shared_ptr<AsyncConnection> conn,
+                             std::shared_ptr<folly::HHWheelTimer> retry_timer,
+                             std::shared_ptr<hbase::RpcClient> rpc_client,
+                             std::shared_ptr<Scan> scan, int64_t scanner_id,
+                             std::shared_ptr<ScanResultCache> results_cache,
+                             std::shared_ptr<RawScanResultConsumer> consumer,
+                             std::shared_ptr<RegionLocation> region_location,
+                             nanoseconds scanner_lease_timeout_nanos, nanoseconds pause,
+                             uint32_t max_retries, nanoseconds scan_timeout_nanos,
+                             nanoseconds rpc_timeout_nanos, uint32_t start_log_errors_count);
+
+  // Takes the controller, the response of the open-scanner call and its cell scanner.
+  folly::Future<bool> Start(std::shared_ptr<HBaseRpcController> controller,
+                            std::shared_ptr<pb::ScanResponse> open_scan_resp,
+                            const std::shared_ptr<CellScanner> cell_scanner);
+
+ private:
+  int64_t RemainingTimeNs();
+  void OnComplete(std::shared_ptr<HBaseRpcController> controller,
+                  std::shared_ptr<pb::ScanResponse> resp,
+                  const std::shared_ptr<CellScanner> cell_scanner);
+
+  void CompleteOrNext(std::shared_ptr<pb::ScanResponse> resp);
+
+  void CompleteExceptionally(bool close_scanner);
+
+  void CompleteNoMoreResults();
+
+  void CompleteWhenNoMoreResultsInRegion();
+
+  void CompleteWithNextStartRow(std::string row, bool inclusive);
+
+  void UpdateNextStartRowWhenError(const Result& result);
+
+  void CompleteWhenError(bool close_scanner);
+
+  void OnError(const folly::exception_wrapper& e);
+
+  bool NoMoreResultsForScan(const Scan& scan, const pb::RegionInfo& info);
+
+  void Next();
+
+  void Call();
+
+  void CloseScanner();
+
+  void ResetController(std::shared_ptr<HBaseRpcController> controller,
+                       const int64_t& timeout_nanos);
+
+ private:
+  // Constructor arguments, in constructor order.
+  std::shared_ptr<AsyncConnection> conn_;
+  std::shared_ptr<folly::HHWheelTimer> retry_timer_;
+  std::shared_ptr<hbase::RpcClient> rpc_client_;
+  std::shared_ptr<Scan> scan_;
+  int64_t scanner_id_;
+  std::shared_ptr<ScanResultCache> results_cache_;
+  std::shared_ptr<RawScanResultConsumer> consumer_;
+  std::shared_ptr<RegionLocation> region_location_;
+  nanoseconds scanner_lease_timeout_nanos_;
+  nanoseconds pause_;
+  uint32_t max_retries_;
+  nanoseconds scan_timeout_nanos_;
+  nanoseconds rpc_timeout_nanos_;
+  uint32_t start_log_errors_count_;
+  // Per-scan state. Only the members with in-class initializers have defaults visible here;
+  // the rest must be set by the constructor, which is defined outside this header.
+  std::shared_ptr<folly::Promise<bool>> promise_;
+  std::shared_ptr<HBaseRpcController> controller_;
+  // Empty optional by default.
+  optional<std::string> next_start_row_when_error_ = optional<std::string>();
+  bool include_next_start_row_when_error_ = true;
+  uint64_t start_ns_;
+  uint32_t tries_;
+  std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions_;
+  uint32_t max_attempts_;
+  // Starts at -1.
+  int64_t next_call_seq_ = -1L;
+
+  friend class ScanResumerImpl;
+  friend class ScanControllerImpl;
+};
+
+}  // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/async-table-result-scanner.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/include/hbase/client/async-table-result-scanner.h b/hbase-native-client/include/hbase/client/async-table-result-scanner.h
new file mode 100644
index 0000000..0e1d444
--- /dev/null
+++ b/hbase-native-client/include/hbase/client/async-table-result-scanner.h
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/Conv.h>
+#include <folly/ExceptionWrapper.h>
+#include <folly/Logging.h>
+#include <chrono>
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <string>
+#include <vector>
+
+#include "hbase/client/raw-scan-result-consumer.h"
+#include "hbase/client/result-scanner.h"
+#include "hbase/client/result.h"
+#include "hbase/if/Client.pb.h"
+#include "hbase/if/HBase.pb.h"
+
+namespace hbase {
+
+// Derives from both ResultScanner and RawScanResultConsumer and overrides Close(), Next(),
+// OnNext(), OnHeartbeat(), OnError() and OnComplete(). Results are held in a std::queue member
+// alongside a mutex and a condition variable.
+// NOTE(review): presumably the On* callbacks fill the queue from the scan side while Next()
+// drains it on the application side, blocking on cond_ -- confirm in the implementation file.
+class AsyncTableResultScanner : public ResultScanner, public RawScanResultConsumer {
+ public:
+  // @param max_cache_size upper bound for the result cache.
+  // NOTE(review): presumably measured in bytes (see EstimatedSizeWithSharedPtr) -- confirm.
+  explicit AsyncTableResultScanner(int64_t max_cache_size);
+
+  virtual ~AsyncTableResultScanner();
+
+  void Close() override;
+
+  // Returns the next Result as a shared_ptr.
+  // NOTE(review): the end-of-scan / error signalling is not visible in this header.
+  std::shared_ptr<Result> Next() override;
+
+  // Called with a batch of results and the controller for the scan.
+  void OnNext(const std::vector<std::shared_ptr<Result>> &results,
+              std::shared_ptr<ScanController> controller) override;
+
+  /**
+   * Indicate that there is a heartbeat message but we have not accumulated enough cells to call
+   * OnNext.
+   * <p>
+   * This method gives you a chance to terminate a slow scan operation.
+   * @param controller used to suspend or terminate the scan. Notice that the controller
+   *          instance is only valid within the scope of the OnHeartbeat method. You can only
+   *          call its methods in OnHeartbeat, do NOT store it and call it later outside
+   *          OnHeartbeat.
+   */
+  void OnHeartbeat(std::shared_ptr<ScanController> controller) override;
+
+  /**
+   * Indicate that we hit an unrecoverable error and the scan operation is terminated.
+   * <p>
+   * We will not call OnComplete() after calling OnError().
+   */
+  void OnError(const folly::exception_wrapper &error) override;
+
+  /**
+   * Indicate that the scan operation is completed normally.
+   */
+  void OnComplete() override;
+
+  // For testing. Returns the num_prefetch_stopped_ counter, which starts at 0.
+  uint32_t num_prefetch_stopped() { return num_prefetch_stopped_; }
+
+ private:
+  void AddToCache(const std::vector<std::shared_ptr<Result>> &results);
+
+  template <typename T>
+  inline size_t EstimatedSizeWithSharedPtr(std::shared_ptr<T> t);
+
+  void StopPrefetch(std::shared_ptr<ScanController> controller);
+
+ private:
+  std::queue<std::shared_ptr<Result>> queue_;
+  std::mutex mutex_;
+  std::condition_variable cond_;
+  folly::exception_wrapper error_;
+  // cache_size_, max_cache_size_ and closed_ have no in-class initializer; they must be set by
+  // the constructor, which is defined outside this header.
+  int64_t cache_size_;
+  int64_t max_cache_size_;
+  bool closed_;
+  std::shared_ptr<ScanResumer> resumer_ = nullptr;
+  uint32_t num_prefetch_stopped_ = 0;
+};
+}  // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/cell.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/include/hbase/client/cell.h b/hbase-native-client/include/hbase/client/cell.h
new file mode 100644
index 0000000..7a62a9b
--- /dev/null
+++ b/hbase-native-client/include/hbase/client/cell.h
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+
+namespace hbase {
+
+// Kind of a Cell. The numeric values are fixed explicitly.
+// NOTE(review): presumably they mirror the type codes used by the Java server side -- confirm
+// before adding or renumbering values.
+enum class CellType {
+  MINIMUM = 0,
+  PUT = 4,
+  DELETE = 8,
+  DELETE_FAMILY_VERSION = 10,
+  DELETE_COLUMN = 12,
+  DELETE_FAMILY = 14,
+  MAXIMUM = 255
+};
+
+// A single cell: row, family, qualifier, timestamp, value and cell type, plus a sequence id.
+// All string data is held by value in std::string members.
+class Cell {
+ public:
+  // Builds a cell from its components. The sequence id is not a constructor argument.
+  Cell(const std::string &row, const std::string &family, const std::string &qualifier,
+       const int64_t timestamp, const std::string &value, const hbase::CellType &cell_type);
+  Cell(const Cell &cell);
+  Cell &operator=(const Cell &cell);
+  virtual ~Cell();
+  // Accessors. The string accessors return const references, so the returned reference must
+  // not be used after the Cell it came from is gone.
+  const std::string &Row() const;
+  const std::string &Family() const;
+  const std::string &Qualifier() const;
+  int64_t Timestamp() const;
+  const std::string &Value() const;
+  CellType Type() const;
+  int64_t SequenceId() const;
+  // Returns a string description of the cell; the exact format is defined in the
+  // implementation file.
+  std::string DebugString() const;
+  /** Returns estimated size of the Cell object including deep heap space usage
+   * of its data. Notice that this is a very rough estimate. */
+  size_t EstimatedSize() const;
+
+ private:
+  std::string row_;
+  std::string family_;
+  std::string qualifier_;
+  // Since java does not have unsigned, we are also using signed numerics here
+  // so that we won't have surprises when large uint64's are treated as
+  // negative values in the java server side
+  int64_t timestamp_;
+  hbase::CellType cell_type_;
+  std::string value_;
+  // No in-class initializer and no matching constructor argument.
+  // NOTE(review): confirm the constructors in the implementation file initialize it.
+  int64_t sequence_id_;
+
+ private:
+  // Maps a CellType to a C string.
+  // NOTE(review): presumably used by DebugString() -- confirm.
+  static const char *TypeToString(CellType type);
+};
+
+}  // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/include/hbase/client/client.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/include/hbase/client/client.h b/hbase-native-client/include/hbase/client/client.h
new file mode 100644
index 0000000..2e77df9
--- /dev/null
+++ b/hbase-native-client/include/hbase/client/client.h
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "hbase/connection/rpc-client.h"
+#include "hbase/client/async-connection.h"
+#include "hbase/client/configuration.h"
+
+#include "hbase/client/table.h"
+#include "hbase/serde/table-name.h"
+
+namespace hbase {
+
+class Table;
+/**
+ * Client.
+ *
+ * This is the class that provides access to an HBase cluster.
+ * It is thread safe and does connection pooling. Current recommendations are to
+ * have only one Client per cluster around.
+ */
+class Client {
+ public:
+  /**
+   * @brief Create a new client. This overload takes no arguments.
+   * NOTE(review): presumably a default Configuration is used -- confirm in the
+   * implementation file.
+   */
+  Client();
+  /**
+   * @brief Create a new client from an explicit configuration.
+   * @param conf Configuration for the client. NOTE(review): presumably carries the
+   * ZooKeeper quorum used for bootstrap -- confirm in the implementation file.
+   */
+  explicit Client(const Configuration& conf);
+  ~Client() = default;
+
+  /**
+   * @brief Retrieve a Table implementation for accessing a table.
+   * @param table_name Name of the table to access.
+   * @return An owning pointer to a ::hbase::Table. The return type is spelled
+   * ::hbase::Table because inside this class the name Table refers to this method.
+   */
+  std::unique_ptr<::hbase::Table> Table(const pb::TableName& table_name);
+
+  /**
+   * @brief Close the Client connection.
+   */
+  void Close();
+
+  /**
+   * @brief Internal. DO NOT USE.
+   * Returns the shared pointer held in async_connection_.
+   */
+  std::shared_ptr<AsyncConnectionImpl> async_connection() { return async_connection_; }
+
+ private:
+  /** Data */
+  std::shared_ptr<AsyncConnectionImpl> async_connection_;
+
+ private:
+  /** Methods */
+  // Shared set-up step taking the configuration.
+  // NOTE(review): presumably called by both constructors -- confirm.
+  void Init(const Configuration& conf);
+};
+
+}  // namespace hbase