This is an automated email from the ASF dual-hosted git repository. boroknagyz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit f98da3315e1e4744ad0e49405a4d1c7f98be85ae Author: Sai Hemanth Gantasala <saihema...@cloudera.com> AuthorDate: Mon May 6 19:27:50 2024 -0700 IMPALA-12712: Invalidate metadata on table should set better createEventId "INVALIDATE METADATA <table>" can be used to bring up a table in Impala's catalog cache if the table exists in HMS. Currently, createEventId for such tables is always set to -1, which will lead to always removing the table. A sequence of drop table + create table + invalidate table can lead to flaky test failures like IMPALA-12266. Solution: When Invalidate metadata <table> is fired, fetch the latest eventId from HMS and set it as createEventId for the table, so that a drop table event that happened before the invalidate query will be ignored without removing the table from cache. Note: Also removed an unnecessary RPC call to HMS to get the table object since we already have the required info in the table metadata rpc call. Testing: - Added an end-to-end test to verify that a drop table event that happened before the invalidate query doesn't remove the metadata object from cache. 
Change-Id: Iff6ac18fe8d9e7b25cc41c7e41eecde251fbccdd Reviewed-on: http://gerrit.cloudera.org:8080/21402 Reviewed-by: Csaba Ringhofer <csringho...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- .../org/apache/impala/catalog/CatalogServiceCatalog.java | 13 ++++++++----- .../org/apache/impala/catalog/events/MetastoreEvents.java | 6 +++--- .../java/org/apache/impala/service/CatalogOpExecutor.java | 11 ++++++++--- tests/custom_cluster/test_events_custom_configs.py | 15 +++++++++++++++ 4 files changed, 34 insertions(+), 11 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 666eb8b4a..65a0c564b 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -2906,6 +2906,12 @@ public class CatalogServiceCatalog extends Catalog { return hdfsTable; } + public TCatalogObject invalidateTable(TTableName tableName, + Reference<Boolean> tblWasRemoved, Reference<Boolean> dbWasAdded, + EventSequence catalogTimeline) { + return invalidateTable(tableName, tblWasRemoved, dbWasAdded, catalogTimeline, -1L); + } + /** * Invalidates the table in the catalog cache, potentially adding/removing the table * from the cache based on whether it exists in the Hive Metastore. @@ -2927,7 +2933,7 @@ public class CatalogServiceCatalog extends Catalog { */ public TCatalogObject invalidateTable(TTableName tableName, Reference<Boolean> tblWasRemoved, Reference<Boolean> dbWasAdded, - EventSequence catalogTimeline) { + EventSequence catalogTimeline, long eventId) { tblWasRemoved.setRef(false); dbWasAdded.setRef(false); String dbName = tableName.getDb_name(); @@ -2943,13 +2949,11 @@ public class CatalogServiceCatalog extends Catalog { // 2) Empty - Table does not exist in metastore. 
// 3) unknown (null) - There was exception thrown by the metastore client. List<TableMeta> metaRes = null; - org.apache.hadoop.hive.metastore.api.Table msTbl = null; Db db = null; try (MetaStoreClient msClient = getMetaStoreClient(catalogTimeline)) { org.apache.hadoop.hive.metastore.api.Database msDb = null; try { metaRes = getTableMetaFromHive(msClient, dbName, tblName); - msTbl = msClient.getHiveClient().getTable(dbName, tblName); catalogTimeline.markEvent(FETCHED_HMS_TABLE); } catch (UnknownDBException | NoSuchObjectException e) { // The parent database does not exist in the metastore. Treat this the same @@ -3002,10 +3006,9 @@ public class CatalogServiceCatalog extends Catalog { // Add a new uninitialized table to the table cache, effectively invalidating // any existing entry. The metadata for the table will be loaded lazily, on the // on the next access to the table. - Preconditions.checkNotNull(msTbl); Table newTable = addIncompleteTable(dbName, tblName, MetastoreShim.mapToInternalTableType(tblMeta.getTableType()), - tblMeta.getComments()); + tblMeta.getComments(), eventId); Preconditions.checkNotNull(newTable); if (loadInBackground_) { tableLoadingMgr_.backgroundLoad(new TTableName(dbName.toLowerCase(), diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java index ee054e44f..79592cca6 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java @@ -1861,8 +1861,8 @@ public class MetastoreEvents { } else if (tbl instanceof IncompleteTable) { // No-Op } else if (getEventId() > tbl.getCreateEventId()) { - catalog_.invalidateTable(tbl.getTableName().toThrift(), - new Reference<>(), new Reference<>(), NoOpEventSequence.INSTANCE); + catalog_.invalidateTable(tbl.getTableName().toThrift(), new Reference<>(), + new Reference<>(), NoOpEventSequence.INSTANCE, 
getEventId()); LOG.info("Table " + tbl.getFullName() + " is invalidated from catalog cache" + " since eventSync is turned on for this table."); } else { @@ -3151,7 +3151,7 @@ public class MetastoreEvents { return ; } catalog_.invalidateTable(tbl.getTableName().toThrift(), - tblWasRemoved, dbWasAdded, NoOpEventSequence.INSTANCE); + tblWasRemoved, dbWasAdded, NoOpEventSequence.INSTANCE, getEventId()); LOG.info("Table " + tbl.getFullName() + " is invalidated from catalog cache"); } } diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 17210b657..47f5b07a6 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -7003,11 +7003,16 @@ public class CatalogOpExecutor { // Result table of the invalidate/refresh operation. Table tbl = null; TableName tblName = TableName.fromThrift(req.getTable_name()); + long eventId = -1L; + try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) { + eventId = MetastoreEventsProcessor.getCurrentEventIdNoThrow( + msClient.getHiveClient()); + } if (!req.isIs_refresh()) { // For INVALIDATE METADATA <db>.<table>, the db might be unloaded. // So we can't update 'tbl' here. 
updatedThriftTable = catalog_.invalidateTable( - req.getTable_name(), tblWasRemoved, dbWasAdded, catalogTimeline); + req.getTable_name(), tblWasRemoved, dbWasAdded, catalogTimeline, eventId); catalogTimeline.markEvent("Invalidated table in catalog cache"); } else { // Quick check to see if the table exists in the catalog without triggering @@ -7045,8 +7050,8 @@ public class CatalogOpExecutor { updatedThriftTable = catalog_.reloadTable(tbl, req, resultType, cmdString, /*eventId*/ -1, catalogTimeline); } catch (IcebergTableLoadingException e) { - updatedThriftTable = catalog_.invalidateTable( - req.getTable_name(), tblWasRemoved, dbWasAdded, catalogTimeline); + updatedThriftTable = catalog_.invalidateTable(req.getTable_name(), + tblWasRemoved, dbWasAdded, catalogTimeline, eventId); } } } else { diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py index 03ea3cac6..941bd31e7 100644 --- a/tests/custom_cluster/test_events_custom_configs.py +++ b/tests/custom_cluster/test_events_custom_configs.py @@ -1261,6 +1261,21 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): results = self.client.execute("select i from " + fq_tbl) assert results.data == ["1", "2", "3"] + @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=5") + def test_invalidate_better_create_event_id(self, unique_database): + """This test should set better create event id for invalidate table""" + test_tbl = "test_invalidate_table" + self.client.execute("create table {}.{} (i int)".format(unique_database, test_tbl)) + EventProcessorUtils.wait_for_event_processing(self) + tables_removed_before = EventProcessorUtils.get_int_metric("tables-removed") + self.client.execute("drop table {}.{}".format(unique_database, test_tbl)) + self.run_stmt_in_hive( + "create table {}.{} (i int, j int)".format(unique_database, test_tbl)) + self.client.execute("invalidate metadata {}.{}".format(unique_database, 
test_tbl)) + EventProcessorUtils.wait_for_event_processing(self) + tables_removed_after = EventProcessorUtils.get_int_metric("tables-removed") + assert tables_removed_after == tables_removed_before + @SkipIfFS.hive class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):