This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new b273abce7 feat(python-client): add scan filter support (#2305)
b273abce7 is described below
commit b273abce70d5c9c96d3e6f1884913dd491a4075b
Author: Jun 11 <[email protected]>
AuthorDate: Mon Mar 9 17:29:03 2026 +0800
feat(python-client): add scan filter support (#2305)
Add support for hashkey and sortkey scan filters.
In the original logic, the `generate_next_bytes` function had two problems:
a. The input buff (i.e. hashkey) can be either a `str` or a `bytearray`.
If it is a `str`, in-place modification like `buff[pos] += 1` won't work,
since strings are immutable.
b. The pos variable was initialized to a fixed index (len(buff) - 1) and was
never decremented inside the loop, which is counterintuitive and could lead
to an infinite loop whenever the last byte is 0xFF.
https://github.com/apache/incubator-pegasus/blob/44400f6e3ca1fb1ce48d04d1c9145aad7ac4e991/python-client/pypegasus/pgclient.py#L613-L624
---
python-client/pypegasus/base/ttypes.py | 13 +++++-
python-client/pypegasus/pgclient.py | 73 ++++++++++++++++++++++++++++------
python-client/pypegasus/utils/tools.py | 12 +++++-
3 files changed, 81 insertions(+), 17 deletions(-)
diff --git a/python-client/pypegasus/base/ttypes.py
b/python-client/pypegasus/base/ttypes.py
index 6ed2e88de..e983cc7ad 100644
--- a/python-client/pypegasus/base/ttypes.py
+++ b/python-client/pypegasus/base/ttypes.py
@@ -44,11 +44,20 @@ class blob:
def validate(self):
return
+
+ def raw(self):
+ if self._is_str:
+ return self.data.decode('UTF-8')
+ else:
+ return self.data
def __init__(self, data=None):
if isinstance(data,str):
- data = data.encode('UTF-8')
- self.data = data
+ self._is_str = True
+ self.data = data.encode('UTF-8')
+ else:
+ self._is_str = False
+ self.data = data
def __hash__(self):
value = 17
diff --git a/python-client/pypegasus/pgclient.py
b/python-client/pypegasus/pgclient.py
index a897b345a..3fb9f6936 100644
--- a/python-client/pypegasus/pgclient.py
+++ b/python-client/pypegasus/pgclient.py
@@ -35,7 +35,7 @@ from pypegasus.operate.packet import *
from pypegasus.replication.ttypes import query_cfg_request
from pypegasus.rrdb import *
from pypegasus.rrdb.ttypes import scan_request, get_scanner_request,
update_request, key_value, multi_put_request, \
- multi_get_request, multi_remove_request
+ multi_get_request, multi_remove_request, filter_type
from pypegasus.transport.protocol import *
from pypegasus.utils.tools import restore_key, get_ttl, bytes_cmp, ScanOptions
@@ -504,6 +504,10 @@ class PegasusScanner(object):
request.stop_inclusive = self._scan_options.stop_inclusive
request.batch_size = self._scan_options.batch_size
request.need_check_hash = self._check_hash
+ request.sort_key_filter_type = self._scan_options.sortkey_filter_type
+ request.sort_key_filter_pattern =
blob(self._scan_options.sortkey_filter_pattern)
+ request.hash_key_filter_type = self._scan_options.hashkey_filter_type
+ request.hash_key_filter_pattern =
blob(self._scan_options.hashkey_filter_pattern)
op = RrdbGetScannerOperator(self._gpid, request, self._partition_hash)
session = self._table.get_session(self._gpid)
@@ -605,6 +609,9 @@ class Pegasus(object):
hash_key_len = len(hash_key)
sort_key_len = len(sort_key)
+ if hash_key_len >= 0xFFFF:
+ raise ValueError("hash_key length must be less than 65535")
+
if sort_key_len > 0:
values = (hash_key_len, hash_key, sort_key)
s = struct.Struct('>H'+str(hash_key_len)+'s'+str(sort_key_len)+'s')
@@ -619,24 +626,50 @@ class Pegasus(object):
@classmethod
def generate_next_bytes(cls, buff):
- pos = len(buff) - 1
+ """
+ Increment the last non-0xFF byte in the buffer.
+
+ If `buff` is a string, it is assumed to be encoded with 'latin-1' to
ensure
+ a 1:1 mapping between characters and bytes. Unicode strings with
characters
+ outside the 0-255 range will raise a UnicodeEncodeError.
+ """
+ is_str = isinstance(buff, str)
+ is_ba = isinstance(buff, bytearray)
+
+ if is_str:
+ arr = bytearray(buff.encode('latin-1'))
+ elif is_ba:
+ arr = buff
+ else:
+ arr = bytearray(buff)
+ pos = len(arr) - 1
found = False
while pos >= 0:
- if ord(buff[pos]) != 0xFF:
- buff[pos] += 1
+ if arr[pos] != 0xFF:
+ arr[pos] += 1
found = True
break
- if found:
- return buff
+ pos -= 1
+ if not found:
+ arr += b'\x00'
+ if is_str:
+ return arr.decode('latin-1')
+ elif is_ba:
+ return arr
else:
- return buff + chr(0)
+ return bytes(arr)
+ @classmethod
+ def generate_next_key(cls, hash_key, stop_sort_key):
+ key = cls.generate_key(hash_key, stop_sort_key)
+ return blob(cls.generate_next_bytes(key.raw()))
+
@classmethod
def generate_stop_key(cls, hash_key, stop_sort_key):
if stop_sort_key:
return cls.generate_key(hash_key, stop_sort_key), True
else:
- return cls.generate_next_bytes(hash_key), False
+ return blob(cls.generate_next_bytes(hash_key)), False
def __init__(self, meta_addrs=None, table_name='',
timeout=DEFAULT_TIMEOUT):
@@ -1012,6 +1045,24 @@ class Pegasus(object):
stop_key, stop_inclusive = self.generate_stop_key(hash_key,
stop_sort_key)
if not stop_inclusive:
scan_options.stop_inclusive = stop_inclusive
+
+ # limit key range by prefix filter
+ if scan_options.sortkey_filter_type == filter_type.FT_MATCH_PREFIX and
\
+ len(scan_options.sortkey_filter_pattern) > 0:
+ prefix_start = self.generate_key(hash_key,
scan_options.sortkey_filter_pattern)
+ # If the prefix start is after the current start_key, move the
scan start to the prefix.
+ if bytes_cmp(prefix_start.data, start_key.data) > 0:
+ start_key = prefix_start
+ scan_options.start_inclusive = True
+
+ prefix_stop = self.generate_next_key(hash_key,
scan_options.sortkey_filter_pattern)
+ # If the prefix stop is before or equal to the current stop_key,
move the scan stop to the prefix stop.
+ # The prefix stop represents the next key after hash_key and
sortkey_filter_pattern,
+ # so stop_inclusive should be False.
+ if bytes_cmp(prefix_stop.data, stop_key.data) <= 0:
+ stop_key = prefix_stop
+ scan_options.stop_inclusive = False
+
gpid_list = []
hash_list = []
r = bytes_cmp(start_key.data, stop_key.data)
@@ -1041,10 +1092,6 @@ class Pegasus(object):
size = count // split
more = count % split
- opt = ScanOptions()
- opt.timeout_millis = scan_options.timeout_millis
- opt.batch_size = scan_options.batch_size
- opt.snapshot = scan_options.snapshot
scanner_list = []
for i in range(split):
gpid_list = []
@@ -1056,6 +1103,6 @@ class Pegasus(object):
gpid_list.append(all_gpid_list[count])
hash_list.append(int(count))
- scanner_list.append(PegasusScanner(self.table, gpid_list, opt,
hash_list, True))
+ scanner_list.append(PegasusScanner(self.table, gpid_list,
scan_options, hash_list, True))
return scanner_list
diff --git a/python-client/pypegasus/utils/tools.py
b/python-client/pypegasus/utils/tools.py
index 79386b3c2..3efbc9c42 100644
--- a/python-client/pypegasus/utils/tools.py
+++ b/python-client/pypegasus/utils/tools.py
@@ -63,7 +63,11 @@ class ScanOptions(object):
self.start_inclusive = True
self.stop_inclusive = False
self.snapshot = None # for future use
-
+ self.sortkey_filter_type = filter_type.FT_NO_FILTER
+ self.sortkey_filter_pattern = ""
+ self.hashkey_filter_type = filter_type.FT_NO_FILTER
+ self.hashkey_filter_pattern = ""
+
def __repr__(self):
lst = ['%s=%r' % (key, value)
for key, value in self.__dict__.items()]
@@ -104,11 +108,15 @@ def restore_key(merge_key):
return hash_key, sort_key
+# This is to ensure compatibility between different byte-like string
representations,
+# such as 'bytes' and 'str' in various Python versions.
+def bval(ch):
+ return ch if isinstance(ch, int) else ord(ch)
def bytes_cmp(left, right):
min_len = min(len(left), len(right))
for i in range(min_len):
- r = ord(left[i]) - ord(right[i])
+ r = bval(left[i]) - bval(right[i])
if r != 0:
return r
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]