This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 80a184000 KUDU-1945 Auto-incrementing column, Python client 80a184000 is described below commit 80a18400024d36f84c7fa53eac0fba8e2f8b726d Author: Marton Greber <greber...@gmail.com> AuthorDate: Fri Feb 24 12:35:16 2023 +0000 KUDU-1945 Auto-incrementing column, Python client This patch adds the Python part of the client side changes for the auto incrementing column feature. A new ColumnSpec called non_unique_primary_key is added. Semantically it behaves like primary_key: - only one column can have the non_unique_primary_key ColumnSpec in a given SchemaBuilder context, - if it exists, it must be defined in the first place, - compound keys are defined through a set function. Functionally, non-unique primary keys don't need to fulfill the uniqueness constraint. An auto incrementing column is added in the background automatically once a non-unique primary key is specified. The non-unique keys and the auto incrementing column together form the effective primary key. Some technical notes: - The name of the auto incrementing column is hardcoded into the kudu.Schema class. This is a reserved column name, users can't create columns with it. On the client facing side, this reserved string is reachable through kudu.Schema.get_auto_incrementing_column_name(). - In this initial version there is no support for UPSERT and UPSERT_IGNORE operations. - With non-unique primary key, one can't use the tuple/list initialization for new inserts. Change-Id: I94622680c5eb32eb2746a3b84c73699c1a37618c Reviewed-on: http://gerrit.cloudera.org:8080/19566 Tested-by: Kudu Jenkins Reviewed-by: Abhishek Chennaka <achenn...@cloudera.com> Reviewed-by: Yingchun Lai <laiyingc...@apache.org> Reviewed-by: Alexey Serbin <ale...@apache.org> --- python/kudu/libkudu_client.pxd | 5 + python/kudu/schema.pyx | 88 ++++++++++++++++- python/kudu/tests/test_client.py | 152 +++++++++++++++++++++++++++- python/kudu/tests/test_scantoken.py | 61 ++++++++++++ python/kudu/tests/test_schema.py | 192 ++++++++++++++++++++++++++++++++++++ 5 files changed, 494 insertions(+), 4 deletions(-) diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd index e23cca6be..510bc3483 100644 --- a/python/kudu/libkudu_client.pxd +++ b/python/kudu/libkudu_client.pxd @@ -183,6 +183,9 @@ cdef extern from "kudu/client/schema.h" namespace "kudu::client" nogil: KuduSchema(const KuduSchema& schema) KuduSchema(vector[KuduColumnSchema]& columns, int key_columns) + @staticmethod + string GetAutoIncrementingColumnName() + c_bool Equals(const KuduSchema& other) KuduColumnSchema Column(size_t idx) size_t num_columns() @@ -201,6 +204,7 @@ cdef extern from "kudu/client/schema.h" namespace "kudu::client" nogil: KuduColumnSpec* BlockSize(int32_t block_size) KuduColumnSpec* PrimaryKey() + KuduColumnSpec* NonUniquePrimaryKey() KuduColumnSpec* NotNull() KuduColumnSpec* Nullable() KuduColumnSpec* Type(DataType type_) @@ -216,6 +220,7 @@ cdef extern from "kudu/client/schema.h" namespace "kudu::client" nogil: KuduColumnSpec* AddColumn(string& name) KuduSchemaBuilder* SetPrimaryKey(vector[string]& key_col_names); + KuduSchemaBuilder* SetNonUniquePrimaryKey(vector[string]& key_col_names); Status Build(KuduSchema* schema) diff --git a/python/kudu/schema.pyx b/python/kudu/schema.pyx index a71e153f9..ee1265e50 100644 --- a/python/kudu/schema.pyx +++ b/python/kudu/schema.pyx @@ -427,6 +427,35 @@ cdef class ColumnSpec: self.spec.PrimaryKey() return self + def non_unique_primary_key(self): + """ + Make this column a non-unique primary key. + + This may only be used to set non-composite non-unique primary keys. If a composite + key is desired, use set_non_unique_primary_keys(). This may not be + used in conjunction with set_non_unique_primary_keys(). + + Notes + ----- + Non-unique primary keys may not be changed after a table is created. + + By specifying non-unique primary key, an auto incrementing column is created + automatically. They form together the effective primary key. The auto incrementing field + is populated on the server side, it must not be specified during insertion. All subsequent + operations like scans will contain the auto incrementing column by default. If one wants to + omit the auto incrementing column, it can be accomplished through existing projection + methods. + + A call to primary_key() or non_unique_primary_key() overrides any previous call + to these two methods. + + Returns + ------- + self + """ + self.spec.NonUniquePrimaryKey() + return self + def nullable(self, bint is_nullable=True): """ Set nullable (True) or not nullable (False) @@ -503,9 +532,9 @@ cdef class SchemaBuilder: colschema.type, colschema.nullable) - def add_column(self, name, type_=None, nullable=None, compression=None, - encoding=None, primary_key=False, block_size=None, - default=None, precision=None, scale=None, length=None): + def add_column(self, name, type_=None, nullable=None, compression=None, encoding=None, + primary_key=False, non_unique_primary_key=False, block_size=None, default=None, + precision=None, scale=None, length=None): """ Add a new column to the schema. Returns a ColumnSpec object for further configuration and use in a fluid programming style. @@ -526,6 +555,8 @@ cdef class SchemaBuilder: Or see kudu.ENCODING_* constants primary_key : boolean, default False Use this column as the table primary key + non_unique_primary_key : boolean, default False + Use this column as the table non-unique primary key block_size : int, optional Block size (in bytes) to use for the target column. default : obj @@ -577,6 +608,9 @@ cdef class SchemaBuilder: if primary_key: result.primary_key() + if non_unique_primary_key: + result.non_unique_primary_key() + if block_size: result.block_size(block_size) @@ -606,6 +640,43 @@ cdef class SchemaBuilder: self.builder.SetPrimaryKey(key_col_names) + def set_non_unique_primary_keys(self, key_names): + """ + Set the non-unique primary key of the new Schema based on the given column names. + + This may be used to specify a compound non-unique primary key. + + Notes + ----- + Non-unique primary keys may not be changed after a table is created. + + By specifying non-unique primary key, an auto incrementing column is created + automatically. They form together the effective primary key. The auto incrementing field + is populated on the server side, it must not be specified during insertion. All subsequent + operations like scans will contain the auto incrementing column by default. If one wants to + omit the auto incrementing column, it can be accomplished through existing projection + methods. + + A call to primary_key() or non_unique_primary_key() overrides any previous call + to these two methods. + + Parameters + ---------- + key_names: list of Python strings + + Returns + ------- + self + + """ + cdef: + vector[string] key_col_names + + for name in key_names: + key_col_names.push_back(tobytes(name)) + + self.builder.SetNonUniquePrimaryKey(key_col_names) + def build(self): """ Creates an immutable Schema object after the user has finished adding @@ -777,6 +848,17 @@ cdef class Schema: indices = self.primary_key_indices() return [self.at(i).name for i in indices] + @staticmethod + def get_auto_incrementing_column_name(): + """ + Utility function to return the actual name of the auto-incrementing column. + + Returns + ------- + auto-incrementing column name: str + """ + col_name = KuduSchema.GetAutoIncrementingColumnName() + return frombytes(col_name) cdef class KuduValue: diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py index 5b9b4fd39..0000d6210 100755 --- a/python/kudu/tests/test_client.py +++ b/python/kudu/tests/test_client.py @@ -23,6 +23,10 @@ from kudu.client import (Partitioning, ENCRYPTION_OPTIONAL, ENCRYPTION_REQUIRED, ENCRYPTION_REQUIRED_REMOTE) +from kudu.errors import (KuduInvalidArgument, + KuduBadStatus) +from kudu.schema import (Schema, + KuduValue) import kudu import datetime from pytz import utc @@ -228,6 +232,43 @@ class TestClient(KuduTestBase, unittest.TestCase): except: pass + def test_create_table_with_auto_incrementing_column(self): + table_name = 'create_table_with_auto_incrementing_column' + try: + builder = kudu.schema_builder() + (builder.add_column('key', kudu.int32) + .nullable(False) + .non_unique_primary_key()) + builder.add_column('data', kudu.string) + schema = builder.build() + + self.client.create_table( + table_name, schema, + partitioning=Partitioning().add_hash_partitions(['key'], 2)) + + table = self.client.table(table_name) + session = self.client.new_session() + nrows = 10 + for _ in range(nrows): + op = table.new_insert() + op['key'] = 1 + op['data'] = 'text' + session.apply(op) + + session.flush() + + scanner = table.scanner() + results = scanner.open().read_all_tuples() + assert nrows == len(results) + for i in range(len(results)): + auto_incrementing_value = results[i][1] + assert auto_incrementing_value == i+1 + finally: + try: + self.client.delete_table(table_name) + except: + pass + def test_insert_nonexistent_field(self): table = self.client.table(self.ex_table) op = table.new_insert() @@ -300,6 +341,57 @@ class TestClient(KuduTestBase, unittest.TestCase): scanner = table.scanner().open() assert len(scanner.read_all_tuples()) == 0 + def test_insert_with_auto_incrementing_column(self): + + table_name = 'test_insert_with_auto_incrementing_column' + try: + builder = kudu.schema_builder() + (builder.add_column('key', kudu.int32) + .nullable(False) + .non_unique_primary_key()) + builder.add_column('data', kudu.string) + schema = builder.build() + + self.client.create_table( + table_name, schema, + partitioning=Partitioning().add_hash_partitions(['key'], 2)) + + table = self.client.table(table_name) + session = self.client.new_session() + + # Insert with auto incrementing column specified + op = table.new_insert() + op['key'] = 1 + op[Schema.get_auto_incrementing_column_name()] = 1 + session.apply(op) + try: + session.flush() + except KuduBadStatus: + message = 'is incorrectly set' + errors, overflow = session.get_pending_errors() + assert not overflow + assert len(errors) == 1 + assert message in repr(errors[0]) + + # TODO: Upsert should be rejected as of now. However the test segfaults: KUDU-3454 + # TODO: Upsert ignore should be rejected. Once Python client supports upsert ignore. + + # With non-unique primary key, one can't use the tuple/list initialization for new + # inserts. In this case, at the second position it would like to get an int64 (the type + # of the auto-incrementing counter), therefore we get type error. (Specifying the + # auto-incremeintg counter is obviously rejected from the server side) + with self.assertRaises(TypeError): + op = table.new_insert((1,'text')) + + with self.assertRaises(TypeError): + op = table.new_insert([1,'text']) + + finally: + try: + self.client.delete_table(table_name) + except: + pass + def test_failed_write_op(self): # Insert row table = self.client.table(self.ex_table) @@ -349,7 +441,6 @@ class TestClient(KuduTestBase, unittest.TestCase): with self.assertRaises(ValueError): self.client.new_session(flush_mode='foo') - def test_session_mutation_buffer_settings(self): self.client.new_session(flush_mode=kudu.FLUSH_AUTO_BACKGROUND, mutation_buffer_sz= 10*1024*1024, @@ -607,6 +698,65 @@ class TestClient(KuduTestBase, unittest.TestCase): except: pass + def test_alter_table_auto_incrementing_column(self): + table_name = 'alter_table_with_auto_incrementing_column' + try: + builder = kudu.schema_builder() + (builder.add_column('key', kudu.int32) + .nullable(False) + .non_unique_primary_key()) + schema = builder.build() + + self.client.create_table( + table_name, schema, + partitioning=Partitioning().add_hash_partitions(['key'], 2)) + + col_name = Schema.get_auto_incrementing_column_name() + table = self.client.table(table_name) + + # negatives + alterer = self.client.new_table_alterer(table) + alterer.drop_column(col_name) + with self.assertRaises(KuduInvalidArgument): + alterer.alter() + + alterer = self.client.new_table_alterer(table) + alterer.add_column(col_name) + with self.assertRaises(KuduInvalidArgument): + alterer.alter() + + alterer = self.client.new_table_alterer(table) + alterer.alter_column(col_name, "new_column_name") + with self.assertRaises(KuduInvalidArgument): + alterer.alter() + + alterer = self.client.new_table_alterer(table) + alterer.alter_column(col_name).remove_default() + with self.assertRaises(KuduInvalidArgument): + alterer.alter() + + # positives + alterer = self.client.new_table_alterer(table) + alterer.alter_column(col_name).block_size(1) + alterer.alter() + + alterer = self.client.new_table_alterer(table) + alterer.alter_column(col_name).encoding('plain') + alterer.alter() + + alterer = self.client.new_table_alterer(table) + alterer.alter_column(col_name).compression('none') + alterer.alter() + + # TODO(martongreber): once column comments are added to the python client + # check whether the auto-incrementing column's comment can be altered. + + finally: + try: + self.client.delete_table(table_name) + except: + pass + def test_require_encryption(self): client = kudu.connect(self.master_hosts, self.master_ports, encryption_policy=ENCRYPTION_REQUIRED) diff --git a/python/kudu/tests/test_scantoken.py b/python/kudu/tests/test_scantoken.py index 0c1fda608..9214adb98 100644 --- a/python/kudu/tests/test_scantoken.py +++ b/python/kudu/tests/test_scantoken.py @@ -297,3 +297,64 @@ class TestScanToken(TestScanBase): self.assertEqual(sorted(tuples), sorted(self.tuples)) + + def test_scan_token_auto_incrementing_column(self): + table_name = 'scan_auto_incrementing_column' + builder = kudu.schema_builder() + (builder.add_column('key', kudu.string) + .nullable(False) + .non_unique_primary_key()) + schema = builder.build() + + self.client.create_table( + table_name, schema, + partitioning=kudu.client.Partitioning().add_hash_partitions(['key'], 2)) + table = self.client.table(table_name) + + # No projection testing default behaviour + builder = table.scan_token_builder() + tokens = builder.build() + for token in tokens: + scanner = self.client.deserialize_token_into_scanner(token.serialize()) + schema = scanner.get_projection_schema() + assert len(schema.names) == 2 + assert kudu.Schema.get_auto_incrementing_column_name() in schema.names + + # Empty projection + builder = table.scan_token_builder() + builder.set_projected_column_names([]) + tokens = builder.build() + for token in tokens: + scanner = self.client.deserialize_token_into_scanner(token.serialize()) + schema = scanner.get_projection_schema() + assert len(schema.names) == 0 + + # Projection with only the auto-incrementing column + builder = table.scan_token_builder() + builder.set_projected_column_names([kudu.Schema.get_auto_incrementing_column_name()]) + tokens = builder.build() + for token in tokens: + scanner = self.client.deserialize_token_into_scanner(token.serialize()) + schema = scanner.get_projection_schema() + assert len(schema.names) == 1 + assert kudu.Schema.get_auto_incrementing_column_name() in schema.names + + # Projection including the auto-incrementing column + builder = table.scan_token_builder() + builder.set_projected_column_names(['key', kudu.Schema.get_auto_incrementing_column_name()]) + tokens = builder.build() + for token in tokens: + scanner = self.client.deserialize_token_into_scanner(token.serialize()) + schema = scanner.get_projection_schema() + assert len(schema.names) == 2 + assert kudu.Schema.get_auto_incrementing_column_name() in schema.names + + # Projection excluding the auto-incrementing column + builder = table.scan_token_builder() + builder.set_projected_column_names(['key']) + tokens = builder.build() + for token in tokens: + scanner = self.client.deserialize_token_into_scanner(token.serialize()) + schema = scanner.get_projection_schema() + assert len(schema.names) == 1 + assert not kudu.Schema.get_auto_incrementing_column_name() in schema.names \ No newline at end of file diff --git a/python/kudu/tests/test_schema.py b/python/kudu/tests/test_schema.py index dca9ab566..42a5d5aa3 100644 --- a/python/kudu/tests/test_schema.py +++ b/python/kudu/tests/test_schema.py @@ -19,8 +19,10 @@ from __future__ import division from kudu.compat import unittest +from kudu.errors import KuduInvalidArgument import kudu +from kudu.schema import Schema class TestSchema(unittest.TestCase): @@ -289,6 +291,196 @@ class TestSchema(unittest.TestCase): assert schema[3].nullable assert not schema[4].nullable + def test_auto_incrementing_column_name(self): + name = Schema.get_auto_incrementing_column_name() + assert isinstance(name, str) + assert len(name) > 0 + + def test_non_unique_primary_key(self): + builder = kudu.schema_builder() + (builder.add_column('key', 'int64') + .nullable(False) + .non_unique_primary_key()) + builder.add_column('data1', 'double') + schema = builder.build() + assert len(schema) == 3 + assert len(schema.primary_keys()) == 2 + assert Schema.get_auto_incrementing_column_name() in schema.primary_keys() + + def test_set_non_unique_primary_keys(self): + builder = kudu.schema_builder() + (builder.add_column('key', 'int64') + .nullable(False)) + builder.add_column('data1', 'double') + builder.set_non_unique_primary_keys(['key']) + schema = builder.build() + assert len(schema) == 3 + assert len(schema.primary_keys()) == 2 + assert Schema.get_auto_incrementing_column_name() in schema.primary_keys() + + def test_set_non_unique_primary_keys_wrong_order(self): + builder = kudu.schema_builder() + builder.add_column('key1', 'int64').nullable(False) + builder.add_column('key2', 'double').nullable(False) + builder.set_non_unique_primary_keys(['key2', 'key1']) + with self.assertRaises(KuduInvalidArgument): + schema = builder.build() + + def test_set_non_unique_primary_keys_not_first(self): + builder = kudu.schema_builder() + builder.add_column('data1', 'double') + (builder.add_column('key', 'int64') + .nullable(False)) + builder.set_non_unique_primary_keys(['key']) + with self.assertRaises(KuduInvalidArgument): + schema = builder.build() + + def test_set_non_unique_primary_keys_same_name_twice(self): + builder = kudu.schema_builder() + (builder.add_column('key', 'int64') + .nullable(False)) + builder.add_column('data1', 'double') + builder.set_non_unique_primary_keys(['key', 'key']) + with self.assertRaises(KuduInvalidArgument): + schema = builder.build() + + def test_unique_and_non_unique_primary_key_on_same_column(self): + builder = kudu.schema_builder() + (builder.add_column('key', 'int64') + .nullable(False) + .primary_key() + .non_unique_primary_key()) + builder.add_column('data1', 'double') + schema = builder.build() + assert len(schema) == 3 + assert len(schema.primary_keys()) == 2 + assert Schema.get_auto_incrementing_column_name() in schema.primary_keys() + + def test_non_unique_and_unique_primary_key_on_same_column(self): + builder = kudu.schema_builder() + (builder.add_column('key', 'int64') + .nullable(False) + .non_unique_primary_key() + .primary_key()) + builder.add_column('data1', 'double') + schema = builder.build() + assert len(schema) == 2 + assert len(schema.primary_keys()) == 1 + assert Schema.get_auto_incrementing_column_name() not in schema.primary_keys() + + def test_non_unique_primary_key_not_first(self): + builder = kudu.schema_builder() + builder.add_column('data1', 'int64') + (builder.add_column('key', 'double') + .nullable(False) + .non_unique_primary_key()) + with self.assertRaises(KuduInvalidArgument): + builder.build() + + def test_unique_and_non_unique_primary_key_on_different_cols(self): + builder = kudu.schema_builder() + (builder.add_column('key1', 'double') + .nullable(False) + .primary_key()) + (builder.add_column('key2', 'double') + .nullable(False) + .non_unique_primary_key()) + with self.assertRaises(KuduInvalidArgument): + builder.build() + + def test_non_unique_and_unique_primary_key_on_different_cols(self): + builder = kudu.schema_builder() + (builder.add_column('key1', 'double') + .nullable(False) + .non_unique_primary_key()) + (builder.add_column('key2', 'double') + .nullable(False) + .primary_key()) + with self.assertRaises(KuduInvalidArgument): + builder.build() + + def test_multiple_non_unique_primary_keys(self): + builder = kudu.schema_builder() + (builder.add_column('key1', 'double') + .nullable(False) + .non_unique_primary_key()) + (builder.add_column('key2', 'double') + .nullable(False) + .non_unique_primary_key()) + with self.assertRaises(KuduInvalidArgument): + builder.build() + + def test_non_unique_primary_key_and_set_non_unique_primary_keys(self): + builder = kudu.schema_builder() + (builder.add_column('key', 'int64') + .nullable(False) + .non_unique_primary_key()) + builder.add_column('data1', 'double') + builder.set_non_unique_primary_keys(['key']) + with self.assertRaises(KuduInvalidArgument): + builder.build() + + def test_primary_key_and_set_non_unique_primary_keys(self): + builder = kudu.schema_builder() + (builder.add_column('key', 'int64') + .nullable(False) + .primary_key()) + builder.add_column('data1', 'double') + builder.set_non_unique_primary_keys(['key']) + with self.assertRaises(KuduInvalidArgument): + builder.build() + + def test_primary_key_and_set_primary_keys(self): + builder = kudu.schema_builder() + (builder.add_column('key', 'int64') + .nullable(False) + .primary_key()) + builder.add_column('data1', 'double') + builder.set_primary_keys(['key']) + with self.assertRaises(KuduInvalidArgument): + builder.build() + + def test_non_unique_primary_key_and_set_primary_keys(self): + builder = kudu.schema_builder() + (builder.add_column('key', 'int64') + .nullable(False) + .non_unique_primary_key()) + builder.add_column('data1', 'double') + builder.set_primary_keys(['key']) + with self.assertRaises(KuduInvalidArgument): + builder.build() + + def test_set_non_unique_and_set_unique_primary_key(self): + builder = kudu.schema_builder() + builder.add_column('key1', 'int64').nullable(False) + builder.add_column('key2', 'double').nullable(False) + builder.set_non_unique_primary_keys(['key1', 'key2']) + builder.set_primary_keys(['key1', 'key2']) + schema = builder.build() + assert len(schema) == 2 + assert len(schema.primary_keys()) == 2 + assert Schema.get_auto_incrementing_column_name() not in schema.primary_keys() + + def test_set_unique_and_set_non_unique_primary_key(self): + builder = kudu.schema_builder() + builder.add_column('key1', 'int64').nullable(False) + builder.add_column('key2', 'double').nullable(False) + builder.set_primary_keys(['key1', 'key2']) + builder.set_non_unique_primary_keys(['key1', 'key2']) + schema = builder.build() + assert len(schema) == 3 + assert len(schema.primary_keys()) == 3 + assert Schema.get_auto_incrementing_column_name() in schema.primary_keys() + + def test_reserved_column_name(self): + builder = kudu.schema_builder() + (builder.add_column('key', 'int64') + .nullable(False) + .primary_key()) + builder.add_column(Schema.get_auto_incrementing_column_name(), 'double') + with self.assertRaises(KuduInvalidArgument): + builder.build() + def test_default_value(self): pass