Quanlong Huang has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/23942 )

Change subject: IMPALA-14230: Add catch-up mode for event processing
......................................................................


Patch Set 6:

(6 comments)

http://gerrit.cloudera.org:8080/#/c/23942/6/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
File fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java:

http://gerrit.cloudera.org:8080/#/c/23942/6/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java@956
PS6, Line 956:       Set<TableName> tableNames =
             :           tableWriteIds_.stream()
             :               .map(writeId -> new TableName(writeId.getDbName(), 
writeId.getTblName()))
             :               .collect(Collectors.toSet());
I think we can do this in the constructor and initialize tableNames_ there. 
Then remove other updates on tableNames_ and use tableNames_ in this method.


http://gerrit.cloudera.org:8080/#/c/23942/6/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
File fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java:

http://gerrit.cloudera.org:8080/#/c/23942/6/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java@3427
PS6, Line 3427: skipIfIncomplete
nit: let's replace "incomplete" with "unloaded" or "invalidated" since they are 
used more commonly in our code base.


http://gerrit.cloudera.org:8080/#/c/23942/1/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
File fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java:

http://gerrit.cloudera.org:8080/#/c/23942/1/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java@1205
PS1, Line 1205:     DerivedMetastoreTableEvent(DerivedMetastoreEventContext 
context,
> Done
Not done yet. Let's move this method after/below the constructor.


http://gerrit.cloudera.org:8080/#/c/23942/6/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
File fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java:

http://gerrit.cloudera.org:8080/#/c/23942/6/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java@1087
PS6, Line 1087:         invalidateTablesInCatchUpMode(lag);
I think we can put the invalidate code here and just let subclass implement a 
method getTableNames() to return a set of table names.

        for (TableName tableName : getTableNames()) {
          if (catalog_.invalidateTableIfExists(
                  tableName.getDb(), tableName.getTbl(), true /* 
skipIfIncomplete */)
              != null) {
            infoLog("Invalidated table {}.{} due to event lag of {}s", 
tableName.getDb(),
                tableName.getTbl(), lag);
          }
        }


http://gerrit.cloudera.org:8080/#/c/23942/5/tests/custom_cluster/test_events_custom_configs.py
File tests/custom_cluster/test_events_custom_configs.py:

http://gerrit.cloudera.org:8080/#/c/23942/5/tests/custom_cluster/test_events_custom_configs.py@680
PS5, Line 680:       impalad_args="--use_local_catalog=false",
> Added a new class TestEventProcessingCatchupMode. Because COMMIT_TXN is a c
It might be hard if we verify the count of expected logs. We can verify the 
"events-skipped" metric instead.


http://gerrit.cloudera.org:8080/#/c/23942/6/tests/custom_cluster/test_events_custom_configs.py
File tests/custom_cluster/test_events_custom_configs.py:

http://gerrit.cloudera.org:8080/#/c/23942/6/tests/custom_cluster/test_events_custom_configs.py@2101
PS6, Line 2101: alter table events
Though we are using ALTER TABLE statements, the HMS events in this test are 
actually ADD_PARTITION events. We need another test for ALTER_TABLE / 
ALTER_PARTITION events. A common scenario is ALTER events from COMPUTE STATS / 
COMPUTE INCREMENTAL STATS. We can also set 
debug_action=catalogd_update_stats_delay:SLEEP@2000 to let them run slow so 
holding the table lock for a long time. Previously this disrupts self-event 
detection.

We can also add a test for Iceberg ingestion scenario that generates 
ALTER_TABLE events.

impala> create table my_ice_tbl(i int, s string, p int) partitioned by spec(p) 
stored as iceberg;
# Self ALTER_TABLE event
impala> insert into my_ice_tbl values (0, 'aaa', 0);
# make sure the table is loaded and trigger external ALTER_TABLE events
impala> desc my_ice_tbl;
hive> insert into my_ice_tbl values (1, 'bbb', 1); insert into my_ice_tbl 
values (2, 'ccc', 2); insert into my_ice_tbl values (3, 'ddd', 3); insert into 
my_ice_tbl values (4, 'eee', 4); insert into my_ice_tbl values (5, 'eee', 5);

Verify that the events are processed quickly, e.g. after the last Hive 
statement finishes, there are no lag in event processing.



--
To view, visit http://gerrit.cloudera.org:8080/23942
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ib906c06346d5d3159999eeac632e1318bc060065
Gerrit-Change-Number: 23942
Gerrit-PatchSet: 6
Gerrit-Owner: Yida Wu <[email protected]>
Gerrit-Reviewer: Anonymous Coward <[email protected]>
Gerrit-Reviewer: Csaba Ringhofer <[email protected]>
Gerrit-Reviewer: Impala Public Jenkins <[email protected]>
Gerrit-Reviewer: Michael Smith <[email protected]>
Gerrit-Reviewer: Quanlong Huang <[email protected]>
Gerrit-Reviewer: Sai Hemanth Gantasala <[email protected]>
Gerrit-Reviewer: Yida Wu <[email protected]>
Gerrit-Comment-Date: Wed, 25 Feb 2026 07:25:43 +0000
Gerrit-HasComments: Yes

Reply via email to