Qifan Chen created IMPALA-11189:
-----------------------------------

             Summary: Concurrent insert ACID tests are broken in local catalog mode
                 Key: IMPALA-11189
                 URL: https://issues.apache.org/jira/browse/IMPALA-11189
             Project: IMPALA
          Issue Type: Bug
          Components: Catalog
            Reporter: Qifan Chen


The stress test test_concurrent_inserts (in tests/stress/test_acid_stress.py) fails
repeatedly in local catalog mode. The concurrent checker query (select * from
<table>) can return duplicated rows, as reported below. For wid 2, the checker
sees the i values [0, 1, 2, 2, 3, 4], i.e. the row (wid=2, i=2) is duplicated.

This can be reproduced quite easily by running this test (TestConcurrentAcidInserts)
first, i.e. by commenting out all the tests that precede it in
tests/stress/test_acid_stress.py (see the test-file diff at the end of the log below).

Setup:

1. Build Impala and reformat the HMS (in case it is in a bad state):
$IMPALA_HOME/buildall.sh -format_metastore -notests
2. Start the cluster in local catalog mode:
$IMPALA_HOME/bin/start-impala-cluster.py \
    --impalad_args=--use_local_catalog=true \
    --catalogd_args=--catalog_topic_mode=minimal \
    --catalogd_args=--hms_event_polling_interval_s=1
3. Run the modified stress test:
$IMPALA_HOME/bin/impala-py.test $IMPALA_TESTS/stress/test_acid_stress.py

Error reported:


{code:java}
09:11:00 qchen@qifan-10229: Impala.03112022] test_acid_stress
rootLoggerLevel = INFO
================================================== test session starts 
===================================================
platform linux2 -- Python 2.7.16, pytest-2.9.2, py-1.4.32, pluggy-0.3.1 -- 
/home/qchen/Impala.03112022/infra/python/env-gcc7.5.0/bin/python
cachedir: tests/.cache
rootdir: /home/qchen/Impala.03112022/tests, inifile: pytest.ini
plugins: xdist-1.17.1, timeout-1.2.1, random-0.2, forked-0.2
timeout: 7200s method: signal
collected 2 items 

tests/stress/test_acid_stress.py::TestConcurrentAcidInserts::test_concurrent_inserts[unique_database0]
 FAILED
tests/stress/test_acid_stress.py::TestFailingAcidInserts::test_failing_inserts[unique_database0]
 PASSED
================================================ short test summary info 
=================================================
FAIL 
tests/stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]

======================================================== FAILURES 
========================================================
__________________________ 
TestConcurrentAcidInserts.test_concurrent_inserts[unique_database0] 
___________________________
tests/stress/test_acid_stress.py:307: in test_concurrent_inserts
    run_tasks(writers + checkers)
tests/stress/stress_util.py:45: in run_tasks
    pool.map_async(Task.run, tasks).get(timeout_seconds)
../Impala.03082022/toolchain/toolchain-packages-gcc7.5.0/python-2.7.16/lib/python2.7/multiprocessing/pool.py:572:
 in get
    raise self._value
E   AssertionError: wid: 2
E   assert [0, 1, 2, 2, 3, 4] == [0, 1, 2, 3, 4]
E     At index 3 diff: 2 != 3
E     Left contains more items, first extra item: 4
E     Full diff:
E     - [0, 1, 2, 2, 3, 4]
E     ?           ---
E     + [0, 1, 2, 3, 4]
------------------------------------------------- Captured stderr setup 
--------------------------------------------------
SET 
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
-- connecting to: localhost:21000
-- connecting to localhost:21050 with impyla
-- 2022-03-16 09:20:54,762 INFO     MainThread: Closing active operation
-- connecting to localhost:28000 with impyla
-- 2022-03-16 09:20:54,774 INFO     MainThread: Closing active operation
SET 
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
SET sync_ddl=True;
-- executing against localhost:21000

DROP DATABASE IF EXISTS `test_concurrent_inserts_8933345c` CASCADE;

-- 2022-03-16 09:20:54,808 INFO     MainThread: Started query 
28457f4c7e77cdec:c6d3731900000000
SET 
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
SET sync_ddl=True;
-- executing against localhost:21000

CREATE DATABASE `test_concurrent_inserts_8933345c`;

-- 2022-03-16 09:20:54,877 INFO     MainThread: Started query 
374bf99aea680523:48d2405400000000
-- 2022-03-16 09:21:01,164 INFO     MainThread: Created database 
"test_concurrent_inserts_8933345c" for test ID 
"stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0]"
-------------------------------------------------- Captured stderr call 
--------------------------------------------------
SET SYNC_DDL=true;
-- executing against localhost:21000

drop table if exists test_concurrent_inserts_8933345c.test_concurrent_inserts;

-- 2022-03-16 09:21:01,173 INFO     MainThread: Started query 
20480c2a1d336d35:c2d84edd00000000
-- executing against localhost:21000

create table test_concurrent_inserts_8933345c.test_concurrent_inserts (wid int, 
i int) TBLPROPERTIES (
        'transactional_properties' = 'insert_only', 'transactional' = 'true')
        ;

-- 2022-03-16 09:21:01,294 INFO     MainThread: Started query 
754969473483b4e9:acfc852300000000
SET 
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
-- connecting to: localhost:21000
SET 
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
SET 
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
SET 
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
SET 
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
SET 
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
-- connecting to: localhost:21001
SET 
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
-- executing against localhost:21000

-- connecting to: localhost:21000
-- connecting to: localhost:21002
SET 
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
-- connecting to: localhost:21001
SET 
client_identifier=stress/test_acid_stress.py::TestConcurrentAcidInserts::()::test_concurrent_inserts[unique_database0];
-- connecting to: localhost:21002
-- connecting to: localhost:21000
insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
values (0, 0);

-- executing against localhost:21001

-- connecting to: localhost:21002
-- executing against localhost:21002

-- executing against localhost:21000

-- connecting to: localhost:21001
-- executing against localhost:21001

insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
values (1, 0);

-- executing against localhost:21002

insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
values (2, 0);

-- executing against localhost:21000

insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
values (3, 0);

-- executing against localhost:21002

insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
values (4, 0);

-- executing against localhost:21001

insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
values (5, 0);

select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;

select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;

select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;

-- 2022-03-16 09:21:07,824 INFO     Thread-3: Started query 
6f4e9d38d1723252:a6a7368f00000000
-- 2022-03-16 09:21:07,859 INFO     Thread-4: Started query 
33423e0145b7d3e0:ea8d626200000000
-- 2022-03-16 09:21:07,861 INFO     Thread-8: Started query 
4046d8edde82931b:aa11c84800000000
-- 2022-03-16 09:21:07,875 INFO     Thread-9: Started query 
594d63b7814c31ab:8f2b92ca00000000
-- executing against localhost:21002

insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
values (2, 1);

-- 2022-03-16 09:21:08,229 INFO     Thread-4: Started query 
a94dab601c125bb4:8988934300000000
-- executing against localhost:21000

select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;

-- executing against localhost:21002

insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
values (2, 2);

-- 2022-03-16 09:21:08,384 INFO     Thread-3: Started query 
4040597eba7beb36:4cfa08c800000000
-- 2022-03-16 09:21:08,409 INFO     Thread-4: Started query 
854e1aee63575861:dbf8bafb00000000
-- executing against localhost:21002

select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;

-- executing against localhost:21001

select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;

-- 2022-03-16 09:21:08,435 INFO     Thread-9: Started query 
7e4d4cf54a44cf03:868efcb100000000
-- 2022-03-16 09:21:08,456 INFO     Thread-8: Started query 
4645788cf2a4d401:0aea969e00000000
-- executing against localhost:21002

insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
values (2, 3);

-- 2022-03-16 09:21:08,586 INFO     Thread-4: Started query 
4e4cdd976dfa3358:6c455b7300000000
-- executing against localhost:21002

insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
values (2, 4);

-- 2022-03-16 09:21:08,711 INFO     Thread-4: Started query 
6f46942eb8807e5f:ffbb146000000000
-- executing against localhost:21002

insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
values (2, 5);

-- executing against localhost:21000

select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;

-- 2022-03-16 09:21:08,959 INFO     Thread-3: Started query 
d940fa405a776bef:3705218900000000
-- 2022-03-16 09:21:08,977 INFO     Thread-4: Started query 
f947061a7c45901c:12b2ba9d00000000
-- executing against localhost:21001

select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;

-- 2022-03-16 09:21:08,997 INFO     Thread-9: Started query 
f54a0aa01bbb2f86:3b89ae9600000000
-- executing against localhost:21002

select * from test_concurrent_inserts_8933345c.test_concurrent_inserts;

-- 2022-03-16 09:21:09,147 INFO     Thread-8: Started query 
fa418523bc0552ce:f0e49b1a00000000
-- executing against localhost:21002

insert into table test_concurrent_inserts_8933345c.test_concurrent_inserts 
values (2, 6);

-- 2022-03-16 09:21:09,250 INFO     Thread-4: Started query 
fc4751d49d1a1aa2:4bcb48d600000000
-- closing connection to: localhost:21002
Traceback (most recent call last):
  File "/home/qchen/Impala.03112022/tests/stress/stress_util.py", line 35, in 
run
    return self.func(*self.args, **self.kwargs)
  File "/home/qchen/Impala.03112022/tests/stress/test_acid_stress.py", line 
276, in _impala_role_concurrent_checker
    verify_result_set(result)
  File "/home/qchen/Impala.03112022/tests/stress/test_acid_stress.py", line 
269, in verify_result_set
    assert sorted_run == range(sorted_run[0], sorted_run[-1] + 1), "wid: %d" % 
wid
AssertionError: wid: 2
assert [0, 1, 2, 2, 3, 4] == [0, 1, 2, 3, 4]
  At index 3 diff: 2 != 3
  Left contains more items, first extra item: 4
  Full diff:
  - [0, 1, 2, 2, 3, 4]
  ?           ---
  + [0, 1, 2, 3, 4]
========================================== 1 failed, 1 passed in 158.88 seconds 
==========================================
[09:23:33 qchen@qifan-10229: Impala.03112022] git branch
  IMPALA-10992-auto-scaling-planner-support
* master

[09:23:41 qchen@qifan-10229: Impala.03112022] git diff
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java 
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index a921bb961..6df592f3f 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -1644,6 +1644,7 @@ public class Frontend {
         markTimelineRetries(attempt, retryMsg, timeline);
         return req;
       } catch (InconsistentMetadataFetchException e) {
+        LOG.error("BAR: catch an InconsistentMetadataFetchException");
         if (attempt++ == INCONSISTENT_METADATA_NUM_RETRIES) {
           markTimelineRetries(attempt, e.getMessage(), timeline);
           throw e;
@@ -1819,14 +1820,18 @@ public class Frontend {
     } catch (Exception e) {
       if (queryCtx.isSetTransaction_id()) {
         try {
+          LOG.error("BAR: to abort Transaction");
           abortTransaction(queryCtx.getTransaction_id());
+          LOG.error("BAR: Transaction aborted");
           timeline.markEvent("Transaction aborted");
         } catch (TransactionException te) {
           LOG.error("Could not abort transaction because: " + te.getMessage());
         }
       } else if (queryCtx.isIs_kudu_transactional()) {
         try {
+          LOG.error("BAR: to abort kudu Transaction");
           abortKuduTransaction(queryCtx.getQuery_id());
+          LOG.error("BAR: kudu Transaction aborted");
           timeline.markEvent(
               "Kudu transaction aborted: " + 
queryCtx.getQuery_id().toString());
         } catch (TransactionException te) {
diff --git a/tests/stress/test_acid_stress.py b/tests/stress/test_acid_stress.py
index f6439ff3c..09ec874e7 100644
--- a/tests/stress/test_acid_stress.py
+++ b/tests/stress/test_acid_stress.py
@@ -46,188 +46,188 @@ class TestAcidStress(ImpalaTestSuite):
                    v.get_value('table_format').compression_codec == 'none'))
 
 
-class TestAcidInsertsBasic(TestAcidStress):
-  @classmethod
-  def get_workload(self):
-    return super(TestAcidInsertsBasic, self).get_workload()
-
-  @classmethod
-  def add_test_dimensions(cls):
-    super(TestAcidInsertsBasic, cls).add_test_dimensions()
-
-  def _verify_result(self, result, expected_result):
-    """Verify invariants for 'run' and 'i'."""
-    assert len(result.data) > 0
-    run_max = -1
-    i_list = []
-    for line in result.data:
-      [run, i] = map(int, (line.split('\t')))
-      run_max = max(run_max, run)
-      i_list.append(i)
-    assert expected_result["run"] <= run_max  # shouldn't see data overwritten 
in the past
-    i_list.sort()
-    if expected_result["run"] < run_max:
-      expected_result["run"] = run_max
-      expected_result["i"] = 0
-      return
-    assert i_list[-1] >= expected_result["i"]
-    assert i_list == range(i_list[-1] + 1)  # 'i' should have all values from 
0 to max_i
-    expected_result["i"] = i_list[-1]
-
-  def _hive_role_write_inserts(self, tbl_name, partitioned):
-    """INSERT INTO/OVERWRITE a table several times from Hive."""
-    part_expr = "partition (p=1)" if partitioned else ""
-    for run in xrange(0, NUM_OVERWRITES):
-      OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
-          """ % (tbl_name, part_expr, run, 0)
-      self.run_stmt_in_hive(OVERWRITE_SQL)
-      for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
-        INSERT_SQL = """insert into table %s %s values (%i, %i)
-            """ % (tbl_name, part_expr, run, i)
-        self.run_stmt_in_hive(INSERT_SQL)
-
-  def _impala_role_write_inserts(self, tbl_name, partitioned):
-    """INSERT INTO/OVERWRITE a table several times from Impala."""
-    try:
-      impalad_client = ImpalaTestSuite.create_impala_client()
-      part_expr = "partition (p=1)" if partitioned else ""
-      for run in xrange(0, NUM_OVERWRITES + 1):
-        OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
-            """ % (tbl_name, part_expr, run, 0)
-        impalad_client.execute(OVERWRITE_SQL)
-        for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
-          INSERT_SQL = """insert into table %s %s values (%i, %i)
-              """ % (tbl_name, part_expr, run, i)
-          impalad_client.execute(INSERT_SQL)
-    finally:
-      impalad_client.close()
-
-  def _impala_role_read_inserts(self, tbl_name, needs_refresh, sleep_seconds):
-    """SELECT from a table many times until the expected final values are 
found."""
-    try:
-      impalad_client = ImpalaTestSuite.create_impala_client()
-      expected_result = {"run": -1, "i": 0}
-      accept_empty_table = True
-      while expected_result["run"] != NUM_OVERWRITES and \
-          expected_result["i"] != NUM_INSERTS_PER_OVERWRITE:
-        time.sleep(sleep_seconds)
-        if needs_refresh: impalad_client.execute("refresh %s" % tbl_name)
-        result = impalad_client.execute("select run, i from %s" % tbl_name)
-        if len(result.data) == 0:
-          assert accept_empty_table
-          continue
-        accept_empty_table = False
-        self._verify_result(result, expected_result)
-    finally:
-      impalad_client.close()
-
-  def _create_table(self, full_tbl_name, partitioned):
-    """Creates test table with name 'full_tbl_name'. Table is partitioned if
-    'partitioned' is set to True."""
-    part_expr = "partitioned by (p int)" if partitioned else ""
-
-    CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES (
-         'transactional_properties' = 'insert_only', 'transactional' = 'true')
-         """ % (full_tbl_name, part_expr)
-    self.client.execute("drop table if exists %s" % full_tbl_name)
-    self.client.execute(CREATE_SQL)
-
-  def _run_test_read_hive_inserts(self, unique_database, partitioned):
-    """Check that Impala can read a single insert only ACID table 
(over)written by Hive
-    several times. Consistency can be checked by using incremental values for
-    overwrites ('run') and inserts ('i').
-    """
-    tbl_name = "%s.test_read_hive_inserts" % unique_database
-    self._create_table(tbl_name, partitioned)
-
-    run_tasks([
-        Task(self._hive_role_write_inserts, tbl_name, partitioned),
-        Task(self._impala_role_read_inserts, tbl_name, needs_refresh=True,
-           sleep_seconds=3)])
-
-  def _run_test_read_impala_inserts(self, unique_database, partitioned):
-    """Check that Impala can read a single insert only ACID table 
(over)written by Hive
-    several times. Consistency can be checked by using incremental values for
-    overwrites ('run') and inserts ('i').
-    """
-    tbl_name = "%s.test_read_impala_inserts" % unique_database
-    self._create_table(tbl_name, partitioned)
-
-    run_tasks([
-        Task(self._impala_role_write_inserts, tbl_name, partitioned),
-        Task(self._impala_role_read_inserts, tbl_name, needs_refresh=False,
-           sleep_seconds=0.1)])
-
-  @SkipIfHive2.acid
-  @SkipIfS3.hive
-  @SkipIfGCS.hive
-  @SkipIfCOS.hive
-  @pytest.mark.execute_serially
-  @pytest.mark.stress
-  def test_read_hive_inserts(self, unique_database):
-    """Check that Impala can read partitioned and non-partitioned ACID tables
-    written by Hive."""
-    for is_partitioned in [False, True]:
-      self._run_test_read_hive_inserts(unique_database, is_partitioned)
-
-  @SkipIfHive2.acid
-  @pytest.mark.execute_serially
-  @pytest.mark.stress
-  def test_read_impala_inserts(self, unique_database):
-    """Check that Impala can read partitioned and non-partitioned ACID tables
-    written by Hive."""
-    for is_partitioned in [False, True]:
-      self._run_test_read_impala_inserts(unique_database, is_partitioned)
-
-  def _impala_role_partition_writer(self, tbl_name, partition, is_overwrite, 
sleep_sec):
-    insert_op = "OVERWRITE" if is_overwrite else "INTO"
-    try:
-      impalad_client = ImpalaTestSuite.create_impala_client()
-      impalad_client.execute(
-          """insert {op} table {tbl_name} partition({partition})
-             select sleep({sleep_ms})""".format(op=insert_op, 
tbl_name=tbl_name,
-          partition=partition, sleep_ms=sleep_sec * 1000))
-    finally:
-      impalad_client.close()
-
-  @pytest.mark.execute_serially
-  @pytest.mark.stress
-  @SkipIf.not_hdfs
-  @UniqueDatabase.parametrize(sync_ddl=True)
-  def test_partitioned_inserts(self, unique_database):
-    """Check that the different ACID write operations take appropriate locks.
-       INSERT INTO: should take a shared lock
-       INSERT OVERWRITE: should take an exclusive lock
-       Both should take PARTITION-level lock in case of static partition 
insert."""
-    tbl_name = "%s.test_concurrent_partitioned_inserts" % unique_database
-    self.client.set_configuration_option("SYNC_DDL", "true")
-    self.client.execute("""
-        CREATE TABLE {0} (i int) PARTITIONED BY (p INT, q INT)
-        TBLPROPERTIES(
-        
'transactional_properties'='insert_only','transactional'='true')""".format(
-        tbl_name))
-    # Warmup INSERT
-    self.execute_query("alter table {0} add 
partition(p=0,q=0)".format(tbl_name))
-    sleep_sec = 5
-    task_insert_into = Task(self._impala_role_partition_writer, tbl_name,
-        "p=1,q=1", False, sleep_sec)
-    # INSERT INTO the same partition can run in parallel.
-    duration = run_tasks([task_insert_into, task_insert_into])
-    assert duration < 3 * sleep_sec
-    task_insert_overwrite = Task(self._impala_role_partition_writer, tbl_name,
-      "p=1,q=1", True, sleep_sec)
-    # INSERT INTO + INSERT OVERWRITE should have mutual exclusion.
-    duration = run_tasks([task_insert_into, task_insert_overwrite])
-    assert duration > 4 * sleep_sec
-    # INSERT OVERWRITEs to the same partition should have mutual exclusion.
-    duration = run_tasks([task_insert_overwrite, task_insert_overwrite])
-    assert duration > 4 * sleep_sec
-    task_insert_overwrite_2 = Task(self._impala_role_partition_writer, 
tbl_name,
-      "p=1,q=2", True, sleep_sec)
-    # INSERT OVERWRITEs to different partitions can run in parallel.
-    duration = run_tasks([task_insert_overwrite, task_insert_overwrite_2])
-    assert duration < 3 * sleep_sec
-
+#class TestAcidInsertsBasic(TestAcidStress):
+#  @classmethod
+#  def get_workload(self):
+#    return super(TestAcidInsertsBasic, self).get_workload()
+#
+#  @classmethod
+#  def add_test_dimensions(cls):
+#    super(TestAcidInsertsBasic, cls).add_test_dimensions()
+#
+#  def _verify_result(self, result, expected_result):
+#    """Verify invariants for 'run' and 'i'."""
+#    assert len(result.data) > 0
+#    run_max = -1
+#    i_list = []
+#    for line in result.data:
+#      [run, i] = map(int, (line.split('\t')))
+#      run_max = max(run_max, run)
+#      i_list.append(i)
+#    assert expected_result["run"] <= run_max  # shouldn't see data 
overwritten in the past
+#    i_list.sort()
+#    if expected_result["run"] < run_max:
+#      expected_result["run"] = run_max
+#      expected_result["i"] = 0
+#      return
+#    assert i_list[-1] >= expected_result["i"]
+#    assert i_list == range(i_list[-1] + 1)  # 'i' should have all values from 
0 to max_i
+#    expected_result["i"] = i_list[-1]
+#
+#  def _hive_role_write_inserts(self, tbl_name, partitioned):
+#    """INSERT INTO/OVERWRITE a table several times from Hive."""
+#    part_expr = "partition (p=1)" if partitioned else ""
+#    for run in xrange(0, NUM_OVERWRITES):
+#      OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
+#          """ % (tbl_name, part_expr, run, 0)
+#      self.run_stmt_in_hive(OVERWRITE_SQL)
+#      for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
+#        INSERT_SQL = """insert into table %s %s values (%i, %i)
+#            """ % (tbl_name, part_expr, run, i)
+#        self.run_stmt_in_hive(INSERT_SQL)
+#
+#  def _impala_role_write_inserts(self, tbl_name, partitioned):
+#    """INSERT INTO/OVERWRITE a table several times from Impala."""
+#    try:
+#      impalad_client = ImpalaTestSuite.create_impala_client()
+#      part_expr = "partition (p=1)" if partitioned else ""
+#      for run in xrange(0, NUM_OVERWRITES + 1):
+#        OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
+#            """ % (tbl_name, part_expr, run, 0)
+#        impalad_client.execute(OVERWRITE_SQL)
+#        for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
+#          INSERT_SQL = """insert into table %s %s values (%i, %i)
+#              """ % (tbl_name, part_expr, run, i)
+#          impalad_client.execute(INSERT_SQL)
+#    finally:
+#      impalad_client.close()
+#
+#  def _impala_role_read_inserts(self, tbl_name, needs_refresh, sleep_seconds):
+#    """SELECT from a table many times until the expected final values are 
found."""
+#    try:
+#      impalad_client = ImpalaTestSuite.create_impala_client()
+#      expected_result = {"run": -1, "i": 0}
+#      accept_empty_table = True
+#      while expected_result["run"] != NUM_OVERWRITES and \
+#          expected_result["i"] != NUM_INSERTS_PER_OVERWRITE:
+#        time.sleep(sleep_seconds)
+#        if needs_refresh: impalad_client.execute("refresh %s" % tbl_name)
+#        result = impalad_client.execute("select run, i from %s" % tbl_name)
+#        if len(result.data) == 0:
+#          assert accept_empty_table
+#          continue
+#        accept_empty_table = False
+#        self._verify_result(result, expected_result)
+#    finally:
+#      impalad_client.close()
+#
+#  def _create_table(self, full_tbl_name, partitioned):
+#    """Creates test table with name 'full_tbl_name'. Table is partitioned if
+#    'partitioned' is set to True."""
+#    part_expr = "partitioned by (p int)" if partitioned else ""
+#
+#    CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES (
+#         'transactional_properties' = 'insert_only', 'transactional' = 'true')
+#         """ % (full_tbl_name, part_expr)
+#    self.client.execute("drop table if exists %s" % full_tbl_name)
+#    self.client.execute(CREATE_SQL)
+#
+#  def _run_test_read_hive_inserts(self, unique_database, partitioned):
+#    """Check that Impala can read a single insert only ACID table 
(over)written by Hive
+#    several times. Consistency can be checked by using incremental values for
+#    overwrites ('run') and inserts ('i').
+#    """
+#    tbl_name = "%s.test_read_hive_inserts" % unique_database
+#    self._create_table(tbl_name, partitioned)
+#
+#    run_tasks([
+#        Task(self._hive_role_write_inserts, tbl_name, partitioned),
+#        Task(self._impala_role_read_inserts, tbl_name, needs_refresh=True,
+#           sleep_seconds=3)])
+#
+#  def _run_test_read_impala_inserts(self, unique_database, partitioned):
+#    """Check that Impala can read a single insert only ACID table 
(over)written by Hive
+#    several times. Consistency can be checked by using incremental values for
+#    overwrites ('run') and inserts ('i').
+#    """
+#    tbl_name = "%s.test_read_impala_inserts" % unique_database
+#    self._create_table(tbl_name, partitioned)
+#
+#    run_tasks([
+#        Task(self._impala_role_write_inserts, tbl_name, partitioned),
+#        Task(self._impala_role_read_inserts, tbl_name, needs_refresh=False,
+#           sleep_seconds=0.1)])
+#
+#  @SkipIfHive2.acid
+#  @SkipIfS3.hive
+#  @SkipIfGCS.hive
+#  @SkipIfCOS.hive
+#  @pytest.mark.execute_serially
+#  @pytest.mark.stress
+#  def test_read_hive_inserts(self, unique_database):
+#    """Check that Impala can read partitioned and non-partitioned ACID tables
+#    written by Hive."""
+#    for is_partitioned in [False, True]:
+#      self._run_test_read_hive_inserts(unique_database, is_partitioned)
+#
+#  @SkipIfHive2.acid
+#  @pytest.mark.execute_serially
+#  @pytest.mark.stress
+#  def test_read_impala_inserts(self, unique_database):
+#    """Check that Impala can read partitioned and non-partitioned ACID tables
+#    written by Hive."""
+#    for is_partitioned in [False, True]:
+#      self._run_test_read_impala_inserts(unique_database, is_partitioned)
+#
+#  def _impala_role_partition_writer(self, tbl_name, partition, is_overwrite, 
sleep_sec):
+#    insert_op = "OVERWRITE" if is_overwrite else "INTO"
+#    try:
+#      impalad_client = ImpalaTestSuite.create_impala_client()
+#      impalad_client.execute(
+#          """insert {op} table {tbl_name} partition({partition})
+#             select sleep({sleep_ms})""".format(op=insert_op, 
tbl_name=tbl_name,
+#          partition=partition, sleep_ms=sleep_sec * 1000))
+#    finally:
+#      impalad_client.close()
+#
+#  @pytest.mark.execute_serially
+#  @pytest.mark.stress
+#  @SkipIf.not_hdfs
+#  @UniqueDatabase.parametrize(sync_ddl=True)
+#  def test_partitioned_inserts(self, unique_database):
+#    """Check that the different ACID write operations take appropriate locks.
+#       INSERT INTO: should take a shared lock
+#       INSERT OVERWRITE: should take an exclusive lock
+#       Both should take PARTITION-level lock in case of static partition 
insert."""
+#    tbl_name = "%s.test_concurrent_partitioned_inserts" % unique_database
+#    self.client.set_configuration_option("SYNC_DDL", "true")
+#    self.client.execute("""
+#        CREATE TABLE {0} (i int) PARTITIONED BY (p INT, q INT)
+#        TBLPROPERTIES(
+#        
'transactional_properties'='insert_only','transactional'='true')""".format(
+#        tbl_name))
+#    # Warmup INSERT
+#    self.execute_query("alter table {0} add 
partition(p=0,q=0)".format(tbl_name))
+#    sleep_sec = 5
+#    task_insert_into = Task(self._impala_role_partition_writer, tbl_name,
+#        "p=1,q=1", False, sleep_sec)
+#    # INSERT INTO the same partition can run in parallel.
+#    duration = run_tasks([task_insert_into, task_insert_into])
+#    assert duration < 3 * sleep_sec
+#    task_insert_overwrite = Task(self._impala_role_partition_writer, tbl_name,
+#      "p=1,q=1", True, sleep_sec)
+#    # INSERT INTO + INSERT OVERWRITE should have mutual exclusion.
+#    duration = run_tasks([task_insert_into, task_insert_overwrite])
+#    assert duration > 4 * sleep_sec
+#    # INSERT OVERWRITEs to the same partition should have mutual exclusion.
+#    duration = run_tasks([task_insert_overwrite, task_insert_overwrite])
+#    assert duration > 4 * sleep_sec
+#    task_insert_overwrite_2 = Task(self._impala_role_partition_writer, 
tbl_name,
+#      "p=1,q=2", True, sleep_sec)
+#    # INSERT OVERWRITEs to different partitions can run in parallel.
+#    duration = run_tasks([task_insert_overwrite, task_insert_overwrite_2])
+#    assert duration < 3 * sleep_sec
+#
 
 class TestConcurrentAcidInserts(TestAcidStress):
   @classmethod
(END)

{code}




--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-all-unsubscr...@impala.apache.org
For additional commands, e-mail: issues-all-h...@impala.apache.org

Reply via email to