This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new b273abce7 feat(python-client): add scan filter support (#2305)
b273abce7 is described below

commit b273abce70d5c9c96d3e6f1884913dd491a4075b
Author: Jun 11 <[email protected]>
AuthorDate: Mon Mar 9 17:29:03 2026 +0800

    feat(python-client): add scan filter support (#2305)
    
    Add support for hashkey and sortkey scan filters.
    
    In the original logic, the `generate_next_bytes` function has two problems:
    a. The input buff (i.e. hashkey) can be either `str` or `bytearray`. If it's
    a str, in-place modification like `buff[pos] += 1` won't work since strings
    are immutable.
    b. The pos variable was initialized to a fixed index (len(buff) - 1) and was
    never decremented inside the loop, which is counterintuitive and could lead
    to an infinite loop.
    
https://github.com/apache/incubator-pegasus/blob/44400f6e3ca1fb1ce48d04d1c9145aad7ac4e991/python-client/pypegasus/pgclient.py#L613-L624
---
 python-client/pypegasus/base/ttypes.py | 13 +++++-
 python-client/pypegasus/pgclient.py    | 73 ++++++++++++++++++++++++++++------
 python-client/pypegasus/utils/tools.py | 12 +++++-
 3 files changed, 81 insertions(+), 17 deletions(-)

diff --git a/python-client/pypegasus/base/ttypes.py 
b/python-client/pypegasus/base/ttypes.py
index 6ed2e88de..e983cc7ad 100644
--- a/python-client/pypegasus/base/ttypes.py
+++ b/python-client/pypegasus/base/ttypes.py
@@ -44,11 +44,20 @@ class blob:
 
   def validate(self):
     return
+  
+  def raw(self):
+    if self._is_str:
+      return self.data.decode('UTF-8')
+    else:
+      return self.data
 
   def __init__(self, data=None):
     if isinstance(data,str):
-        data = data.encode('UTF-8')
-    self.data = data
+        self._is_str = True
+        self.data = data.encode('UTF-8')
+    else:
+        self._is_str = False
+        self.data = data
 
   def __hash__(self):
     value = 17
diff --git a/python-client/pypegasus/pgclient.py 
b/python-client/pypegasus/pgclient.py
index a897b345a..3fb9f6936 100644
--- a/python-client/pypegasus/pgclient.py
+++ b/python-client/pypegasus/pgclient.py
@@ -35,7 +35,7 @@ from pypegasus.operate.packet import *
 from pypegasus.replication.ttypes import query_cfg_request
 from pypegasus.rrdb import *
 from pypegasus.rrdb.ttypes import scan_request, get_scanner_request, 
update_request, key_value, multi_put_request, \
-    multi_get_request, multi_remove_request
+    multi_get_request, multi_remove_request, filter_type
 from pypegasus.transport.protocol import *
 from pypegasus.utils.tools import restore_key, get_ttl, bytes_cmp, ScanOptions
 
@@ -504,6 +504,10 @@ class PegasusScanner(object):
         request.stop_inclusive = self._scan_options.stop_inclusive
         request.batch_size = self._scan_options.batch_size
         request.need_check_hash = self._check_hash
+        request.sort_key_filter_type = self._scan_options.sortkey_filter_type 
+        request.sort_key_filter_pattern = 
blob(self._scan_options.sortkey_filter_pattern)
+        request.hash_key_filter_type = self._scan_options.hashkey_filter_type
+        request.hash_key_filter_pattern = 
blob(self._scan_options.hashkey_filter_pattern)
 
         op = RrdbGetScannerOperator(self._gpid, request, self._partition_hash)
         session = self._table.get_session(self._gpid)
@@ -605,6 +609,9 @@ class Pegasus(object):
         hash_key_len = len(hash_key)
         sort_key_len = len(sort_key)
 
+        if hash_key_len >= 0xFFFF:
+            raise ValueError("hash_key length must be less than 65535")
+        
         if sort_key_len > 0:
             values = (hash_key_len, hash_key, sort_key)
             s = struct.Struct('>H'+str(hash_key_len)+'s'+str(sort_key_len)+'s')
@@ -619,24 +626,50 @@ class Pegasus(object):
 
     @classmethod
     def generate_next_bytes(cls, buff):
-        pos = len(buff) - 1
+        """
+        Increment the last non-0xFF byte in the buffer.
+        
+        If `buff` is a string, it is assumed to be encoded with 'latin-1' to 
ensure
+        a 1:1 mapping between characters and bytes. Unicode strings with 
characters
+        outside the 0-255 range will raise a UnicodeEncodeError.
+        """
+        is_str = isinstance(buff, str)
+        is_ba = isinstance(buff, bytearray)
+
+        if is_str:
+            arr = bytearray(buff.encode('latin-1'))  
+        elif is_ba:
+            arr = buff  
+        else:
+            arr = bytearray(buff)
+        pos = len(arr) - 1
         found = False
         while pos >= 0:
-            if ord(buff[pos]) != 0xFF:
-                buff[pos] += 1
+            if arr[pos] != 0xFF:
+                arr[pos] += 1
                 found = True
                 break
-        if found:
-            return buff
+            pos -= 1  
+        if not found:
+            arr += b'\x00'
+        if is_str:
+            return arr.decode('latin-1')
+        elif is_ba:
+            return arr
         else:
-            return buff + chr(0)
+            return bytes(arr)
 
+    @classmethod
+    def generate_next_key(cls, hash_key, stop_sort_key):
+        key = cls.generate_key(hash_key, stop_sort_key)
+        return blob(cls.generate_next_bytes(key.raw()))
+            
     @classmethod
     def generate_stop_key(cls, hash_key, stop_sort_key):
         if stop_sort_key:
             return cls.generate_key(hash_key, stop_sort_key), True
         else:
-            return cls.generate_next_bytes(hash_key), False
+            return blob(cls.generate_next_bytes(hash_key)), False
 
     def __init__(self, meta_addrs=None, table_name='',
                  timeout=DEFAULT_TIMEOUT):
@@ -1012,6 +1045,24 @@ class Pegasus(object):
         stop_key, stop_inclusive = self.generate_stop_key(hash_key, 
stop_sort_key)
         if not stop_inclusive:
             scan_options.stop_inclusive = stop_inclusive
+
+        # limit key range by prefix filter
+        if scan_options.sortkey_filter_type == filter_type.FT_MATCH_PREFIX and 
\
+            len(scan_options.sortkey_filter_pattern) > 0:
+            prefix_start = self.generate_key(hash_key, 
scan_options.sortkey_filter_pattern)
+            # If the prefix start is after the current start_key, move the 
scan start to the prefix.
+            if bytes_cmp(prefix_start.data, start_key.data) > 0:
+                start_key = prefix_start
+                scan_options.start_inclusive = True
+
+            prefix_stop = self.generate_next_key(hash_key, 
scan_options.sortkey_filter_pattern)
+            # If the prefix stop is before or equal to the current stop_key, 
move the scan stop to the prefix stop.
+            # The prefix stop represents the next key after hash_key and 
sortkey_filter_pattern, 
+            # so stop_inclusive should be False.
+            if bytes_cmp(prefix_stop.data, stop_key.data) <= 0:
+                stop_key = prefix_stop
+                scan_options.stop_inclusive = False
+
         gpid_list = []
         hash_list = []
         r = bytes_cmp(start_key.data, stop_key.data)
@@ -1041,10 +1092,6 @@ class Pegasus(object):
         size = count // split
         more = count % split
 
-        opt = ScanOptions()
-        opt.timeout_millis = scan_options.timeout_millis
-        opt.batch_size = scan_options.batch_size
-        opt.snapshot = scan_options.snapshot
         scanner_list = []
         for i in range(split):
             gpid_list = []
@@ -1056,6 +1103,6 @@ class Pegasus(object):
                     gpid_list.append(all_gpid_list[count])
                     hash_list.append(int(count))
 
-            scanner_list.append(PegasusScanner(self.table, gpid_list, opt, 
hash_list, True))
+            scanner_list.append(PegasusScanner(self.table, gpid_list, 
scan_options, hash_list, True))
 
         return scanner_list
diff --git a/python-client/pypegasus/utils/tools.py 
b/python-client/pypegasus/utils/tools.py
index 79386b3c2..3efbc9c42 100644
--- a/python-client/pypegasus/utils/tools.py
+++ b/python-client/pypegasus/utils/tools.py
@@ -63,7 +63,11 @@ class ScanOptions(object):
         self.start_inclusive = True
         self.stop_inclusive = False
         self.snapshot = None                   # for future use
-
+        self.sortkey_filter_type = filter_type.FT_NO_FILTER
+        self.sortkey_filter_pattern = ""
+        self.hashkey_filter_type = filter_type.FT_NO_FILTER
+        self.hashkey_filter_pattern = ""
+        
     def __repr__(self):
         lst = ['%s=%r' % (key, value)
                for key, value in self.__dict__.items()]
@@ -104,11 +108,15 @@ def restore_key(merge_key):
     
     return hash_key, sort_key
 
+# This is to ensure compatibility between different byte-like string 
representations, 
+# such as 'bytes' and 'str' in various Python versions.
+def bval(ch):
+    return ch if isinstance(ch, int) else ord(ch)
 
 def bytes_cmp(left, right):
     min_len = min(len(left), len(right))
     for i in range(min_len):
-        r = ord(left[i]) - ord(right[i])
+        r = bval(left[i]) - bval(right[i])
         if r != 0:
             return r
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to