Quanlong Huang created IMPALA-12831:
---------------------------------------

             Summary: HdfsTable.toMinimalTCatalogObject() should hold table 
read lock to generate incremental updates
                 Key: IMPALA-12831
                 URL: https://issues.apache.org/jira/browse/IMPALA-12831
             Project: IMPALA
          Issue Type: Bug
          Components: Catalog
            Reporter: Quanlong Huang
            Assignee: Quanlong Huang


When enable_incremental_metadata_updates=true (the default), catalogd sends 
incremental partition updates to coordinators. Generating them goes through 
HdfsTable.toMinimalTCatalogObject():
{code:java}
  public TCatalogObject toMinimalTCatalogObject() {
    TCatalogObject catalogObject = super.toMinimalTCatalogObject();
    if (!BackendConfig.INSTANCE.isIncrementalMetadataUpdatesEnabled()) {
      return catalogObject;
    }    
    catalogObject.getTable().setTable_type(TTableType.HDFS_TABLE);
    THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
        nullPartitionKeyValue_, nullColumnValue_,
        /*idToPartition=*/ new HashMap<>(),
        /*prototypePartition=*/ new THdfsPartition());
    for (HdfsPartition part : partitionMap_.values()) {
      hdfsTable.partitions.put(part.getId(), part.toMinimalTHdfsPartition());
    }    
    hdfsTable.setHas_full_partitions(false);
    // The minimal catalog object of partitions contain the partition names.
    hdfsTable.setHas_partition_names(true);
    catalogObject.getTable().setHdfs_table(hdfsTable);
    return catalogObject;
  }{code}
 
Accessing table fields without holding the table read lock can fail due to 
concurrent DDLs. We've seen the event-processor fail while processing a RELOAD 
event that wants to invalidate an HdfsTable:
{noformat}
E0216 16:23:44.283689   253 MetastoreEventsProcessor.java:899] Unexpected 
exception received while processing event
Java exception follows:
java.util.ConcurrentModificationException
        at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911)
        at java.util.ArrayList$Itr.next(ArrayList.java:861)
        at org.apache.impala.catalog.Column.toColumnNames(Column.java:148)
        at org.apache.impala.catalog.Table.getColumnNames(Table.java:844)
        at 
org.apache.impala.catalog.HdfsTable.toMinimalTCatalogObject(HdfsTable.java:2132)
        at 
org.apache.impala.catalog.CatalogServiceCatalog.addIncompleteTable(CatalogServiceCatalog.java:2221)
        at 
org.apache.impala.catalog.CatalogServiceCatalog.addIncompleteTable(CatalogServiceCatalog.java:2202)
        at 
org.apache.impala.catalog.CatalogServiceCatalog.invalidateTable(CatalogServiceCatalog.java:2797)
        at 
org.apache.impala.catalog.events.MetastoreEvents$ReloadEvent.processTableInvalidate(MetastoreEvents.java:2734)
        at 
org.apache.impala.catalog.events.MetastoreEvents$ReloadEvent.process(MetastoreEvents.java:2656)
        at 
org.apache.impala.catalog.events.MetastoreEvents$MetastoreEvent.processIfEnabled(MetastoreEvents.java:522)
        at 
org.apache.impala.catalog.events.MetastoreEventsProcessor.processEvents(MetastoreEventsProcessor.java:1052)
        at 
org.apache.impala.catalog.events.MetastoreEventsProcessor.processEvents(MetastoreEventsProcessor.java:881)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750){noformat}
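The ConcurrentModificationException above is the classic symptom of ArrayList's fail-fast iterator: the column list is structurally modified (here by a concurrent DDL) while toColumnNames() is iterating it. A minimal, single-threaded Java sketch of the same failure mode (names are illustrative, not Impala code):

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

public class CmeDemo {
  // Iterates a column list while mutating it, mimicking what a concurrent
  // DDL does to the table's column list during serialization.
  static boolean iterateWhileMutating() {
    List<String> columns = new ArrayList<>(List.of("col0", "col1", "col2"));
    try {
      List<String> copy = new ArrayList<>();
      for (String c : columns) {
        copy.add(c);
        // Structural modification during iteration; the iterator's
        // modCount check throws on the next next() call.
        columns.add(c + "_new");
      }
    } catch (ConcurrentModificationException e) {
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println("caught CME: " + iterateWhileMutating());
  }
}
```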

I can reproduce the issue using the following test:
{code:python}
  @CustomClusterTestSuite.with_args(
    catalogd_args="--enable_incremental_metadata_updates=true")
  def test_concurrent_invalidate_metadata_with_refresh(self, unique_database):
    # Create a wide table with some partitions
    tbl = unique_database + ".wide_tbl"
    create_stmt = "create table {} (".format(tbl)
    for i in range(600):
      create_stmt += "col{} int, ".format(i)
    create_stmt += "col600 int) partitioned by (p int) stored as textfile"
    self.execute_query(create_stmt)
    for i in range(10):
      self.execute_query("alter table {} add partition (p={})".format(tbl, i))

    refresh_stmt = "refresh " + tbl
    handle = self.client.execute_async(refresh_stmt)
    for i in range(10):
      self.execute_query("invalidate metadata " + tbl)
      # Always keep a concurrent REFRESH statement running
      if self.client.get_state(handle) == self.client.QUERY_STATES['FINISHED']:
        handle = self.client.execute_async(refresh_stmt){code}
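A possible direction for the fix, sketched with java.util.concurrent.locks.ReentrantReadWriteLock: hold the table read lock for the duration of the serialization so writers (concurrent DDLs, which take the write lock) cannot swap the column or partition lists mid-iteration. The class and method names below are hypothetical illustrations of the pattern, not Impala's actual API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class GuardedSnapshot {
  private final ReentrantReadWriteLock tableLock = new ReentrantReadWriteLock();
  private final List<String> columns = new ArrayList<>();

  // Read path: copy the fields under the read lock, analogous to wrapping
  // the body of toMinimalTCatalogObject().
  List<String> toMinimalSnapshot() {
    tableLock.readLock().lock();
    try {
      return new ArrayList<>(columns);
    } finally {
      tableLock.readLock().unlock();
    }
  }

  // Write path: DDLs mutate the fields only under the write lock, so a
  // reader can never observe a structural modification mid-iteration.
  void replaceColumns(List<String> newCols) {
    tableLock.writeLock().lock();
    try {
      columns.clear();
      columns.addAll(newCols);
    } finally {
      tableLock.writeLock().unlock();
    }
  }
}
```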



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
