Repository: kudu Updated Branches: refs/heads/master 910fd0bac -> 7177fc359
[python] KUDU-2441: Enable configuration of mutation buffer The Python client currently doesn't support configuring the mutation buffer for Session objects. This patch brings the Python client up to speed with the C++ client in this respect. This patch includes a basic unit test. Change-Id: I52ac48e7dddc31e666a95ace4c7672da51d80b11 Reviewed-on: http://gerrit.cloudera.org:8080/10674 Tested-by: Kudu Jenkins Reviewed-by: Will Berkeley <wdberke...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7177fc35 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7177fc35 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7177fc35 Branch: refs/heads/master Commit: 7177fc35992daabdb93d9920fb1cc46c1a07619a Parents: 910fd0b Author: Jordan Birdsell <jbirds...@phdata.io> Authored: Sat Jun 9 17:06:07 2018 -0400 Committer: Jordan Birdsell <jtbirds...@apache.org> Committed: Fri Jul 6 15:30:45 2018 +0000 ---------------------------------------------------------------------- python/kudu/client.pyx | 121 +++++++++++++++++++++++++++++++++- python/kudu/libkudu_client.pxd | 6 +- python/kudu/tests/test_client.py | 29 ++++++++ 3 files changed, 154 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/7177fc35/python/kudu/client.pyx ---------------------------------------------------------------------- diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx index afa7e8d..39093b1 100644 --- a/python/kudu/client.pyx +++ b/python/kudu/client.pyx @@ -525,7 +525,7 @@ cdef class Client: result.append(ts._init(tservers[i])) return result - def new_session(self, flush_mode='manual', timeout_ms=5000): + def new_session(self, flush_mode='manual', timeout_ms=5000, **kwargs): """ Create a new KuduSession for applying write operations. 
@@ -535,17 +535,43 @@ cdef class Client: See Session.set_flush_mode timeout_ms : int, default 5000 Timeout in milliseconds + mutation_buffer_sz : Size of the mutation buffer space, in bytes. + mutation_buffer_watermark : Watermark level as a fraction of the mutation buffer size + (e.g. 0.5 for 50%); used to trigger a flush in AUTO_FLUSH_BACKGROUND mode. + mutation_buffer_flush_interval : The duration of the interval for the time-based + flushing, in milliseconds. In some cases, while running in AUTO_FLUSH_BACKGROUND + mode, the size of the mutation buffer for pending operations and the flush + watermark for fresh operations may be too high for the rate of incoming data: + it would take too long to accumulate enough data in the buffer to trigger + flushing. In that case, it makes sense to flush the accumulated operations if the + prior flush happened a long time ago. This parameter sets the wait interval for + the time-based flushing, which takes place along with the flushing triggered + by the over-the-watermark criterion. By default, the interval is set to + 1000 ms (i.e. 1 second). + mutation_buffer_max_num : The maximum number of mutation buffers per KuduSession + object to hold the applied operations.
Use 0 to set the maximum number of + concurrent mutation buffers to unlimited. Returns ------- session : kudu.Session """ + cdef Session result = Session() result.s = self.cp.NewSession() result.set_flush_mode(flush_mode) result.set_timeout_ms(timeout_ms) + if "mutation_buffer_sz" in kwargs: + result.set_mutation_buffer_space(kwargs["mutation_buffer_sz"]) + if "mutation_buffer_watermark" in kwargs: + result.set_mutation_buffer_flush_watermark(kwargs["mutation_buffer_watermark"]) + if "mutation_buffer_flush_interval" in kwargs: + result.set_mutation_buffer_flush_interval(kwargs["mutation_buffer_flush_interval"]) + if "mutation_buffer_max_num" in kwargs: + result.set_mutation_buffer_max_num(kwargs["mutation_buffer_max_num"]) + return result def new_table_alterer(self, Table table): @@ -1245,6 +1271,99 @@ cdef class Session: """ self.s.get().SetTimeoutMillis(ms) + def set_mutation_buffer_space(self, size_t size_bytes): + """ + Set the amount of buffer space used by this session for outbound writes. + + The effect of the buffer size varies based on the flush mode of the session: + + AUTO_FLUSH_SYNC: since no buffering is done, this has no effect. + AUTO_FLUSH_BACKGROUND: if the buffer space is exhausted, then write calls + will block until there is space available in the buffer. + MANUAL_FLUSH: if the buffer space is exhausted, then write calls will return + an error. + + By default, the buffer space is set to 7 MiB (i.e. 7 * 1024 * 1024 bytes). + + Parameters + ---------- + size_bytes : Size of the buffer space to set (number of bytes) + """ + status = self.s.get().SetMutationBufferSpace(size_bytes) + + check_status(status) + + def set_mutation_buffer_flush_watermark(self, double watermark_pct): + """ + Set the buffer watermark to trigger a flush in AUTO_FLUSH_BACKGROUND mode.
+ + This method sets the watermark for fresh operations in the buffer when running + in AUTO_FLUSH_BACKGROUND mode: once the specified threshold is reached, the + session starts sending the accumulated write operations to the appropriate + tablet servers. The flush watermark determines how much of the buffer space is + taken by newly submitted operations. Setting this level to 100% (1.0) results in + flushing the buffer only when the newly applied operation would overflow the + buffer. By default, the buffer flush watermark is set to 50% (0.5). + + Parameters + ---------- + watermark_pct : Watermark level as a fraction of the mutation buffer size (e.g. 0.5 for 50%) + """ + status = self.s.get().SetMutationBufferFlushWatermark(watermark_pct) + + check_status(status) + + def set_mutation_buffer_flush_interval(self, unsigned int millis): + """ + Set the interval for time-based flushing of the mutation buffer. + + In some cases, while running in AUTO_FLUSH_BACKGROUND mode, the size of the + mutation buffer for pending operations and the flush watermark for fresh + operations may be too high for the rate of incoming data: it would take too + long to accumulate enough data in the buffer to trigger flushing. In that case, it + makes sense to flush the accumulated operations if the prior flush happened + a long time ago. This method sets the wait interval for the time-based flushing, + which takes place along with the flushing triggered by the over-the-watermark + criterion. By default, the interval is set to 1000 ms (i.e. 1 second). + + Parameters + ---------- + millis : The duration of the interval for the time-based flushing, in milliseconds. + """ + + status = self.s.get().SetMutationBufferFlushInterval(millis) + + check_status(status) + + def set_mutation_buffer_max_num(self, unsigned int max_num): + """ + Set the maximum number of mutation buffers per Session object. + + A Session accumulates write operations submitted via the Session.apply() method in + mutation buffers.
A Session always has at least one mutation buffer. The + mutation buffer which accumulates new incoming operations is called the current + mutation buffer. The current mutation buffer is flushed either explicitly via the + Session.flush() method or automatically by the Session when + running in AUTO_FLUSH_BACKGROUND mode. After flushing the current mutation buffer, + a new buffer is created upon calling Session.apply(), provided the limit is + not exceeded. A call to Session.apply() blocks if the Session already holds the maximum number + of buffers allowed; the call unblocks as soon as one of the pending mutation buffers + finishes flushing and a new buffer can be created. + + Apart from the special value 0 (unlimited; see below), the minimum setting for this + parameter is 1 (one). The default setting for this parameter is 2 (two). + + Parameters + ---------- + max_num : The maximum number of mutation buffers per Session object to hold + the applied operations. Use 0 to set the maximum number of concurrent mutation + buffers to unlimited. + """ + + status = self.s.get().SetMutationBufferMaxNum(max_num) + + check_status(status) + def apply(self, WriteOperation op): """ Apply the indicated write operation http://git-wip-us.apache.org/repos/asf/kudu/blob/7177fc35/python/kudu/libkudu_client.pxd ---------------------------------------------------------------------- diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd index 5cd7731..a34d6c2 100644 --- a/python/kudu/libkudu_client.pxd +++ b/python/kudu/libkudu_client.pxd @@ -624,7 +624,11 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil: Status SetFlushMode(FlushMode m) - void SetMutationBufferSpace(size_t size) + Status SetMutationBufferSpace(size_t size) + Status SetMutationBufferFlushWatermark(double watermark_pct) + Status SetMutationBufferFlushInterval(unsigned int millis) + Status SetMutationBufferMaxNum(unsigned int max_num) + void SetTimeoutMillis(int millis) void SetPriority(int priority)
http://git-wip-us.apache.org/repos/asf/kudu/blob/7177fc35/python/kudu/tests/test_client.py ---------------------------------------------------------------------- diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py index 0a3d47e..f251033 100644 --- a/python/kudu/tests/test_client.py +++ b/python/kudu/tests/test_client.py @@ -255,6 +255,35 @@ class TestClient(KuduTestBase, unittest.TestCase): with self.assertRaises(ValueError): self.client.new_session(flush_mode='foo') + + def test_session_mutation_buffer_settings(self): + self.client.new_session(flush_mode=kudu.FLUSH_AUTO_BACKGROUND, + mutation_buffer_sz= 10*1024*1024, + mutation_buffer_watermark=0.5, + mutation_buffer_flush_interval=2000, + mutation_buffer_max_num=3) + + session = self.client.new_session(flush_mode=kudu.FLUSH_AUTO_BACKGROUND) + session.set_mutation_buffer_space(10*1024*1024) + session.set_mutation_buffer_flush_watermark(0.5) + session.set_mutation_buffer_flush_interval(2000) + session.set_mutation_buffer_max_num(3) + + def test_session_mutation_buffer_errors(self): + session = self.client.new_session(flush_mode=kudu.FLUSH_AUTO_BACKGROUND) + + with self.assertRaises(OverflowError): + session.set_mutation_buffer_max_num(-1) + + with self.assertRaises(kudu.errors.KuduInvalidArgument): + session.set_mutation_buffer_flush_watermark(1.2) + + with self.assertRaises(OverflowError): + session.set_mutation_buffer_flush_interval(-1) + + with self.assertRaises(OverflowError): + session.set_mutation_buffer_space(-1) + def test_connect_timeouts(self): # it works! any other way to check kudu.connect(self.master_hosts, self.master_ports,