LENS-1202: Add client side iterator for result in python client

Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/acb32d54
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/acb32d54
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/acb32d54

Branch: refs/heads/master
Commit: acb32d5400a61037c575dae44b31a884ff6773a7
Parents: 3780744
Author: Rajat Khandelwal <pro...@apache.org>
Authored: Mon Jul 25 15:02:17 2016 +0530
Committer: Rajat Khandelwal <rajatgupt...@gmail.com>
Committed: Mon Jul 25 15:02:17 2016 +0530

----------------------------------------------------------------------
 contrib/clients/python/README.md               |  67 ++++++-
 contrib/clients/python/lens/client/main.py     |  23 +--
 contrib/clients/python/lens/client/models.py   |   6 -
 contrib/clients/python/lens/client/query.py    | 196 ++++++++++++++++++--
 contrib/clients/python/lens/client/session.py  |  50 +++++
 contrib/clients/python/test/test_lensclient.py |  78 +++++---
 6 files changed, 343 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lens/blob/acb32d54/contrib/clients/python/README.md
----------------------------------------------------------------------
diff --git a/contrib/clients/python/README.md b/contrib/clients/python/README.md
index 00525ff..93a19c9 100644
--- a/contrib/clients/python/README.md
+++ b/contrib/clients/python/README.md
@@ -8,13 +8,60 @@ You can install like this:
 
 
 ## Usage
-    from lens.client import LensClient
-    with LensClient("http://lens.server.url/", "user.name", database="db") as client:
-        handle = client.queries.submit("cube select ...", query_name="My first 
query")
-        # Optionally wait for completion.
-        while not client.queries[handle].finished:
-            time.sleep(20) # sleep 20 seconds
-        print client.queries[handle].result_set_path
-        # listing queries:
-        for handle in client.queries(state='RUNNING'):
-            print client.queries[handle]
+
+### Listing queries
+```python
+with LensClient("http://lens.server.url/", "user.name", database="db") as client:
+    for handle in client.queries(state='RUNNING'):
+        print client.queries[handle]
+```
+
+### Async submission
+``` python
+from lens.client import LensClient
+with LensClient("http://lens.server.url/", "user.name", database="db") as client:
+    # Submit asynchronously
+    handle = client.queries.submit("cube select ...", query_name="My first 
query")
+    # You can wait for completion and get the entire query detail:
+    query = client.queries.wait_till_finish(handle, poll_interval=5) # poll 
each 5 seconds
+    # the path would be accessible from lens server machine. Not much useful 
for the client
+    print query.result_set_path
+    # iterating over result:
+    for row in query.result:
+        print row
+    # Writing result to local csv:
+    with open('result.csv', 'w') as csvfile:
+        writer = csv.writer(csvfile)
+        for row in query.result: # This will fetch the result again from lens 
server
+            writer.writerow(row)
+    # listing queries:
+    for handle in client.queries(state='RUNNING'):
+        print client.queries[handle]
+```
+
+### Sync submission
+```python
+from lens.client import LensClient
+with LensClient("http://lens.server.url/", "user.name", database="db") as client:
+    # Half async: The http call will return in 10 seconds, post that, query 
would be cancelled (depending on the server's configurations)
+    query = client.queries.submit("cube select ...", query_name="My first 
query", timeout=10) # 10 seconds
+    if query.status.status != 'CANCELLED':
+        result = query.result
+```
+
+### Async submission with wait
+```python
+from lens.client import LensClient
+with LensClient("http://lens.server.url/", "user.name", database="db") as client:
+    # Pseudo-sync
+    query = client.queries.submit("cube select ...", query_name="My first 
query", wait=True, poll_interval=5) # submit async and wait till finish, 
polling every 5 seconds. poll_interval is optional
+    query.result
+```
+
+### Fetching just results
+```python
+from lens.client import LensClient
+with LensClient("http://lens.server.url/", "user.name", database="db") as client:
+    # Direct result. Query handle and other details will be lost. 
+    result = client.queries.submit("cube select ...", query_name="My first 
query", fetch_result=True, poll_interval=5, delimiter=",", custom_mappings={})
+```

http://git-wip-us.apache.org/repos/asf/lens/blob/acb32d54/contrib/clients/python/lens/client/main.py
----------------------------------------------------------------------
diff --git a/contrib/clients/python/lens/client/main.py 
b/contrib/clients/python/lens/client/main.py
index dc60497..bf5d81e 100644
--- a/contrib/clients/python/lens/client/main.py
+++ b/contrib/clients/python/lens/client/main.py
@@ -16,11 +16,11 @@
 #
 import os
 
-import requests
 from six import string_types
 from .log import LensLogClient
+from .session import LensSessionClient
 from .query import LensQueryClient
-from .utils import conf_to_xml, xml_file_to_conf
+from .utils import xml_file_to_conf
 
 
 class LensClient(object):
@@ -37,25 +37,12 @@ class LensClient(object):
             self.base_url += "/"
         username = username or conf.get('lens.client.user.name', "anonymous")
         database = database or conf.get('lens.client.dbname')
-        self.open_session(username, password, database, conf)
-        self.queries = LensQueryClient(self.base_url, self._sessionid)
+        self.session = LensSessionClient(self.base_url, username, password, 
database, conf)
+        self.queries = LensQueryClient(self.base_url, self.session)
         self.logs = LensLogClient(self.base_url)
 
     def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
-        self.close_session()
-
-    def close_session(self):
-        if self._sessionid:
-            requests.delete(self.base_url + "session/", params={'sessionid': 
self._sessionid})
-            self._sessionid = None
-
-    def open_session(self, username, password, database, conf):
-        payload = [('username', username), ('password', password), 
('sessionconf', conf_to_xml(conf))]
-        if database:
-            payload.append(('database', database))
-        r = requests.post(self.base_url + "session/", files=payload, 
headers={'accept': 'application/xml'})
-        r.raise_for_status()
-        self._sessionid = r.text
+        self.session.close()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lens/blob/acb32d54/contrib/clients/python/lens/client/models.py
----------------------------------------------------------------------
diff --git a/contrib/clients/python/lens/client/models.py 
b/contrib/clients/python/lens/client/models.py
index 7451fe9..1860540 100644
--- a/contrib/clients/python/lens/client/models.py
+++ b/contrib/clients/python/lens/client/models.py
@@ -56,9 +56,3 @@ class WrappedJson(dict):
     def __eq__(self, other):
         return super(WrappedJson, self).__eq__(other) or (
         self._is_wrapper and other._is_wrapper and str(self) == str(other))
-
-
-class LensQuery(WrappedJson):
-    @property
-    def finished(self):
-        return self.status.status in ('SUCCESSFUL', 'FAILED', 'CANCELED', 
'CLOSED')

http://git-wip-us.apache.org/repos/asf/lens/blob/acb32d54/contrib/clients/python/lens/client/query.py
----------------------------------------------------------------------
diff --git a/contrib/clients/python/lens/client/query.py 
b/contrib/clients/python/lens/client/query.py
index 88ce719..f82f0cb 100644
--- a/contrib/clients/python/lens/client/query.py
+++ b/contrib/clients/python/lens/client/query.py
@@ -14,23 +14,138 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import codecs
+import time
+import zipfile
 
 import requests
-import time
-from six import string_types
-from .models import WrappedJson, LensQuery
+from six import string_types, BytesIO, StringIO, PY2, PY3
+from .models import WrappedJson
 from .utils import conf_to_xml
+import csv
+
+long_type = int
+
+if PY3:
+    from collections.abc import Iterable as Iterable
+elif PY2:
+    from collections import Iterable as Iterable
+    long_type = long
+
+
+class LensQuery(WrappedJson):
+    def __init__(self, client, *args, **kwargs):
+        super(LensQuery, self).__init__(*args, **kwargs)
+        self.client = client
+
+    @property
+    def finished(self):
+        return self.status.status in ('SUCCESSFUL', 'FAILED', 'CANCELED', 
'CLOSED')
+
+    def get_result(self, *args, **kwargs):
+        return self.client.get_result(self, *args, **kwargs)
+
+    result = property(get_result)
+
+
+type_mappings = {'BOOLEAN': bool,
+                 'TINYINT': int,
+                 'SMALLINT': int,
+                 'INT': int,
+                 'BIGINT': long_type,
+                 'FLOAT': float,
+                 'DOUBLE': float,
+                 'TIMESTAMP': long_type,
+                 'BINARY': bin,
+                 'ARRAY': list,
+                 'MAP': dict,
+                 # 'STRUCT,': str,
+                 # 'UNIONTYPE,': float,
+                 # 3'USER_DEFINED,': float,
+                 'DECIMAL,': float,
+                 # 'NULL,': float,
+                 # 'DATE,': float,
+                 # 'VARCHAR,': float,
+                 # 'CHAR': float
+                 }
+default_mapping = lambda x: x
+
+class LensQueryResult(Iterable):
+    def __init__(self, custom_mappings=None):
+        if custom_mappings is None:
+            custom_mappings = {}
+        self.custom_mappings = custom_mappings
+
+    def _mapping(self, type_name):
+        if type_name in self.custom_mappings:
+            return self.custom_mappings[type_name]
+        if type_name in type_mappings:
+            return type_mappings[type_name]
+        return default_mapping
+
+
+class LensInMemoryResult(LensQueryResult):
+    def __init__(self, resp, custom_mappings=None):
+        super(LensInMemoryResult, self).__init__(custom_mappings)
+        self.rows = resp.in_memory_query_result.rows
+
+    def __iter__(self):
+        for row in self.rows:
+            yield list(self._mapping(value.type)(value.value) if value else 
None for value in row['values'])
+
+class LensPersistentResult(LensQueryResult):
+    def __init__(self, header, response, encoding=None, 
is_header_present=True, delimiter=",",
+                 custom_mappings=None):
+        super(LensPersistentResult, self).__init__(custom_mappings)
+        self.response = response
+        self.is_zipped = 'zip' in self.response.headers['content-disposition']
+        self.delimiter = str(delimiter)
+        self.is_header_present = is_header_present
+        self.encoding = encoding
+        self.header = header
+
+    def _parse_line(self, line):
+        return 
list(self._mapping(self.header.columns[index].type)(line[index]) for index in 
range(len(line)))
+
+    def __iter__(self):
+        if self.is_zipped:
+            byte_stream = BytesIO(self.response.content)
+            with zipfile.ZipFile(byte_stream) as self.zipfile:
+                for name in self.zipfile.namelist():
+                    with self.zipfile.open(name) as single_file:
+                        if name[-3:] == 'csv':
+                            reader = csv.reader(single_file, 
delimiter=self.delimiter)
+                        else:
+                            reader = single_file
+                        reader_iterator = iter(reader)
+                        if self.is_header_present:
+                            next(reader_iterator)
+                        for line in reader_iterator:
+                            yield self._parse_line(line)
+            byte_stream.close()
+        else:
+            stream = codecs.iterdecode(self.response.iter_lines(),
+                                       self.response.encoding or 
self.response.apparent_encoding)
+            reader = csv.reader(stream, delimiter=self.delimiter)
+            reader_iterator = iter(reader)
+            if self.is_header_present:
+                next(reader_iterator)
+            for line in reader_iterator:
+                yield self._parse_line(line)
+            stream.close()
 
 
 class LensQueryClient(object):
-    def __init__(self, base_url, sessionid):
-        self._sessionid = sessionid
+    def __init__(self, base_url, session):
+        self._session = session
         self.base_url = base_url + "queryapi/"
         self.launched_queries = []
         self.finished_queries = {}
+        self.query_confs = {}
+        self.is_header_present_in_result = 
self._session['lens.query.output.write.header'].lower() in ['true', '1', 't', 
'y', 'yes', 'yeah', 'yup']
 
     def __call__(self, **filters):
-        filters['sessionid'] = self._sessionid
+        filters['sessionid'] = self._session._sessionid
         resp = requests.get(self.base_url + "queries/", params=filters, 
headers={'accept': 'application/json'})
         return self.sanitize_response(resp)
 
@@ -38,34 +153,79 @@ class LensQueryClient(object):
         if isinstance(item, string_types):
             if item in self.finished_queries:
                 return self.finished_queries[item]
-            resp = requests.get(self.base_url + "queries/" + item, 
params={'sessionid': self._sessionid},
-                                           headers={'accept': 
'application/json'})
+            resp = requests.get(self.base_url + "queries/" + item, 
params={'sessionid': self._session._sessionid},
+                                headers={'accept': 'application/json'})
             resp.raise_for_status()
-            query = LensQuery(resp.json(object_hook=WrappedJson))
+            query = LensQuery(self, resp.json(object_hook=WrappedJson))
             if query.finished:
+                query.client = self
                 self.finished_queries[item] = query
             return query
+        elif isinstance(item, LensQuery):
+            return self[item.query_handle]
         elif isinstance(item, WrappedJson):
             if item._is_wrapper:
                 return self[item._wrapped_value]
+            if item.query_handle:
+                return self[item.query_handle]
         raise Exception("Can't get query: " + str(item))
 
-    def submit(self, query, operation="execute", query_name=None, 
timeout=None, conf=None):
-        payload = [('sessionid', self._sessionid), ('query', query), 
('operation', operation)]
+    def submit(self, query, operation=None, query_name=None, timeout=None, 
conf=None, wait=False, fetch_result=False,
+               *args, **kwargs):
+        payload = [('sessionid', self._session._sessionid), ('query', query)]
         if query_name:
             payload.append(('queryName', query_name))
         if timeout:
-            payload.append(('timeoutmillis', timeout))
+            payload.append(('timeoutmillis', str(int(timeout) * 1000)))
+        if not operation:
+            operation = "execute_with_timeout" if timeout else "execute"
+        payload.append(('operation', operation))
         payload.append(('conf', conf_to_xml(conf)))
         resp = requests.post(self.base_url + "queries/", files=payload, 
headers={'accept': 'application/json'})
         query = self.sanitize_response(resp)
-        self.launched_queries.append(query)
+        if conf:
+            self.query_confs[str(query)] = conf
+        if fetch_result:
+            # get result and return
+            return self.get_result(query, *args, **kwargs)  # query is handle 
here
+        elif wait:
+            # fetch details and return
+            return self.wait_till_finish(query, *args, **kwargs)
+        # just return handle. This would be the async case. Or execute with 
timeout, without wait
         return query
 
-    def wait_till_finish(self, handle):
-        while not self[handle].finished:
-            time.sleep(1)
-        return self[handle]
+    def wait_till_finish(self, handle_or_query, poll_interval=5, *args, 
**kwargs):
+        while not self[handle_or_query].finished:
+            time.sleep(poll_interval)
+        return self[handle_or_query]
+
+    def get_result(self, handle_or_query, *args, **kwargs):
+        query = self.wait_till_finish(handle_or_query, *args, **kwargs)
+        handle = str(query.query_handle)
+        if query.status.status == 'SUCCESSFUL' and 
query.status.is_result_set_available:
+            resp = requests.get(self.base_url + "queries/" + handle + 
"/resultsetmetadata",
+                                params={'sessionid': 
self._session._sessionid}, headers={'accept': 'application/json'})
+            metadata = self.sanitize_response(resp)
+            # Try getting the result through http result
+            resp = requests.get(self.base_url + "queries/" + handle + 
"/httpresultset",
+                                params={'sessionid': 
self._session._sessionid}, stream=True)
+            if resp.ok:
+                is_header_present = self.is_header_present_in_result
+                if handle in self.query_confs and 
'lens.query.output.write.header' in self.query_confs[handle]:
+                    is_header_present = 
bool(self.query_confs[handle]['lens.query.output.write.header'])
+                return LensPersistentResult(metadata, resp, 
is_header_present=is_header_present, *args, **kwargs)
+            else:
+                response = requests.get(self.base_url + "queries/" + handle + 
"/resultset",
+                                    params={'sessionid': 
self._session._sessionid}, headers={'accept': 'application/json'})
+                resp = self.sanitize_response(response)
+                # If it has in memory result, return inmemory result iterator
+                if resp._is_wrapper and resp._wrapped_key == 
u'inMemoryQueryResult':
+                    return LensInMemoryResult(resp)
+                # Else, return whatever you got
+                return resp
+
+        else:
+            raise Exception("Result set not available")
 
     def sanitize_response(self, resp):
         resp.raise_for_status()
@@ -85,5 +245,3 @@ class LensQueryClient(object):
         except:
             resp_json = resp.json()
         return resp_json
-
-

http://git-wip-us.apache.org/repos/asf/lens/blob/acb32d54/contrib/clients/python/lens/client/session.py
----------------------------------------------------------------------
diff --git a/contrib/clients/python/lens/client/session.py 
b/contrib/clients/python/lens/client/session.py
new file mode 100644
index 0000000..a1ccc4b
--- /dev/null
+++ b/contrib/clients/python/lens/client/session.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import requests
+
+from .models import WrappedJson
+from .utils import conf_to_xml
+
+
+class LensSessionClient(object):
+    def __init__(self, base_url, username, password, database, conf):
+        self.base_url = base_url + "session/"
+        self.open(username, password, database, conf)
+
+    def __getitem__(self, key):
+        resp = requests.get(self.base_url + "params",
+                            params={'sessionid': self._sessionid, 'key': key},
+                            headers={'accept': 'application/json'})
+        if resp.ok:
+            params = resp.json(object_hook=WrappedJson)
+            text = params.elements[0]
+            if key in text:
+                text = text[len(key)+1:]
+            return text
+
+    def open(self, username, password, database, conf):
+        payload = [('username', username), ('password', password), 
('sessionconf', conf_to_xml(conf))]
+        if database:
+            payload.append(('database', database))
+        r = requests.post(self.base_url, files=payload, headers={'accept': 
'application/xml'})
+        r.raise_for_status()
+        self._sessionid = r.text
+
+    def close(self):
+        requests.delete(self.base_url, params={'sessionid': self._sessionid})
+

http://git-wip-us.apache.org/repos/asf/lens/blob/acb32d54/contrib/clients/python/test/test_lensclient.py
----------------------------------------------------------------------
diff --git a/contrib/clients/python/test/test_lensclient.py 
b/contrib/clients/python/test/test_lensclient.py
index d49c55b..e9b0bdb 100644
--- a/contrib/clients/python/test/test_lensclient.py
+++ b/contrib/clients/python/test/test_lensclient.py
@@ -16,26 +16,28 @@
 #
 from __future__ import print_function
 
+import glob
+import os
 import random
 import string
-from contextlib import contextmanager
-from requests.exceptions import HTTPError
-
+import subprocess
 import time
+from contextlib import contextmanager
 
 import pytest
-from lens.client import LensClient
 from lens.client.models import WrappedJson
-import subprocess
-import os
-import glob
+from requests.exceptions import HTTPError
+
+from lens.client import LensClient
+
 
 def check_output(command):
     output = subprocess.check_output(command.split())
-    if isinstance(output, bytes): # For Python 3. Python 2 directly gives 
string
+    if isinstance(output, bytes):  # For Python 3. Python 2 directly gives 
string
         output = output.decode("utf-8")
     return output
 
+
 @contextmanager
 def cwd(dir):
     cur_dir = os.getcwd()
@@ -43,26 +45,34 @@ def cwd(dir):
     yield
     os.chdir(cur_dir)
 
+
 def time_sorted_ls(path):
     mtime = lambda f: os.stat(os.path.join(path, f)).st_mtime
     return list(sorted(os.listdir(path), key=mtime))
 
+
 def has_error(msg):
     return any(x in msg for x in ('Error', 'error', 'Exception', 'exception'))
 
+
 def get_error():
     latest_out_file = list(name for name in time_sorted_ls('logs') if 
'lensserver.out' in name)[-1]
-    print (latest_out_file)
+    print(latest_out_file)
     with open(os.path.join('logs', latest_out_file)) as f:
         return f.read()
 
+
 def select_query(path):
     with open(path) as f:
         for line in f:
             if 'cube select' in line and 'sample_cube' in line:
                 return line
 
+
 class TestLensClient(object):
+    query = "cube select dim1, measure2 from sample_cube where 
time_range_in(dt, '2014-06-24-23', '2014-06-25-00')"
+    expected_result = [[21, 100], [22, 200], [23, 300], [24, 400], [25, 500], 
[26, 600], [27, 700], [28, 800]]
+
     @classmethod
     def setup_class(cls):
         cls.db = ''.join(random.choice(string.ascii_uppercase + string.digits) 
for _ in range(10))
@@ -109,6 +119,7 @@ class TestLensClient(object):
                 populate_output = check_output('bin/run-examples.sh 
populate-metastore -db ' + cls.db)
                 if has_error(populate_output):
                     raise Exception("Couldn't populate sample metastore: " + 
populate_output)
+
     @classmethod
     def teardown_class(cls):
         # TODO: drop database
@@ -123,18 +134,10 @@ class TestLensClient(object):
             with cwd('server'):
                 stop_output = check_output('bin/lens-ctl stop')
                 if has_error(stop_output):
-                    raise("Error stopping server: " + stop_output)
+                    raise ("Error stopping server: " + stop_output)
 
     def get_client(self):
-        return LensClient(database = self.db, 
conf=os.path.join(self.base_path, 'client', 'conf'))
-
-    def test_auto_close_session(self):
-        with self.get_client() as client:
-            pass
-        with pytest.raises(HTTPError) as e:
-            # Now any api should give 410
-            client.queries(state='RUNNING')
-        assert e.value.response.status_code == 410
+        return LensClient(database=self.db, conf=os.path.join(self.base_path, 
'client', 'conf'))
 
     def test_wrong_query(self):
         with self.get_client() as client:
@@ -146,16 +149,43 @@ class TestLensClient(object):
     def test_submit_query(self):
         with self.get_client() as client:
             handle = client.queries.submit(self.candidate_query)
-        # session not closed
-        assert client.queries[handle]
-        client.queries.wait_till_finish(handle)
-        client.close_session()
+        with pytest.raises(HTTPError) as e:
+            # Either of these can give 410
+            client.queries.wait_till_finish(handle)
+            client.queries.submit(self.candidate_query)
+        assert e.value.response.status_code == 410
 
     def test_list_query(self):
         with self.get_client() as client:
             handle = client.queries.submit(self.candidate_query, 
query_name="Candidate Query")
             finished_query = client.queries.wait_till_finish(handle)
             assert client.queries[handle] == finished_query
-            queries = client.queries(state='SUCCESSFUL', 
fromDate=finished_query.submission_time - 1, 
toDate=finished_query.submission_time + 1)
+            queries = client.queries(state='SUCCESSFUL', 
fromDate=finished_query.submission_time - 1,
+                                     toDate=finished_query.submission_time + 1)
             assert handle in queries
 
+    def test_non_persisted_result(self):
+        with self.get_client() as client:
+            result = client.queries.submit(self.query, fetch_result=True)
+            assert str(result)[:30] == 'file:/tmp/lensreports/hdfsout/'
+
+    def test_persisted_result(self):
+        with self.get_client() as client:
+            result = client.queries.submit(self.query, 
conf={'lens.query.enable.persistent.resultset': True},
+                                           delimiter=u'\x01', 
fetch_result=True)
+            assert list(iter(result)) == self.expected_result
+
+    def test_persistent_result_with_header(self):
+        with self.get_client() as client:
+            result = client.queries.submit(self.query,
+                                           
conf={'lens.query.enable.persistent.resultset': True,
+                                                 
'lens.query.output.write.header': True},
+                                           delimiter=u'\x01', 
fetch_result=True)
+            assert list(iter(result)) == self.expected_result
+
+    def test_inmemory_result(self):
+        with self.get_client() as client:
+            result = client.queries.submit(self.query,
+                                           
conf={'lens.query.enable.persistent.resultset.indriver': False},
+                                           fetch_result=True)
+            assert list(iter(result)) == self.expected_result
\ No newline at end of file

Reply via email to