Qifan Chen created IMPALA-11189: ----------------------------------- Summary: Concurrent insert ACL tests are broken in local catalog mode Key: IMPALA-11189 URL: https://issues.apache.org/jira/browse/IMPALA-11189 Project: IMPALA Issue Type: Bug Components: Catalog Reporter: Qifan Chen
Stress test test_concurrent_inserts (in tests/stress/test_acid_stress.py) fail repeatedly in local catalog mode. The concurrent checker query (select * from <table>) can return duplicated rows such as reported below, where row [0,2] is duplicated. This can be reproduced quite easily by running the test (i.e., TestConcurrentAcidInserts) first, via commenting out all the tests prior to it in the test file tests/stress/test_acid_stress.py. Setup: 1. Build the impala and clear HMS in case in a bad state: $IMPALA_HOME/buildall.sh -format_metastore -notests 2. Start the cluster in local catalog mode: $IMPALA_HOME/bin/start-impala-cluster.py --impalad_args --use_local_catalog=true --catalogd_args --catalog_topic_mode=minimal --catalogd_args --hms_event_polling_interval_s=1 3. Run the modified stress test: $IMPALA_HOME/bin/impala-py.test $IMPALA_TESTS/stress/test_acid_stress.py Error reported: {code:java} 09:11:00 qchen@qifan-10229: Impala.03112022] test_acid_stress rootLoggerLevel = INFO ================================================== test session starts =================================================== platform linux2 -- Python 2.7.16, pytest-2.9.2, py-1.4.32, pluggy-0.3.1 -- /home/qchen/Impala.03112022/infra/python/env-gcc7.5.0/bin/python cachedir: tests/.cache rootdir: /home/qchen/Impala.03112022/tests, inifile: pytest.ini plugins: xdist-1.17.1, timeout-1.2.1, random-0.2, forked-0.2 timeout: 7200s method: signal collected 2 items tests/stress/test_acid_stress.py::TestConcurrentAcidInserts::test_concurrent_inserts[unique_database0] FAILED tests/stress/test_acid_stress.py::TestFailingAcidInserts::test_failing_inserts[unique_database0] PASSED ================================================ short test summary info ================================================= FAIL tests/stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0] ======================================================== FAILURES ======================================================== __________________________ TestConcurrentAcidInserts.test_concurrent_inserts[unique_database0] ___________________________ tests/stress/test_acid_stress.py:307: in test_concurrent_inserts run_tasks(writers + checkers) tests/stress/stress_util.py:45: in run_tasks pool.map_async(Task.run, tasks).get(timeout_seconds) ../Impala.03082022/toolchain/toolchain-packages-gcc7.5.0/python-2.7.16/lib/python2.7/multiprocessing/pool.py:572: in get raise self._value E AssertionError: wid: 2 E assert [0, 1, 2, 2, 3, 4] == [0, 1, 2, 3, 4] E At index 3 diff: 2 != 3 E Left contains more items, first extra item: 4 E Full diff: E - [0, 1, 2, 2, 3, 4] E ? --- E + [0, 1, 2, 3, 4] ------------------------------------------------- Captured stderr setup -------------------------------------------------- SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]; -- connecting to: localhost:21000 -- connecting to localhost:21050 with impyla -- 2022-03-16 09:20:54,762 INFO MainThread: Closing active operation -- connecting to localhost:28000 with impyla -- 2022-03-16 09:20:54,774 INFO MainThread: Closing active operation SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]; SET sync_ddl=True; -- executing against localhost:21000 DROP DATABASE IF EXISTS `test_concurrent_inserts_8933345c` CASCADE; -- 2022-03-16 09:20:54,808 INFO MainThread: Started query 28457f4c7e77cdec:c6d3731900000000 SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]; SET sync_ddl=True; -- executing against localhost:21000 CREATE DATABASE `test_concurrent_inserts_8933345c`; -- 2022-03-16 09:20:54,877 INFO MainThread: Started query 374bf99aea680523:48d2405400000000 -- 2022-03-16 09:21:01,164 INFO MainThread: Created database "test_concurrent_inserts_8933345c" for test ID "stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]" -------------------------------------------------- Captured stderr call -------------------------------------------------- SET SYNC_DDL=true; -- executing against localhost:21000 drop table if exists test_concurrent_inserts_8933345c.test_concurrent_inserts; -- 2022-03-16 09:21:01,173 INFO MainThread: Started query 20480c2a1d336d35:c2d84edd00000000 -- executing against localhost:21000 create table test_concurrent_inserts_8933345c.test_concurrent_inserts (wid int, i int) TBLPROPERTIES ( 'transactional_properties' = 'insert_only', 'transactional' = 'true') ; -- 2022-03-16 09:21:01,294 INFO MainThread: Started query 754969473483b4e9:acfc852300000000 SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]; -- connecting to: localhost:21000 SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]; SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]; SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]; SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]; SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]; -- connecting to: localhost:21001 SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]; -- executing against localhost:21000 -- connecting to: localhost:21000 -- connecting to: localhost:21002 SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]; -- connecting to: localhost:21001 SET client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]; -- connecting to: localhost:21002 -- connecting to: localhost:21000 insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (0, 0); -- executing against localhost:21001 -- connecting to: localhost:21002 -- executing against localhost:21002 -- executing against localhost:21000 -- connecting to: localhost:21001 -- executing against localhost:21001 insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (1, 0); -- executing against localhost:21002 insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (2, 0); -- executing against localhost:21000 insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (3, 0); -- executing against localhost:21002 insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (4, 0); -- executing against localhost:21001 insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (5, 0); select * from test_concurrent_inserts_8933345c.test_concurrent_inserts; select * from test_concurrent_inserts_8933345c.test_concurrent_inserts; select * from test_concurrent_inserts_8933345c.test_concurrent_inserts; -- 2022-03-16 09:21:07,824 INFO Thread-3: Started query 6f4e9d38d1723252:a6a7368f00000000 -- 2022-03-16 09:21:07,859 INFO Thread-4: Started query 33423e0145b7d3e0:ea8d626200000000 -- 2022-03-16 09:21:07,861 INFO Thread-8: Started query 4046d8edde82931b:aa11c84800000000 -- 2022-03-16 09:21:07,875 INFO Thread-9: Started query 594d63b7814c31ab:8f2b92ca00000000 -- executing against localhost:21002 insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (2, 1); -- 2022-03-16 09:21:08,229 INFO Thread-4: Started query a94dab601c125bb4:8988934300000000 -- executing against localhost:21000 select * from test_concurrent_inserts_8933345c.test_concurrent_inserts; -- executing against localhost:21002 insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (2, 2); -- 2022-03-16 09:21:08,384 INFO Thread-3: Started query 4040597eba7beb36:4cfa08c800000000 -- 2022-03-16 09:21:08,409 INFO Thread-4: Started query 854e1aee63575861:dbf8bafb00000000 -- executing against localhost:21002 select * from test_concurrent_inserts_8933345c.test_concurrent_inserts; -- executing against localhost:21001 select * from test_concurrent_inserts_8933345c.test_concurrent_inserts; -- 2022-03-16 09:21:08,435 INFO Thread-9: Started query 7e4d4cf54a44cf03:868efcb100000000 -- 2022-03-16 09:21:08,456 INFO Thread-8: Started query 4645788cf2a4d401:0aea969e00000000 -- executing against localhost:21002 insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (2, 3); -- 2022-03-16 09:21:08,586 INFO Thread-4: Started query 4e4cdd976dfa3358:6c455b7300000000 -- executing against localhost:21002 insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (2, 4); -- 2022-03-16 09:21:08,711 INFO Thread-4: Started query 6f46942eb8807e5f:ffbb146000000000 -- executing against localhost:21002 insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (2, 5); -- executing against localhost:21000 select * from test_concurrent_inserts_8933345c.test_concurrent_inserts; -- 2022-03-16 09:21:08,959 INFO Thread-3: Started query d940fa405a776bef:3705218900000000 -- 2022-03-16 09:21:08,977 INFO Thread-4: Started query f947061a7c45901c:12b2ba9d00000000 -- executing against localhost:21001 select * from test_concurrent_inserts_8933345c.test_concurrent_inserts; -- 2022-03-16 09:21:08,997 INFO Thread-9: Started query f54a0aa01bbb2f86:3b89ae9600000000 -- executing against localhost:21002 select * from test_concurrent_inserts_8933345c.test_concurrent_inserts; -- 2022-03-16 09:21:09,147 INFO Thread-8: Started query fa418523bc0552ce:f0e49b1a00000000 -- executing against localhost:21002 insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts values (2, 6); -- 2022-03-16 09:21:09,250 INFO Thread-4: Started query fc4751d49d1a1aa2:4bcb48d600000000 -- closing connection to: localhost:21002 Traceback (most recent call last): File "/home/qchen/Impala.03112022/tests/stress/stress_util.py", line 35, in run return self.func(*self.args, **self.kwargs) File "/home/qchen/Impala.03112022/tests/stress/test_acid_stress.py", line 276, in _impala_role_concurrent_checker verify_result_set(result) File "/home/qchen/Impala.03112022/tests/stress/test_acid_stress.py", line 269, in verify_result_set assert sorted_run == range(sorted_run[0], sorted_run[-1] + 1), "wid: %d" % wid AssertionError: wid: 2 assert [0, 1, 2, 2, 3, 4] == [0, 1, 2, 3, 4] At index 3 diff: 2 != 3 Left contains more items, first extra item: 4 Full diff: - [0, 1, 2, 2, 3, 4] ? --- + [0, 1, 2, 3, 4] ========================================== 1 failed, 1 passed in 158.88 seconds ========================================== [09:23:33 qchen@qifan-10229: Impala.03112022] git branch IMPALA-10992-auto-scaling-planner-support * master [09:23:41 qchen@qifan-10229: Impala.03112022] git diff diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index a921bb961..6df592f3f 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -1644,6 +1644,7 @@ public class Frontend { markTimelineRetries(attempt, retryMsg, timeline); return req; } catch (InconsistentMetadataFetchException e) { + LOG.error("BAR: catch an InconsistentMetadataFetchException"); if (attempt++ == INCONSISTENT_METADATA_NUM_RETRIES) { markTimelineRetries(attempt, e.getMessage(), timeline); throw e; @@ -1819,14 +1820,18 @@ public class Frontend { } catch (Exception e) { if (queryCtx.isSetTransaction_id()) { try { + LOG.error("BAR: to abort Transaction"); abortTransaction(queryCtx.getTransaction_id()); + LOG.error("BAR: Transaction aborted"); timeline.markEvent("Transaction aborted"); } catch (TransactionException te) { LOG.error("Could not abort transaction because: " + te.getMessage()); } } else if (queryCtx.isIs_kudu_transactional()) { try { + LOG.error("BAR: to abort kudu Transaction"); abortKuduTransaction(queryCtx.getQuery_id()); + LOG.error("BAR: kudu Transaction aborted"); timeline.markEvent( "Kudu transaction aborted: " + queryCtx.getQuery_id().toString()); } catch (TransactionException te) { diff --git a/tests/stress/test_acid_stress.py b/tests/stress/test_acid_stress.py index f6439ff3c..09ec874e7 100644 --- a/tests/stress/test_acid_stress.py +++ b/tests/stress/test_acid_stress.py @@ -46,188 +46,188 @@ class TestAcidStress(ImpalaTestSuite): v.get_value('table_format').compression_codec == 'none')) -class TestAcidInsertsBasic(TestAcidStress): - @classmethod - def get_workload(self): - return super(TestAcidInsertsBasic, self).get_workload() - - @classmethod - def add_test_dimensions(cls): - super(TestAcidInsertsBasic, cls).add_test_dimensions() - - def _verify_result(self, result, expected_result): - """Verify invariants for 'run' and 'i'.""" - assert len(result.data) > 0 - run_max = -1 - i_list = [] - for line in result.data: - [run, i] = map(int, (line.split('\t'))) - run_max = max(run_max, run) - i_list.append(i) - assert expected_result["run"] <= run_max # shouldn't see data overwritten in the past - i_list.sort() - if expected_result["run"] < run_max: - expected_result["run"] = run_max - expected_result["i"] = 0 - return - assert i_list[-1] >= expected_result["i"] - assert i_list == range(i_list[-1] + 1) # 'i' should have all values from 0 to max_i - expected_result["i"] = i_list[-1] - - def _hive_role_write_inserts(self, tbl_name, partitioned): - """INSERT INTO/OVERWRITE a table several times from Hive.""" - part_expr = "partition (p=1)" if partitioned else "" - for run in xrange(0, NUM_OVERWRITES): - OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i) - """ % (tbl_name, part_expr, run, 0) - self.run_stmt_in_hive(OVERWRITE_SQL) - for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1): - INSERT_SQL = """insert into table %s %s values (%i, %i) - """ % (tbl_name, part_expr, run, i) - self.run_stmt_in_hive(INSERT_SQL) - - def _impala_role_write_inserts(self, tbl_name, partitioned): - """INSERT INTO/OVERWRITE a table several times from Impala.""" - try: - impalad_client = ImpalaTestSuite.create_impala_client() - part_expr = "partition (p=1)" if partitioned else "" - for run in xrange(0, NUM_OVERWRITES + 1): - OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i) - """ % (tbl_name, part_expr, run, 0) - impalad_client.execute(OVERWRITE_SQL) - for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1): - INSERT_SQL = """insert into table %s %s values (%i, %i) - """ % (tbl_name, part_expr, run, i) - impalad_client.execute(INSERT_SQL) - finally: - impalad_client.close() - - def _impala_role_read_inserts(self, tbl_name, needs_refresh, sleep_seconds): - """SELECT from a table many times until the expected final values are found.""" - try: - impalad_client = ImpalaTestSuite.create_impala_client() - expected_result = {"run": -1, "i": 0} - accept_empty_table = True - while expected_result["run"] != NUM_OVERWRITES and \ - expected_result["i"] != NUM_INSERTS_PER_OVERWRITE: - time.sleep(sleep_seconds) - if needs_refresh: impalad_client.execute("refresh %s" % tbl_name) - result = impalad_client.execute("select run, i from %s" % tbl_name) - if len(result.data) == 0: - assert accept_empty_table - continue - accept_empty_table = False - self._verify_result(result, expected_result) - finally: - impalad_client.close() - - def _create_table(self, full_tbl_name, partitioned): - """Creates test table with name 'full_tbl_name'. Table is partitioned if - 'partitioned' is set to True.""" - part_expr = "partitioned by (p int)" if partitioned else "" - - CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES ( - 'transactional_properties' = 'insert_only', 'transactional' = 'true') - """ % (full_tbl_name, part_expr) - self.client.execute("drop table if exists %s" % full_tbl_name) - self.client.execute(CREATE_SQL) - - def _run_test_read_hive_inserts(self, unique_database, partitioned): - """Check that Impala can read a single insert only ACID table (over)written by Hive - several times. Consistency can be checked by using incremental values for - overwrites ('run') and inserts ('i'). - """ - tbl_name = "%s.test_read_hive_inserts" % unique_database - self._create_table(tbl_name, partitioned) - - run_tasks([ - Task(self._hive_role_write_inserts, tbl_name, partitioned), - Task(self._impala_role_read_inserts, tbl_name, needs_refresh=True, - sleep_seconds=3)]) - - def _run_test_read_impala_inserts(self, unique_database, partitioned): - """Check that Impala can read a single insert only ACID table (over)written by Hive - several times. Consistency can be checked by using incremental values for - overwrites ('run') and inserts ('i'). - """ - tbl_name = "%s.test_read_impala_inserts" % unique_database - self._create_table(tbl_name, partitioned) - - run_tasks([ - Task(self._impala_role_write_inserts, tbl_name, partitioned), - Task(self._impala_role_read_inserts, tbl_name, needs_refresh=False, - sleep_seconds=0.1)]) - - @SkipIfHive2.acid - @SkipIfS3.hive - @SkipIfGCS.hive - @SkipIfCOS.hive - @pytest.mark.execute_serially - @pytest.mark.stress - def test_read_hive_inserts(self, unique_database): - """Check that Impala can read partitioned and non-partitioned ACID tables - written by Hive.""" - for is_partitioned in [False, True]: - self._run_test_read_hive_inserts(unique_database, is_partitioned) - - @SkipIfHive2.acid - @pytest.mark.execute_serially - @pytest.mark.stress - def test_read_impala_inserts(self, unique_database): - """Check that Impala can read partitioned and non-partitioned ACID tables - written by Hive.""" - for is_partitioned in [False, True]: - self._run_test_read_impala_inserts(unique_database, is_partitioned) - - def _impala_role_partition_writer(self, tbl_name, partition, is_overwrite, sleep_sec): - insert_op = "OVERWRITE" if is_overwrite else "INTO" - try: - impalad_client = ImpalaTestSuite.create_impala_client() - impalad_client.execute( - """insert {op} table {tbl_name} partition({partition}) - select sleep({sleep_ms})""".format(op=insert_op, tbl_name=tbl_name, - partition=partition, sleep_ms=sleep_sec * 1000)) - finally: - impalad_client.close() - - @pytest.mark.execute_serially - @pytest.mark.stress - @SkipIf.not_hdfs - @UniqueDatabase.parametrize(sync_ddl=True) - def test_partitioned_inserts(self, unique_database): - """Check that the different ACID write operations take appropriate locks. - INSERT INTO: should take a shared lock - INSERT OVERWRITE: should take an exclusive lock - Both should take PARTITION-level lock in case of static partition insert.""" - tbl_name = "%s.test_concurrent_partitioned_inserts" % unique_database - self.client.set_configuration_option("SYNC_DDL", "true") - self.client.execute(""" - CREATE TABLE {0} (i int) PARTITIONED BY (p INT, q INT) - TBLPROPERTIES( - 'transactional_properties'='insert_only','transactional'='true')""".format( - tbl_name)) - # Warmup INSERT - self.execute_query("alter table {0} add partition(p=0,q=0)".format(tbl_name)) - sleep_sec = 5 - task_insert_into = Task(self._impala_role_partition_writer, tbl_name, - "p=1,q=1", False, sleep_sec) - # INSERT INTO the same partition can run in parallel. - duration = run_tasks([task_insert_into, task_insert_into]) - assert duration < 3 * sleep_sec - task_insert_overwrite = Task(self._impala_role_partition_writer, tbl_name, - "p=1,q=1", True, sleep_sec) - # INSERT INTO + INSERT OVERWRITE should have mutual exclusion. - duration = run_tasks([task_insert_into, task_insert_overwrite]) - assert duration > 4 * sleep_sec - # INSERT OVERWRITEs to the same partition should have mutual exclusion. - duration = run_tasks([task_insert_overwrite, task_insert_overwrite]) - assert duration > 4 * sleep_sec - task_insert_overwrite_2 = Task(self._impala_role_partition_writer, tbl_name, - "p=1,q=2", True, sleep_sec) - # INSERT OVERWRITEs to different partitions can run in parallel. - duration = run_tasks([task_insert_overwrite, task_insert_overwrite_2]) - assert duration < 3 * sleep_sec - +#class TestAcidInsertsBasic(TestAcidStress): +# @classmethod +# def get_workload(self): +# return super(TestAcidInsertsBasic, self).get_workload() +# +# @classmethod +# def add_test_dimensions(cls): +# super(TestAcidInsertsBasic, cls).add_test_dimensions() +# +# def _verify_result(self, result, expected_result): +# """Verify invariants for 'run' and 'i'.""" +# assert len(result.data) > 0 +# run_max = -1 +# i_list = [] +# for line in result.data: +# [run, i] = map(int, (line.split('\t'))) +# run_max = max(run_max, run) +# i_list.append(i) +# assert expected_result["run"] <= run_max # shouldn't see data overwritten in the past +# i_list.sort() +# if expected_result["run"] < run_max: +# expected_result["run"] = run_max +# expected_result["i"] = 0 +# return +# assert i_list[-1] >= expected_result["i"] +# assert i_list == range(i_list[-1] + 1) # 'i' should have all values from 0 to max_i +# expected_result["i"] = i_list[-1] +# +# def _hive_role_write_inserts(self, tbl_name, partitioned): +# """INSERT INTO/OVERWRITE a table several times from Hive.""" +# part_expr = "partition (p=1)" if partitioned else "" +# for run in xrange(0, NUM_OVERWRITES): +# OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i) +# """ % (tbl_name, part_expr, run, 0) +# self.run_stmt_in_hive(OVERWRITE_SQL) +# for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1): +# INSERT_SQL = """insert into table %s %s values (%i, %i) +# """ % (tbl_name, part_expr, run, i) +# self.run_stmt_in_hive(INSERT_SQL) +# +# def _impala_role_write_inserts(self, tbl_name, partitioned): +# """INSERT INTO/OVERWRITE a table several times from Impala.""" +# try: +# impalad_client = ImpalaTestSuite.create_impala_client() +# part_expr = "partition (p=1)" if partitioned else "" +# for run in xrange(0, NUM_OVERWRITES + 1): +# OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i) +# """ % (tbl_name, part_expr, run, 0) +# impalad_client.execute(OVERWRITE_SQL) +# for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1): +# INSERT_SQL = """insert into table %s %s values (%i, %i) +# """ % (tbl_name, part_expr, run, i) +# impalad_client.execute(INSERT_SQL) +# finally: +# impalad_client.close() +# +# def _impala_role_read_inserts(self, tbl_name, needs_refresh, sleep_seconds): +# """SELECT from a table many times until the expected final values are found.""" +# try: +# impalad_client = ImpalaTestSuite.create_impala_client() +# expected_result = {"run": -1, "i": 0} +# accept_empty_table = True +# while expected_result["run"] != NUM_OVERWRITES and \ +# expected_result["i"] != NUM_INSERTS_PER_OVERWRITE: +# time.sleep(sleep_seconds) +# if needs_refresh: impalad_client.execute("refresh %s" % tbl_name) +# result = impalad_client.execute("select run, i from %s" % tbl_name) +# if len(result.data) == 0: +# assert accept_empty_table +# continue +# accept_empty_table = False +# self._verify_result(result, expected_result) +# finally: +# impalad_client.close() +# +# def _create_table(self, full_tbl_name, partitioned): +# """Creates test table with name 'full_tbl_name'. Table is partitioned if +# 'partitioned' is set to True.""" +# part_expr = "partitioned by (p int)" if partitioned else "" +# +# CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES ( +# 'transactional_properties' = 'insert_only', 'transactional' = 'true') +# """ % (full_tbl_name, part_expr) +# self.client.execute("drop table if exists %s" % full_tbl_name) +# self.client.execute(CREATE_SQL) +# +# def _run_test_read_hive_inserts(self, unique_database, partitioned): +# """Check that Impala can read a single insert only ACID table (over)written by Hive +# several times. Consistency can be checked by using incremental values for +# overwrites ('run') and inserts ('i'). +# """ +# tbl_name = "%s.test_read_hive_inserts" % unique_database +# self._create_table(tbl_name, partitioned) +# +# run_tasks([ +# Task(self._hive_role_write_inserts, tbl_name, partitioned), +# Task(self._impala_role_read_inserts, tbl_name, needs_refresh=True, +# sleep_seconds=3)]) +# +# def _run_test_read_impala_inserts(self, unique_database, partitioned): +# """Check that Impala can read a single insert only ACID table (over)written by Hive +# several times. Consistency can be checked by using incremental values for +# overwrites ('run') and inserts ('i'). +# """ +# tbl_name = "%s.test_read_impala_inserts" % unique_database +# self._create_table(tbl_name, partitioned) +# +# run_tasks([ +# Task(self._impala_role_write_inserts, tbl_name, partitioned), +# Task(self._impala_role_read_inserts, tbl_name, needs_refresh=False, +# sleep_seconds=0.1)]) +# +# @SkipIfHive2.acid +# @SkipIfS3.hive +# @SkipIfGCS.hive +# @SkipIfCOS.hive +# @pytest.mark.execute_serially +# @pytest.mark.stress +# def test_read_hive_inserts(self, unique_database): +# """Check that Impala can read partitioned and non-partitioned ACID tables +# written by Hive.""" +# for is_partitioned in [False, True]: +# self._run_test_read_hive_inserts(unique_database, is_partitioned) +# +# @SkipIfHive2.acid +# @pytest.mark.execute_serially +# @pytest.mark.stress +# def test_read_impala_inserts(self, unique_database): +# """Check that Impala can read partitioned and non-partitioned ACID tables +# written by Hive.""" +# for is_partitioned in [False, True]: +# self._run_test_read_impala_inserts(unique_database, is_partitioned) +# +# def _impala_role_partition_writer(self, tbl_name, partition, is_overwrite, sleep_sec): +# insert_op = "OVERWRITE" if is_overwrite else "INTO" +# try: +# impalad_client = ImpalaTestSuite.create_impala_client() +# impalad_client.execute( +# """insert {op} table {tbl_name} partition({partition}) +# select sleep({sleep_ms})""".format(op=insert_op, tbl_name=tbl_name, +# partition=partition, sleep_ms=sleep_sec * 1000)) +# finally: +# impalad_client.close() +# +# @pytest.mark.execute_serially +# @pytest.mark.stress +# @SkipIf.not_hdfs +# @UniqueDatabase.parametrize(sync_ddl=True) +# def test_partitioned_inserts(self, unique_database): +# """Check that the different ACID write operations take appropriate locks. +# INSERT INTO: should take a shared lock +# INSERT OVERWRITE: should take an exclusive lock +# Both should take PARTITION-level lock in case of static partition insert.""" +# tbl_name = "%s.test_concurrent_partitioned_inserts" % unique_database +# self.client.set_configuration_option("SYNC_DDL", "true") +# self.client.execute(""" +# CREATE TABLE {0} (i int) PARTITIONED BY (p INT, q INT) +# TBLPROPERTIES( +# 'transactional_properties'='insert_only','transactional'='true')""".format( +# tbl_name)) +# # Warmup INSERT +# self.execute_query("alter table {0} add partition(p=0,q=0)".format(tbl_name)) +# sleep_sec = 5 +# task_insert_into = Task(self._impala_role_partition_writer, tbl_name, +# "p=1,q=1", False, sleep_sec) +# # INSERT INTO the same partition can run in parallel. +# duration = run_tasks([task_insert_into, task_insert_into]) +# assert duration < 3 * sleep_sec +# task_insert_overwrite = Task(self._impala_role_partition_writer, tbl_name, +# "p=1,q=1", True, sleep_sec) +# # INSERT INTO + INSERT OVERWRITE should have mutual exclusion. +# duration = run_tasks([task_insert_into, task_insert_overwrite]) +# assert duration > 4 * sleep_sec +# # INSERT OVERWRITEs to the same partition should have mutual exclusion. +# duration = run_tasks([task_insert_overwrite, task_insert_overwrite]) +# assert duration > 4 * sleep_sec +# task_insert_overwrite_2 = Task(self._impala_role_partition_writer, tbl_name, +# "p=1,q=2", True, sleep_sec) +# # INSERT OVERWRITEs to different partitions can run in parallel. +# duration = run_tasks([task_insert_overwrite, task_insert_overwrite_2]) +# assert duration < 3 * sleep_sec +# class TestConcurrentAcidInserts(TestAcidStress): @classmethod (END) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-all-unsubscr...@impala.apache.org For additional commands, e-mail: issues-all-h...@impala.apache.org