This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 3946ce5f8 KUDU-2671 range-specific hash schema support in Python client
3946ce5f8 is described below

commit 3946ce5f89604477b2a4af03f38de97d384affd2
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Mon Jul 18 11:37:38 2022 -0700

    KUDU-2671 range-specific hash schema support in Python client
    
    With this patch, it's now possible to operate on ranges having custom
    hash schemas in Kudu Python client applications.  In essence, the newly
    added API directly maps into the Kudu C++ client API.  This patch also
    contains tests to cover the newly introduced functionality.
    
    Change-Id: I61426fadc45d70805cf99461d559f0152a79f4a0
    Reviewed-on: http://gerrit.cloudera.org:8080/18771
    Reviewed-by: Attila Bukor <abu...@apache.org>
    Tested-by: Kudu Jenkins
---
 python/kudu/client.pyx           | 139 +++++++++++++++++++++++++++++++++++++++
 python/kudu/libkudu_client.pxd   |  12 ++++
 python/kudu/tests/test_client.py | 109 +++++++++++++++++++++++++++++-
 3 files changed, 259 insertions(+), 1 deletion(-)

diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index 1a09fbb31..78c20925c 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -419,6 +419,7 @@ cdef class Client:
             PartialRow lower_bound
             PartialRow upper_bound
             PartialRow split_row
+            KuduRangePartition* range_partition
 
         # Apply hash partitioning.
         for col_names, num_buckets, seed in part._hash_partitions:
@@ -429,12 +430,36 @@ cdef class Client:
                 c.add_hash_partitions(v, num_buckets, seed)
             else:
                 c.add_hash_partitions(v, num_buckets)
+
         # Apply range partitioning
         if part._range_partition_cols is not None:
             v.clear()
             for n in part._range_partition_cols:
                 v.push_back(tobytes(n))
             c.set_range_partition_columns(v)
+            if part._range_partitions_with_custom_hash_schemas:
+                for p in part._range_partitions_with_custom_hash_schemas:
+                    if not isinstance(p.lower_bound, PartialRow):
+                        lower_bound = schema.new_row(p.lower_bound)
+                    else:
+                        lower_bound = p.lower_bound
+                    lower_bound._own = 0
+                    if not isinstance(p.upper_bound, PartialRow):
+                        upper_bound = schema.new_row(p.upper_bound)
+                    else:
+                        upper_bound = p.upper_bound
+                    upper_bound._own = 0
+                    range_partition = new KuduRangePartition(
+                        lower_bound.row,
+                        upper_bound.row,
+                        p.lower_bound_type,
+                        p.upper_bound_type)
+                    for col_names, num_buckets, seed in p.hash_dimensions:
+                        v.clear()
+                        for n in col_names:
+                            v.push_back(tobytes(n))
+                        range_partition.add_hash_partitions(v, num_buckets, seed if seed else 0)
+                    c.add_custom_range_partition(range_partition)
             if part._range_partitions:
                 for partition in part._range_partitions:
                     if not isinstance(partition[0], PartialRow):
@@ -1208,6 +1233,53 @@ cdef class Column:
 
         return result
 
+
+class RangePartition(object):
+    """
+    Argument to Client.add_custom_range_partition(...) to contain information
+    on the range bounds and range-specific hash schema.
+    """
+    def __init__(self,
+                 lower_bound=None,
+                 upper_bound=None,
+                 lower_bound_type='inclusive',
+                 upper_bound_type='exclusive'):
+        """
+        Parameters
+        ----------
+        lower_bound : PartialRow/list/tuple/dict
+        upper_bound : PartialRow/list/tuple/dict
+        lower_bound_type : {'inclusive', 'exclusive'} or constants
+          kudu.EXCLUSIVE_BOUND and kudu.INCLUSIVE_BOUND
+        upper_bound_type : {'inclusive', 'exclusive'} or constants
+          kudu.EXCLUSIVE_BOUND and kudu.INCLUSIVE_BOUND
+        """
+        self.lower_bound = lower_bound
+        self.upper_bound = upper_bound
+        self.lower_bound_type = _check_convert_range_bound_type(lower_bound_type)
+        self.upper_bound_type = _check_convert_range_bound_type(upper_bound_type)
+        self.hash_dimensions = []
+
+    def add_hash_partitions(self, column_names, num_buckets, seed=None):
+        """
+        Adds a dimension with the specified parameters to the custom hash schema
+        for this range partition.
+
+        Parameters
+        ----------
+        column_names : list of string column names on which to compute hash function
+        num_buckets : the number of buckets for the hash function
+        seed : int - optional; the seed for the hash function mapping rows to buckets
+
+        Returns
+        -------
+        self: this object
+        """
+        if isinstance(column_names, str):
+            column_names = [column_names]
+        self.hash_dimensions.append( (column_names, num_buckets, seed) )
+        return self
+
 class Partitioning(object):
     """ Argument to Client.create_table(...) to describe table partitioning. """
 
@@ -1215,6 +1287,7 @@ class Partitioning(object):
         self._hash_partitions = []
         self._range_partition_cols = None
         self._range_partitions = []
+        self._range_partitions_with_custom_hash_schemas = []
         self._range_partition_splits = []
 
     def add_hash_partitions(self, column_names, num_buckets, seed=None):
@@ -1302,9 +1375,27 @@ class Partitioning(object):
         else:
             raise ValueError("Range Partition Columns must be set before " +
                              "adding a range partition.")
+        return self
+
 
+    def add_custom_range_partition(self, range_partition):
+        """
+        Parameters
+        ----------
+        range_partition : range partition with custom hash schema to add
+
+        Returns
+        -------
+        self : Partitioning
+        """
+        if self._range_partition_cols is None:
+            raise ValueError("Range Partition Columns must be set before " +
+                             "adding a range partition.")
+
+        self._range_partitions_with_custom_hash_schemas.append(range_partition)
         return self
 
+
     def add_range_partition_split(self, split_row):
         """
         Add a range partition split at the provided row.
@@ -3219,6 +3310,54 @@ cdef class TableAlterer:
             _check_convert_range_bound_type(upper_bound_type)
         )
 
+
+    def add_custom_range_partition(self, range_partition):
+        """
+        Add a range partition with custom hash schema.
+
+        Multiple range partitions may be added as part of a single alter table
+        transaction by calling this method multiple times on the table alterer.
+
+        This client may immediately write and scan the new tablets when Alter()
+        returns success, however other existing clients may have to wait for a
+        timeout period to elapse before the tablets become visible. This period
+        is configured by the master's 'table_locations_ttl_ms' flag, and
+        defaults to 5 minutes.
+
+        Parameters
+        ----------
+        range_partition : RangePartition
+
+        Returns
+        -------
+        self : TableAlterer
+        """
+        cdef:
+            vector[string] v
+            KuduRangePartition* p
+            PartialRow lower_bound
+            PartialRow upper_bound
+
+        if not isinstance(range_partition.lower_bound, PartialRow):
+            lower_bound = self._table.schema.new_row(range_partition.lower_bound)
+        else:
+            lower_bound = range_partition.lower_bound
+        lower_bound._own = 0
+        if not isinstance(range_partition.upper_bound, PartialRow):
+            upper_bound = self._table.schema.new_row(range_partition.upper_bound)
+        else:
+            upper_bound = range_partition.upper_bound
+        upper_bound._own = 0
+        p = new KuduRangePartition(
+            lower_bound.row, upper_bound.row,
+            range_partition.lower_bound_type, range_partition.upper_bound_type)
+        for col_names, num_buckets, seed in range_partition.hash_dimensions:
+            v.clear()
+            for n in col_names:
+                v.push_back(tobytes(n))
+            p.add_hash_partitions(v, num_buckets, seed if seed else 0)
+        self._alterer.AddRangePartition(p)
+
     def drop_range_partition(self, lower_bound=None,
                              upper_bound=None,
                              lower_bound_type='inclusive',
diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd
index 22cb24937..e23cca6be 100644
--- a/python/kudu/libkudu_client.pxd
+++ b/python/kudu/libkudu_client.pxd
@@ -596,7 +596,17 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
         string& hostname()
         uint16_t port()
 
+    cdef cppclass KuduRangePartition:
+        KuduRangePartition(KuduPartialRow* lower_bound,
+                           KuduPartialRow* upper_bound,
+                           RangePartitionBound lower_bound_type,
+                           RangePartitionBound upper_bound_type)
+        Status add_hash_partitions(vector[string]& columns,
+                                   int num_buckets,
+                                   int seed)
+
     cdef cppclass KuduTableCreator:
+
         KuduTableCreator& table_name(string& name)
         KuduTableCreator& schema(KuduSchema* schema)
         KuduTableCreator& add_hash_partitions(vector[string]& columns,
@@ -609,6 +619,7 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
                                               KuduPartialRow* upper_bound,
                                               RangePartitionBound lower_bound_type,
                                               RangePartitionBound upper_bound_type)
+        KuduTableCreator& add_custom_range_partition(KuduRangePartition* p)
         KuduTableCreator& add_range_partition_split(KuduPartialRow* split_row)
         KuduTableCreator& split_rows(vector[const KuduPartialRow*]& split_rows)
         KuduTableCreator& num_replicas(int n_replicas)
@@ -627,6 +638,7 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
                                             KuduPartialRow* upper_bound,
                                             RangePartitionBound lower_bound_type,
                                             RangePartitionBound upper_bound_type)
+        KuduTableAlterer& AddRangePartition(KuduRangePartition* p)
         KuduTableAlterer& DropRangePartition(KuduPartialRow* lower_bound,
                                              KuduPartialRow* upper_bound,
                                              RangePartitionBound lower_bound_type,
diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py
index 184eca638..5b9b4fd39 100755
--- a/python/kudu/tests/test_client.py
+++ b/python/kudu/tests/test_client.py
@@ -18,7 +18,10 @@
 
 from kudu.compat import unittest, long
 from kudu.tests.common import KuduTestBase
-from kudu.client import (Partitioning, ENCRYPTION_OPTIONAL, ENCRYPTION_REQUIRED,
+from kudu.client import (Partitioning,
+                         RangePartition,
+                         ENCRYPTION_OPTIONAL,
+                         ENCRYPTION_REQUIRED,
                          ENCRYPTION_REQUIRED_REMOTE)
 import kudu
 import datetime
@@ -155,6 +158,44 @@ class TestClient(KuduTestBase, unittest.TestCase):
             except:
                 pass
 
+    def test_create_table_with_range_specific_hash_schemas(self):
+        table_name = 'create_table_range_specific_hash_schemas'
+        try:
+            # define range with custom hash schema
+            p = RangePartition({'key': 0}, {'key': 100})
+            p.add_hash_partitions(['key'], 5)
+
+            self.client.create_table(
+                table_name, self.schema,
+                partitioning=Partitioning()
+                    .set_range_partition_columns(['key'])
+                    .add_hash_partitions(['key'], 2)
+                    .add_range_partition(
+                        lower_bound={'key': -100},
+                        upper_bound={'key': 0})
+                    .add_custom_range_partition(p)
+            )
+
+            # rely on 1-1 mapping between tokens and tablets for full table scan
+            table = self.client.table(table_name)
+            builder = table.scan_token_builder()
+            builder.set_fault_tolerant()
+            tokens = builder.build()
+            self.assertEqual(7, len(tokens))
+
+            session = self.client.new_session()
+            for i in range(-100, 100):
+                op = table.new_insert((i, i))
+                session.apply(op)
+            session.flush()
+
+            self.client.delete_table(table_name)
+        finally:
+            try:
+                self.client.delete_table(table_name)
+            except:
+                pass
+
     def test_create_table_with_different_owner(self):
         name = 'table_with_different_owner'
         try:
@@ -500,6 +541,72 @@ class TestClient(KuduTestBase, unittest.TestCase):
         alterer.add_range_partition()
         table = alterer.alter()
 
+    def test_alter_table_add_partition_with_custom_hash_schema(self):
+        table_name = 'add_partition_with_custom_hash_schema'
+        try:
+            # create table with [-100, 0) range having table-wide hash schema
+            self.client.create_table(
+                table_name, self.schema,
+                partitioning=Partitioning()
+                    .set_range_partition_columns(['key'])
+                    .add_hash_partitions(['key'], 3)
+                    .add_range_partition(
+                    lower_bound={'key': -100},
+                    upper_bound={'key': 0})
+            )
+
+            # open the newly created table
+            table = self.client.table(table_name)
+
+            # define range with custom hash schema
+            p = RangePartition({'key': 0}, {'key': 100})
+            p.add_hash_partitions(['key'], 2, 8)
+
+            alterer = self.client.new_table_alterer(table)
+            alterer.add_custom_range_partition(p)
+            table = alterer.alter()
+
+            # rely on 1-1 mapping between tokens and tablets for full table scan
+            builder = table.scan_token_builder()
+            builder.set_fault_tolerant()
+            tokens = builder.build()
+            self.assertEqual(5, len(tokens))
+
+            session = self.client.new_session()
+            for i in range(-100, 100):
+                op = table.new_insert((i, i))
+                session.apply(op)
+            session.flush()
+
+            # drop the new custom range partition that hash just been added
+            alterer = self.client.new_table_alterer(table)
+            alterer.drop_range_partition({'key': 0}, {'key': 100})
+            table = alterer.alter()
+
+            # rely on 1-1 mapping between tokens and tablets for full table scan
+            builder = table.scan_token_builder()
+            builder.set_fault_tolerant()
+            tokens = builder.build()
+            self.assertEqual(3, len(tokens))
+
+            # drop the range partition that have table-wide hash schema
+            alterer = self.client.new_table_alterer(table)
+            alterer.drop_range_partition({'key': -100}, {'key': 0})
+            table = alterer.alter()
+
+            # rely on 1-1 mapping between tokens and tablets for full table scan
+            builder = table.scan_token_builder()
+            builder.set_fault_tolerant()
+            tokens = builder.build()
+            self.assertEqual(0, len(tokens))
+
+            self.client.delete_table(table_name)
+        finally:
+            try:
+                self.client.delete_table(table_name)
+            except:
+                pass
+
     def test_require_encryption(self):
         client = kudu.connect(self.master_hosts, self.master_ports,
                               encryption_policy=ENCRYPTION_REQUIRED)

Reply via email to