Changeset: 8888dce31e77 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8888dce31e77
Added Files:
	clients/iotapi/tests/__init__.py
	clients/iotapi/tests/frontendtests.py
	clients/iotapi/tests/main.py
Modified Files:
	clients/iotapi/requirements.txt
	clients/iotclient/src/Streams/jsonschemas.py
	clients/iotclient/src/Streams/streams.py
	clients/iotclient/tests/main.py
Branch: iot
Log Message:
Added first Front-End test diffs (truncated from 364 to 300 lines): diff --git a/clients/iotapi/requirements.txt b/clients/iotapi/requirements.txt --- a/clients/iotapi/requirements.txt +++ b/clients/iotapi/requirements.txt @@ -4,7 +4,9 @@ jsonschema==2.5.1 python-dateutil==2.5.3 python-monetdb==11.24.0 pytz==2016.4 +requests==2.10.0 Sphinx==1.4.4 sphinx-rtd-theme==0.1.9 tzlocal==1.2.2 watchdog==0.8.3 +websocket-client==0.37.0 diff --git a/clients/iotapi/tests/__init__.py b/clients/iotapi/tests/__init__.py new file mode 100644 diff --git a/clients/iotapi/tests/frontendtests.py b/clients/iotapi/tests/frontendtests.py new file mode 100644 --- /dev/null +++ b/clients/iotapi/tests/frontendtests.py @@ -0,0 +1,148 @@ +import json +import os +import requests +import websocket + +from distutils.dir_util import copy_tree +from threading import Thread +from time import sleep +from unittest import TestCase + +__all__ = ['NullablesTest'] + +WEB_SOCKETS_THREADS_TIMEOUT = 15 + + +class BaseFrontEndTest(TestCase): + + def __init__(self, **kwargs): + super(BaseFrontEndTest, self).__init__() + self._web_server_baskets_location = os.path.join(kwargs['iot_client_path'], 'baskets') + self._web_api_baskets_location = os.path.join(kwargs['iot_api_path'], 'baskets') + self.schema = "tests" + + def export_inserts(self, schema, stream, basket): + input_dir = os.path.join(self._web_server_baskets_location, schema, stream, basket) + output_dir = os.path.join(self._web_api_baskets_location, schema, stream, basket) + copy_tree(input_dir, output_dir) + + +class TestWebSocket(websocket.WebSocketApp): + + def __init__(self, test, url, header=[], on_open=None, on_message=None, on_error=None, on_close=None, on_ping=None, + on_pong=None, on_cont_message=None, keep_running=True, get_mask_key=None, cookie=None, + subprotocols=None, on_data=None): + super(TestWebSocket, self).__init__(url, header, on_open, on_message, on_error, on_close, on_ping, + on_pong, on_cont_message, keep_running, get_mask_key, 
cookie, + subprotocols, on_data) + self.test = test + self.ws_state = 1 + + +def on_open(ws): + ws.send(json.dumps({"request": "sub", "schema": ws.test.schema, "stream": ws.test.stream})) + + +def on_message(ws, message): + resp = json.loads(message) + if resp['response'] == 'error': + ws.test.fail(msg=resp['message']) + elif ws.ws_state == 1: + ws.test.assertDictEqual({"response": "subscribed", "schema": ws.test.schema, "stream": ws.test.stream}, resp) + ws.ws_state = 2 + elif ws.ws_state == 2: + ws.test.assertDictEqual({"response": "notification", "schema": ws.test.schema, "stream": ws.test.stream, + "basket": 1, "count": 3}, resp) + ws.send(json.dumps({"request": "read", "schema": ws.test.schema, "stream": ws.test.stream, + "basket": 1, "offset": 0, "limit": 3})) + ws.ws_state = 3 + elif ws.ws_state == 3: + ws.test.assertIn('implicit_timestamp', resp['tuples'][0], msg='Timestamp not in stream') + del resp['tuples'][0]['implicit_timestamp'] + del resp['tuples'][1]['implicit_timestamp'] + del resp['tuples'][2]['implicit_timestamp'] + res_dic = {'vala': None, "valb": None, "valc": None, "vald": None, "vale": None, "valf": None, "valg": None, + "valh": None, "vali": None, "valj": None, "valk": None, "vall": None, "valm": None, "valn": None, + "valo": None, "valp": None, "valq": None, "valr": None, "vals": None, "valt": None, "valu": None, + "valv": None, "valw": None, "valx": None, "valy": None, "valz": None} + tuples_response = {"response": "read", "schema": ws.test.schema, "stream": ws.test.stream, + "count": 3, "tuples": [res_dic, res_dic, res_dic]} + ws.test.assertDictEqual(tuples_response, resp) + ws.close() + elif ws.ws_state == 4: + ws.test.assertDictEqual({"response": "removed", "schema": ws.test.schema, "stream": ws.test.stream}, resp) + ws.close() + + +def on_error(ws, error): + ws.close() + ws.test.fail(msg=error) + + +def web_socket(test): + ws = TestWebSocket(test=test, url="ws://127.0.0.1:8002/", on_message=on_message, on_open=on_open, 
on_error=on_error) + test.ws = ws + ws.run_forever() + + +class NullablesTest(BaseFrontEndTest): + + def __init__(self, **kwargs): + super(NullablesTest, self).__init__(**kwargs) + self.stream = "nulls" + self.ws = None + + def runTest(self): + json_str = {"schema": self.schema, "stream": self.stream, "has_hostname": False, "flushing": {"base": "auto"}, + "columns": [{"name": "vala", "type": "string", "nullable": True}, + {"name": "valb", "type": "uuid", "nullable": True}, + {"name": "valc", "type": "mac", "nullable": True}, + {"name": "vald", "type": "url", "nullable": True}, + {"name": "vale", "type": "inet", "nullable": True}, + {"name": "valf", "type": "inet6", "nullable": True}, + {"name": "valg", "type": "regex", "regex": "a", "nullable": True}, + {"name": "valh", "type": "varchar", "limit": 16, "nullable": True}, + {"name": "vali", "type": "enum", "values": ["a", "b", "c"], "nullable": True}, + {"name": "valj", "type": "boolean", "nullable": True}, + {"name": "valk", "type": "tinyint", "nullable": True}, + {"name": "vall", "type": "smallint", "nullable": True}, + {"name": "valm", "type": "int", "nullable": True}, + {"name": "valn", "type": "bigint", "nullable": True}, + {"name": "valo", "type": "hugeint", "nullable": True}, + {"name": "valp", "type": "real", "nullable": True}, + {"name": "valq", "type": "float", "nullable": True}, + {"name": "valr", "type": "decimal", "precision": 12, "scale": 10, "nullable": True}, + {"name": "vals", "type": "decimal", "precision": 28, "scale": 20, "nullable": True}, + {"name": "valt", "type": "date", "nullable": True}, + {"name": "valu", "type": "time", "nullable": True}, + {"name": "valv", "type": "time with time zone", "nullable": True}, + {"name": "valw", "type": "timestamp", "nullable": True}, + {"name": "valx", "type": "timestamp with time zone", "nullable": True}, + {"name": "valy", "type": "interval month", "nullable": True}, + {"name": "valz", "type": "interval second", "nullable": True}]} + + resp = 
requests.post("http://127.0.0.1:8001/context", json=json_str) + + self.assertEqual(resp.status_code, 201, msg=resp.text) + + sleep(2) # we need to sleep to check that the next poll happens + + thread = Thread(target=web_socket, args=(self, )) + thread.start() + + resp = requests.post("http://127.0.0.1:8000/stream/%s/%s" % (self.schema, self.stream), json=[{}, {}, {}]) + + self.assertEqual(resp.status_code, 201, msg=resp.text) + + self.export_inserts("tests", "nulls", "1") + + sleep(2) + + resp = requests.delete("http://127.0.0.1:8001/context", json={"schema": self.schema, "stream": self.stream}) + + self.assertEqual(resp.status_code, 204, msg=resp.text) + + thread.join(timeout=WEB_SOCKETS_THREADS_TIMEOUT) + if thread.isAlive(): + self.ws.close() + self.fail(msg='The websockets tests timed out!') diff --git a/clients/iotapi/tests/main.py b/clients/iotapi/tests/main.py new file mode 100644 --- /dev/null +++ b/clients/iotapi/tests/main.py @@ -0,0 +1,95 @@ +import argparse +import getpass +import os +import shutil +import subprocess +import signal +import sys +import time + +from unittest import TextTestRunner, TestSuite +from frontendtests import NullablesTest + + +def check_positive_int(value): + ivalue = int(value) + if ivalue <= 0: + raise argparse.ArgumentTypeError("%s is an invalid positive int value" % value) + return ivalue + + +def check_path(value): + if not os.path.isabs(value): + raise argparse.ArgumentTypeError("%s is an invalid path" % value) + return value + + +def main(): + parser = argparse.ArgumentParser(description='IOT Front-End Test', add_help=False) + parser.add_argument('-n', '--number', type=check_positive_int, nargs='?', default=1000, + help='Number of inserts (default: 1000)', metavar='NUMBER') + parser.add_argument('-f', '--filepath', type=check_path, nargs='?', default='/tmp', + help='Temp file location (default: %s)' % '/tmp', metavar='FILE_PATH') + parser.add_argument('-h', '--host', nargs='?', default='127.0.0.1', + help='MonetDB database 
host (default: 127.0.0.1)', metavar='HOST') + parser.add_argument('-p', '--port', type=check_positive_int, nargs='?', default=50000, + help='Database listening port (default: 50000)', metavar='PORT') + parser.add_argument('-d', '--database', nargs='?', default='iotdb', help='Database name (default: iotdb)') + parser.add_argument('-u', '--user', nargs='?', default='monetdb', help='Database user (default: monetdb)') + parser.add_argument('-?', '--help', action='store_true', help='Display this help') + + try: + args = vars(parser.parse_args()) + except BaseException as ex: + print ex + sys.exit(1) + + if args['help']: + parser.print_help() + sys.exit(0) + + test_dir = os.path.join(args['filepath'], 'test_dir') + shutil.rmtree(test_dir, ignore_errors=True) + iot_client_log = os.path.join(test_dir, 'iotclient.log') + iot_api_log = os.path.join(test_dir, 'iotapi.log') + + iot_client_path = os.path.join(test_dir, 'iotclient') + if not os.path.exists(iot_client_path): + os.makedirs(iot_client_path) + iot_api_path = os.path.join(test_dir, 'iotapi') + if not os.path.exists(iot_api_path): + os.makedirs(iot_api_path) + + con_pass = getpass.getpass(prompt='Insert password for user ' + args['user'] + ':') + other_arguments = ["-h", args['host'], "-p", str(args['port']), "-d", args['database'], "-po", "1"] + + head, _ = os.path.split(os.path.dirname(os.path.abspath(__file__))) # get the iotapi path + iot_client_exec_dir = os.path.join(os.path.split(head)[0], "iotclient", "src", "main.py") + iot_api_exec_dir = os.path.join(head, "src", "main.py") + + iot_client = subprocess.Popen([sys.executable, iot_client_exec_dir, "-f", iot_client_path, + "-l", iot_client_log] + other_arguments, stdin=subprocess.PIPE) + iot_api = subprocess.Popen([sys.executable, iot_api_exec_dir, "-f", iot_api_path, + "-l", iot_api_log] + other_arguments, stdin=subprocess.PIPE) + iot_client.stdin.write(con_pass + os.linesep) + iot_client.stdin.flush() + iot_api.stdin.write(con_pass + os.linesep) + 
iot_api.stdin.flush() + + time.sleep(5) + + if iot_client.returncode is None and iot_api.returncode is None: + TextTestRunner(verbosity=2).run(TestSuite(tests=[NullablesTest(iot_client_path=iot_client_path, + iot_api_path=iot_api_path)])) + else: + print 'Processes finished', iot_client.returncode, iot_api.returncode + shutil.rmtree(test_dir, ignore_errors=True) + sys.exit(1) + + iot_client.send_signal(signal.SIGINT) + iot_api.send_signal(signal.SIGINT) + time.sleep(1) + shutil.rmtree(test_dir, ignore_errors=True) + +if __name__ == '__main__': + main() diff --git a/clients/iotclient/src/Streams/jsonschemas.py b/clients/iotclient/src/Streams/jsonschemas.py --- a/clients/iotclient/src/Streams/jsonschemas.py +++ b/clients/iotclient/src/Streams/jsonschemas.py @@ -93,7 +93,7 @@ def init_create_streams_schema(add_hugei "base": {"type": "string", "enum": [TUPLE_FLUSH_IDENTIFIER]}, "interval": {"type": "integer", "minimum": 1} }, - "required": ["base", "number"], + "required": ["base", "interval"], "additionalProperties": False }, { "properties": { diff --git a/clients/iotclient/src/Streams/streams.py b/clients/iotclient/src/Streams/streams.py --- a/clients/iotclient/src/Streams/streams.py +++ b/clients/iotclient/src/Streams/streams.py @@ -64,7 +64,7 @@ class BaseIOTStream: if dirs: for elem in dirs: # for each directory found, flush it dir_path = os.path.join(self._base_path, str(elem)) - mapi_flush_baskets(self._connection, self._schema_name, self._stream_name, dir_path) + # mapi_flush_baskets(self._connection, self._schema_name, self._stream_name, dir_path) self._baskets_counter = max(dirs) + 1 # increment current basket number else: self._baskets_counter = 1 @@ -124,8 +124,8 @@ class BaseIOTStream: add_log(20, 'Stopped stream %s.%s' % (self._schema_name, self._stream_name)) @abstractmethod - def get_flushing_dictionary(self): # for information about the stream - return {} + def get_flushing_dictionary(self, number_tuples): # for information about the stream + return () 
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list