This is an automated email from the ASF dual-hosted git repository. tarmstrong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 15c3b13e9730479e096275d974000ae9fe8fbb83 Author: Vihang Karajgaonkar <vihan...@apache.org> AuthorDate: Sat Oct 3 17:01:35 2020 -0700 IMPALA-10219: Expose DEBUG_ACTION query option in catalog This patch enables DEBUG_ACTION in the catalog service's Java code. Specifically, the DEBUG_ACTION query option is now exposed to TResetMetadataRequest and TDdlExecRequest so that we can inject delays while executing refresh or DDL statements. For example, 1. To inject a delay of 100ms per HDFS list operation during a refresh statement, set the following query option: set debug_action=catalogd_refresh_hdfs_listing_delay:SLEEP@100; 2. To inject a delay of 100ms in an alter table recover partitions statement: set debug_action=catalogd_table_recover_delay:SLEEP@100; 3. To inject a delay of 100ms in a compute stats statement: set debug_action=catalogd_update_stats_delay:SLEEP@100; Note that this option only adds the delay during the update_stats phase of the compute stats execution. Testing: 1. Added a test which sets the query option and makes sure that the command takes more time than it does without the query option. 2. Added unit tests for the debugAction implementation logic. 
Change-Id: Ia7196b1ce76415a5faf3fa8575a26d22b2bf50b1 Reviewed-on: http://gerrit.cloudera.org:8080/16548 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- be/src/exec/catalog-op-executor.cc | 1 + be/src/util/debug-util.cc | 4 + common/thrift/CatalogService.thrift | 6 + .../impala/catalog/CatalogServiceCatalog.java | 12 +- .../apache/impala/catalog/FileMetadataLoader.java | 21 +++- .../java/org/apache/impala/catalog/HdfsTable.java | 74 +++++++---- .../org/apache/impala/catalog/IcebergTable.java | 2 +- .../impala/catalog/ParallelFileMetadataLoader.java | 10 +- .../org/apache/impala/common/FileSystemUtil.java | 13 +- .../apache/impala/service/CatalogOpExecutor.java | 26 ++-- .../java/org/apache/impala/service/Frontend.java | 10 ++ .../java/org/apache/impala/util/DebugUtils.java | 137 +++++++++++++++++++++ .../events/MetastoreEventsProcessorTest.java | 2 +- .../org/apache/impala/util/DebugUtilsTest.java | 67 ++++++++++ tests/common/impala_test_suite.py | 11 ++ tests/metadata/test_catalogd_debug_actions.py | 50 ++++++++ 16 files changed, 389 insertions(+), 57 deletions(-) diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc index f22dfad..0ffd708 100644 --- a/be/src/exec/catalog-op-executor.cc +++ b/be/src/exec/catalog-op-executor.cc @@ -132,6 +132,7 @@ Status CatalogOpExecutor::ExecComputeStats( TDdlExecRequest& update_stats_req = catalog_op_req.ddl_params; update_stats_req.__set_ddl_type(TDdlType::ALTER_TABLE); update_stats_req.__set_sync_ddl(compute_stats_request.sync_ddl); + update_stats_req.__set_debug_action(compute_stats_request.ddl_params.debug_action); const TComputeStatsParams& compute_stats_params = compute_stats_request.ddl_params.compute_stats_params; diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc index e176852..af10ddd 100644 --- a/be/src/util/debug-util.cc +++ b/be/src/util/debug-util.cc @@ -352,6 +352,10 @@ 
static bool ParseProbability(const string& prob_str, bool* should_execute) { return true; } +/// The catalog java code also implements a equivalent method for processing the debug +/// actions in the Java code. See DebugUtils.java for more details. Any changes to the +/// implementation logic here like adding a new type of action, should make changes in +/// the DebugUtils.java too. Status DebugActionImpl( const string& debug_action, const char* label, const std::vector<string>& args) { const DebugActionTokens& action_list = TokenizeDebugActions(debug_action); diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift index 5acdb7f..ad316b9 100644 --- a/common/thrift/CatalogService.thrift +++ b/common/thrift/CatalogService.thrift @@ -156,6 +156,9 @@ struct TDdlExecRequest { // Parameters for replaying an exported testcase. 25: optional JniCatalog.TCopyTestCaseReq copy_test_case_params + + // Passes the debug actions to catalogd if the query option is set. + 26: optional string debug_action } // Response from executing a TDdlExecRequest @@ -256,6 +259,9 @@ struct TResetMetadataRequest { // If set, refreshes partition objects which are modified externally. // Applicable only when refreshing the table. 9: optional bool refresh_updated_hms_partitions + + // debug_action is set from the query_option when available. 
+ 10: optional string debug_action } // Response from TResetMetadataRequest diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 5c6bf1a..38aed8f 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -95,6 +95,7 @@ import org.apache.impala.thrift.TPartitionKeyValue; import org.apache.impala.thrift.TPartitionStats; import org.apache.impala.thrift.TPrincipalType; import org.apache.impala.thrift.TPrivilege; +import org.apache.impala.thrift.TResetMetadataRequest; import org.apache.impala.thrift.TTable; import org.apache.impala.thrift.TTableName; import org.apache.impala.thrift.TTableType; @@ -2275,7 +2276,7 @@ public class CatalogServiceCatalog extends Catalog { * {@code refreshUpdatedPartitions} argument. */ public TCatalogObject reloadTable(Table tbl, String reason) throws CatalogException { - return reloadTable(tbl, false, reason); + return reloadTable(tbl, new TResetMetadataRequest(), reason); } /** @@ -2287,8 +2288,8 @@ public class CatalogServiceCatalog extends Catalog { * If {@code refreshUpdatedParts} is true, the refresh logic detects updated * partitions in metastore and reloads them too. 
*/ - public TCatalogObject reloadTable(Table tbl, boolean refreshUpdatedParts, String reason) - throws CatalogException { + public TCatalogObject reloadTable(Table tbl, TResetMetadataRequest request, + String reason) throws CatalogException { LOG.info(String.format("Refreshing table metadata: %s", tbl.getFullName())); Preconditions.checkState(!(tbl instanceof IncompleteTable)); String dbName = tbl.getDb().getName(); @@ -2312,7 +2313,8 @@ public class CatalogServiceCatalog extends Catalog { } if (tbl instanceof HdfsTable) { ((HdfsTable) tbl) - .load(true, msClient.getHiveClient(), msTbl, refreshUpdatedParts, reason); + .load(true, msClient.getHiveClient(), msTbl, + request.refresh_updated_hms_partitions, request.debug_action, reason); } else { tbl.load(true, msClient.getHiveClient(), msTbl, reason); } @@ -3305,7 +3307,7 @@ public class CatalogServiceCatalog extends Catalog { .map(HdfsPartition.Builder::new) .collect(Collectors.toList()); new ParallelFileMetadataLoader( - table, partBuilders, reqWriteIdList, validTxnList, logPrefix).load(); + table, partBuilders, reqWriteIdList, validTxnList, null, logPrefix).load(); for (HdfsPartition.Builder builder : partBuilders) { // Let's retrieve the original partition instance from builder because this is // stored in the keys of 'partToPartialInfoMap'. 
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java index 2daca11..c1cba7f 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java +++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java @@ -30,7 +30,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.Reference; @@ -73,6 +72,7 @@ public class FileMetadataLoader { private List<FileDescriptor> loadedInsertDeltaFds_; private List<FileDescriptor> loadedDeleteDeltaFds_; private LoadStats loadStats_; + private String debugAction_; /** * @param partDir the dir for which to fetch file metadata @@ -177,16 +177,17 @@ public class FileMetadataLoader { (oldFdsByRelPath_.isEmpty() || forceRefreshLocations); String msg = String.format("%s file metadata%s from path %s", - oldFdsByRelPath_.isEmpty() ? "Loading" : "Refreshing", - listWithLocations ? " with eager location-fetching" : "", - partDir_); + oldFdsByRelPath_.isEmpty() ? "Loading" : "Refreshing", + listWithLocations ? " with eager location-fetching" : "", partDir_); LOG.trace(msg); try (ThreadNameAnnotator tna = new ThreadNameAnnotator(msg)) { RemoteIterator<? 
extends FileStatus> fileStatuses; if (listWithLocations) { - fileStatuses = FileSystemUtil.listFiles(fs, partDir_, recursive_); + fileStatuses = FileSystemUtil + .listFiles(fs, partDir_, recursive_, debugAction_); } else { - fileStatuses = FileSystemUtil.listStatus(fs, partDir_, recursive_); + fileStatuses = FileSystemUtil + .listStatus(fs, partDir_, recursive_, debugAction_); // TODO(todd): we could look at the result of listing without locations, and if // we see that a substantial number of the files have changed, it may be better @@ -280,6 +281,14 @@ public class FileMetadataLoader { (fd.getModificationTime() != status.getModificationTime()); } + /** + * Enables injection of a debug actions to introduce delays in HDFS listStatus or + * listFiles call during the file-metadata loading. + */ + public void setDebugAction(String debugAction) { + this.debugAction_ = debugAction; + } + // File/Block metadata loading stats for a single HDFS path. public static class LoadStats { private final Path partDir_; diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 361de4e..75b68df 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -34,6 +34,7 @@ import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -87,6 +88,7 @@ import org.apache.impala.thrift.TTableType; import org.apache.impala.util.AcidUtils; import org.apache.impala.util.AvroSchemaConverter; import org.apache.impala.util.AvroSchemaUtils; +import org.apache.impala.util.DebugUtils; import org.apache.impala.util.FsPermissionCache; import org.apache.impala.util.FsPermissionChecker; import org.apache.impala.util.HdfsCachingUtil; @@ -684,6 +686,17 @@ public 
class HdfsTable extends Table implements FeFsTable { } /** + * Similar to + * {@link #loadFileMetadataForPartitions(IMetaStoreClient, Collection, boolean)} + * but without any injecting the debug actions. + */ + private long loadFileMetadataForPartitions(IMetaStoreClient client, + Collection<HdfsPartition.Builder> partBuilders, boolean isRefresh) + throws CatalogException { + return loadFileMetadataForPartitions(client, partBuilders, isRefresh, null); + } + + /** * Helper method to load the block locations for each partition in 'parts'. * New file descriptor lists are loaded and the partitions are updated in place. * @@ -692,8 +705,8 @@ public class HdfsTable extends Table implements FeFsTable { * @return time in nanoseconds spent in loading file metadata. */ private long loadFileMetadataForPartitions(IMetaStoreClient client, - Collection<HdfsPartition.Builder> partBuilders, boolean isRefresh) - throws CatalogException { + Collection<HdfsPartition.Builder> partBuilders, boolean isRefresh, + String debugActions) throws CatalogException { final Clock clock = Clock.defaultClock(); long startTime = clock.getTick(); @@ -714,9 +727,9 @@ public class HdfsTable extends Table implements FeFsTable { // we'll throw an exception here and end up bailing out of whatever catalog operation // we're in the middle of. This could cause a partial metadata update -- eg we may // have refreshed the top-level table properties without refreshing the files. 
- new ParallelFileMetadataLoader( - this, partBuilders, validWriteIds_, validTxnList, logPrefix) - .load(); + ParallelFileMetadataLoader loader = new ParallelFileMetadataLoader( + this, partBuilders, validWriteIds_, validTxnList, debugActions, logPrefix); + loader.load(); // TODO(todd): would be good to log a summary of the loading process: // - how many block locations did we reuse/load individually/load via batch @@ -1059,7 +1072,7 @@ public class HdfsTable extends Table implements FeFsTable { throws TableLoadingException { load(reuseMetadata, client, msTbl, /* loadPartitionFileMetadata */ true, /* loadTableSchema*/true, false, - /* partitionsToUpdate*/null, reason); + /* partitionsToUpdate*/null, null, reason); } public void load(boolean reuseMetadata, IMetaStoreClient client, @@ -1067,7 +1080,16 @@ public class HdfsTable extends Table implements FeFsTable { String reason) throws TableLoadingException { load(reuseMetadata, client, msTbl, /* loadPartitionFileMetadata */ true, /* loadTableSchema*/true, refreshUpdatedPartitions, - /* partitionsToUpdate*/null, reason); + /* partitionsToUpdate*/null, null, reason); + } + + public void load(boolean reuseMetadata, IMetaStoreClient hiveClient, + org.apache.hadoop.hive.metastore.api.Table msTbl, + boolean refreshUpdatedPartitions, String debugAction, String reason) + throws CatalogException { + load(reuseMetadata, hiveClient, msTbl, /* loadPartitionFileMetadata */ + true, /* loadTableSchema*/true, refreshUpdatedPartitions, + /* partitionsToUpdate*/null, debugAction, reason); } /** @@ -1097,8 +1119,8 @@ public class HdfsTable extends Table implements FeFsTable { public void load(boolean reuseMetadata, IMetaStoreClient client, org.apache.hadoop.hive.metastore.api.Table msTbl, boolean loadParitionFileMetadata, boolean loadTableSchema, - boolean refreshUpdatedPartitions, Set<String> partitionsToUpdate, String reason) - throws TableLoadingException { + boolean refreshUpdatedPartitions, Set<String> partitionsToUpdate, + 
@Nullable String debugAction, String reason) throws TableLoadingException { final Timer.Context context = getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time(); String annotation = String.format("%s metadata for %s%s partition(s) of %s.%s (%s)", @@ -1133,14 +1155,15 @@ public class HdfsTable extends Table implements FeFsTable { storageMetadataLoadTime_ += updateMdFromHmsTable(msTbl); if (msTbl.getPartitionKeysSize() == 0) { if (loadParitionFileMetadata) { - storageMetadataLoadTime_ += updateUnpartitionedTableFileMd(client); + storageMetadataLoadTime_ += updateUnpartitionedTableFileMd(client, + debugAction); } else { // Update the single partition stats in case table stats changes. updateUnpartitionedTableStats(); } } else { storageMetadataLoadTime_ += updatePartitionsFromHms( client, partitionsToUpdate, loadParitionFileMetadata, - refreshUpdatedPartitions); + refreshUpdatedPartitions, debugAction); } LOG.info("Incrementally loaded table metadata for: " + getFullName()); } else { @@ -1223,7 +1246,7 @@ public class HdfsTable extends Table implements FeFsTable { * This is optimized for the case where few files have changed. See * {@link FileMetadataLoader#load} for details. */ - private long updateUnpartitionedTableFileMd(IMetaStoreClient client) + private long updateUnpartitionedTableFileMd(IMetaStoreClient client, String debugAction) throws CatalogException { Preconditions.checkState(getNumClusteringCols() == 0); if (LOG.isTraceEnabled()) { @@ -1245,7 +1268,7 @@ public class HdfsTable extends Table implements FeFsTable { // partition instance to local catalog coordinators. 
partBuilder.setPrevId(oldPartition.getId()); long fileMdLoadTime = loadFileMetadataForPartitions(client, - ImmutableList.of(partBuilder), /*isRefresh=*/true); + ImmutableList.of(partBuilder), /*isRefresh=*/true, debugAction); setUnpartitionedTableStats(partBuilder); addPartition(partBuilder.build()); return fileMdLoadTime; @@ -1277,7 +1300,7 @@ public class HdfsTable extends Table implements FeFsTable { */ private long updatePartitionsFromHms(IMetaStoreClient client, Set<String> partitionsToUpdate, boolean loadPartitionFileMetadata, - boolean refreshUpdatedPartitions) throws Exception { + boolean refreshUpdatedPartitions, String debugAction) throws Exception { if (LOG.isTraceEnabled()) LOG.trace("Sync table partitions: " + getFullName()); org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable(); Preconditions.checkNotNull(msTbl); @@ -1285,9 +1308,9 @@ public class HdfsTable extends Table implements FeFsTable { Preconditions.checkState(loadPartitionFileMetadata || partitionsToUpdate == null); PartitionDeltaUpdater deltaUpdater = refreshUpdatedPartitions ? new PartBasedDeltaUpdater(client, - loadPartitionFileMetadata, partitionsToUpdate) + loadPartitionFileMetadata, partitionsToUpdate, debugAction) : new PartNameBasedDeltaUpdater(client, loadPartitionFileMetadata, - partitionsToUpdate); + partitionsToUpdate, debugAction); deltaUpdater.apply(); return deltaUpdater.loadTimeForFileMdNs_; } @@ -1310,12 +1333,14 @@ public class HdfsTable extends Table implements FeFsTable { // if loadFileMd_ flag is set, files for these partitions will also be // reloaded. 
private final Set<String> partitionsToUpdate_; + private final String debugAction_; PartitionDeltaUpdater(IMetaStoreClient client, boolean loadPartitionFileMetadata, - Set<String> partitionsToUpdate) { + Set<String> partitionsToUpdate, String debugAction) { this.client_ = client; this.loadFileMd_ = loadPartitionFileMetadata; this.partitionsToUpdate_ = partitionsToUpdate; + this.debugAction_ = debugAction; } /** @@ -1407,7 +1432,7 @@ public class HdfsTable extends Table implements FeFsTable { .collect(Collectors.toList()); } loadTimeForFileMdNs_ += loadFileMetadataForPartitions(client_, - partitionsToLoadFiles,/* isRefresh=*/true); + partitionsToLoadFiles,/* isRefresh=*/true, debugAction_); updatePartitions(partitionsToLoadFiles); } } @@ -1434,8 +1459,8 @@ public class HdfsTable extends Table implements FeFsTable { public PartBasedDeltaUpdater( IMetaStoreClient client, boolean loadPartitionFileMetadata, - Set<String> partitionsToUpdate) throws Exception { - super(client, loadPartitionFileMetadata, partitionsToUpdate); + Set<String> partitionsToUpdate, String debugAction) throws Exception { + super(client, loadPartitionFileMetadata, partitionsToUpdate, debugAction); Stopwatch sw = Stopwatch.createStarted(); List<Partition> partitionList; if (partitionsToUpdate != null) { @@ -1533,8 +1558,8 @@ public class HdfsTable extends Table implements FeFsTable { public PartNameBasedDeltaUpdater( IMetaStoreClient client, boolean loadPartitionFileMetadata, - Set<String> partitionsToUpdate) throws Exception { - super(client, loadPartitionFileMetadata, partitionsToUpdate); + Set<String> partitionsToUpdate, String debugAction) throws Exception { + super(client, loadPartitionFileMetadata, partitionsToUpdate, debugAction); // Retrieve all the partition names from the Hive Metastore. We need this to // identify the delta between partitions of the local HdfsTable and the table entry // in the Hive Metastore. 
Note: This is a relatively "cheap" operation @@ -2230,8 +2255,10 @@ public class HdfsTable extends Table implements FeFsTable { * Returns the HDFS paths corresponding to HdfsTable partitions that don't exist in * the Hive Metastore. An HDFS path is represented as a list of strings values, one per * partition key column. + * @param debugAction */ - public List<List<String>> getPathsWithoutPartitions() throws CatalogException { + public List<List<String>> getPathsWithoutPartitions(@Nullable String debugAction) + throws CatalogException { Set<List<LiteralExpr>> existingPartitions = new HashSet<>(); // Get the list of partition values of existing partitions in Hive Metastore. for (HdfsPartition partition: partitionMap_.values()) { @@ -2247,6 +2274,7 @@ public class HdfsTable extends Table implements FeFsTable { try { getAllPartitionsNotInHms(basePath, partitionKeys, existingPartitions, partitionsNotInHms); + DebugUtils.executeDebugAction(debugAction, DebugUtils.RECOVER_PARTITIONS_DELAY); } catch (Exception e) { throw new CatalogException(String.format("Failed to recover partitions for %s " + "with exception:%s.", getFullName(), e)); diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java index e134d22..04e1bf9 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java @@ -232,7 +232,7 @@ public class IcebergTable extends Table implements FeIcebergTable { loadSchemaFromIceberg(); // Loading hdfs table after loaded schema from Iceberg, // in case we create external Iceberg table skipping column info in sql. 
- hdfsTable_.load(false, msClient, msTable_, true, true, false, null, reason); + hdfsTable_.load(false, msClient, msTable_, true, true, false, null, null, reason); pathHashToFileDescMap_ = Utils.loadAllPartition(this); loadAllColumnStats(msClient); } catch (Exception e) { diff --git a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java index 616bb50..8ac4e2d 100644 --- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java +++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java @@ -33,19 +33,14 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.impala.catalog.FeFsTable.Utils; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; -import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.Pair; -import org.apache.impala.compat.MetastoreShim; import org.apache.impala.service.BackendConfig; -import org.apache.impala.thrift.TValidWriteIdList; import org.apache.impala.util.ThreadNameAnnotator; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.MoreExecutors; @@ -76,8 +71,8 @@ public class ParallelFileMetadataLoader { public ParallelFileMetadataLoader(HdfsTable table, Collection<HdfsPartition.Builder> partBuilders, - ValidWriteIdList writeIdList, ValidTxnList validTxnList, String logPrefix) - throws CatalogException { + ValidWriteIdList writeIdList, ValidTxnList validTxnList, String debugAction, + String logPrefix) throws CatalogException { if (writeIdList != null || validTxnList != null) { // make sure that both either both writeIdList and validTxnList are set or both // of 
them are not. @@ -104,6 +99,7 @@ public class ParallelFileMetadataLoader { boolean hasCachedPartition = Iterables.any(e.getValue(), HdfsPartition.Builder::isMarkedCached); loader.setForceRefreshBlockLocations(hasCachedPartition); + loader.setDebugAction(debugAction); loaders_.put(e.getKey(), loader); } this.logPrefix_ = logPrefix; diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java index b4a41b2..ea01c6a 100644 --- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java +++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.client.HdfsAdmin; import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.impala.catalog.HdfsCompression; +import org.apache.impala.util.DebugUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +54,7 @@ import java.util.UUID; * Common utility functions for operating on FileSystem objects. */ public class FileSystemUtil { + private static final Configuration CONF = new Configuration(); private static final Logger LOG = LoggerFactory.getLogger(FileSystemUtil.class); @@ -564,7 +566,7 @@ public class FileSystemUtil { * Note that the order (breadth-first vs depth-first, sorted vs not) is undefined. */ public static RemoteIterator<? extends FileStatus> listStatus(FileSystem fs, Path p, - boolean recursive) throws IOException { + boolean recursive, String debugAction) throws IOException { try { if (recursive) { // The Hadoop FileSystem API doesn't provide a recursive listStatus call that @@ -584,12 +586,12 @@ public class FileSystemUtil { // even though it returns LocatedFileStatus objects with "fake" blocks which we // will ignore. 
if (isS3AFileSystem(fs)) { - return listFiles(fs, p, true); + return listFiles(fs, p, true, debugAction); } - + DebugUtils.executeDebugAction(debugAction, DebugUtils.REFRESH_HDFS_LISTING_DELAY); return new FilterIterator(p, new RecursingIterator(fs, p)); } - + DebugUtils.executeDebugAction(debugAction, DebugUtils.REFRESH_HDFS_LISTING_DELAY); return new FilterIterator(p, fs.listStatusIterator(p)); } catch (FileNotFoundException e) { if (LOG.isWarnEnabled()) LOG.warn("Path does not exist: " + p.toString(), e); @@ -601,8 +603,9 @@ public class FileSystemUtil { * Wrapper around FileSystem.listFiles(), similar to the listStatus() wrapper above. */ public static RemoteIterator<? extends FileStatus> listFiles(FileSystem fs, Path p, - boolean recursive) throws IOException { + boolean recursive, String debugAction) throws IOException { try { + DebugUtils.executeDebugAction(debugAction, DebugUtils.REFRESH_HDFS_LISTING_DELAY); return new FilterIterator(p, fs.listFiles(p, recursive)); } catch (FileNotFoundException e) { if (LOG.isWarnEnabled()) LOG.warn("Path does not exist: " + p.toString(), e); diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 7f5aae2..05ff417 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -42,6 +42,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileSystem; @@ -197,6 +198,7 @@ import org.apache.impala.thrift.TUpdateCatalogResponse; import org.apache.impala.util.AcidUtils; import org.apache.impala.util.AcidUtils.TblTransaction; import org.apache.impala.util.CompressionUtil; +import org.apache.impala.util.DebugUtils; import org.apache.impala.util.FunctionUtils; 
import org.apache.impala.util.HdfsCachingUtil; import org.apache.impala.util.IcebergUtil; @@ -358,7 +360,7 @@ public class CatalogOpExecutor { TAlterTableParams alter_table_params = ddlRequest.getAlter_table_params(); tTableName = Optional.of(alter_table_params.getTable_name()); catalogOpMetric_.increment(ddl_type, tTableName); - alterTable(alter_table_params, response); + alterTable(alter_table_params, ddlRequest.getDebug_action(), response); break; case ALTER_VIEW: TCreateOrAlterViewParams alter_view_params = ddlRequest.getAlter_view_params(); @@ -632,7 +634,8 @@ public class CatalogOpExecutor { * table metadata, except for RENAME, ADD PARTITION and DROP PARTITION. This call is * thread-safe, i.e. concurrent operations on the same table are serialized. */ - private void alterTable(TAlterTableParams params, TDdlExecResponse response) + private void alterTable(TAlterTableParams params, @Nullable String debugAction, + TDdlExecResponse response) throws ImpalaException { // When true, loads the file/block metadata. 
boolean reloadFileMetadata = false; @@ -813,7 +816,7 @@ public class CatalogOpExecutor { Preconditions.checkState(params.isSetUpdate_stats_params()); Reference<Long> numUpdatedColumns = new Reference<>(0L); alterTableUpdateStats(tbl, params.getUpdate_stats_params(), - numUpdatedPartitions, numUpdatedColumns); + numUpdatedPartitions, numUpdatedColumns, debugAction); reloadTableSchema = true; addSummary(response, "Updated " + numUpdatedPartitions.getRef() + " partition(s) and " + numUpdatedColumns.getRef() + " column(s)."); @@ -834,7 +837,7 @@ public class CatalogOpExecutor { } break; case RECOVER_PARTITIONS: - alterTableRecoverPartitions(tbl); + alterTableRecoverPartitions(tbl, debugAction); addSummary(response, "Partitions have been recovered."); break; case SET_OWNER: @@ -942,7 +945,8 @@ public class CatalogOpExecutor { getMetaStoreTable(msClient, tbl); if (tbl instanceof HdfsTable) { ((HdfsTable) tbl).load(true, msClient.getHiveClient(), msTbl, - reloadFileMetadata, reloadTableSchema, false, partitionsToUpdate, reason); + reloadFileMetadata, reloadTableSchema, false, partitionsToUpdate, null, + reason); } else { tbl.load(true, msClient.getHiveClient(), msTbl, reason); } @@ -1059,7 +1063,8 @@ public class CatalogOpExecutor { * and 'numUpdatedColumns', respectively. 
*/ private void alterTableUpdateStats(Table table, TAlterTableUpdateStatsParams params, - Reference<Long> numUpdatedPartitions, Reference<Long> numUpdatedColumns) + Reference<Long> numUpdatedPartitions, Reference<Long> numUpdatedColumns, + @Nullable String debugAction) throws ImpalaException { Preconditions.checkState(table.getLock().isHeldByCurrentThread()); Preconditions.checkState(params.isSetTable_stats() || params.isSetColumn_stats()); @@ -1104,6 +1109,7 @@ public class CatalogOpExecutor { throw ex; } } + DebugUtils.executeDebugAction(debugAction, DebugUtils.UPDATE_STATS_DELAY); } private void alterTableUpdateStatsInner(Table table, @@ -3701,13 +3707,15 @@ public class CatalogOpExecutor { * Recover partitions of specified table. * Add partitions to metastore which exist in HDFS but not in metastore. */ - private void alterTableRecoverPartitions(Table tbl) throws ImpalaException { + private void alterTableRecoverPartitions(Table tbl, @Nullable String debugAction) + throws ImpalaException { Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread()); if (!(tbl instanceof HdfsTable)) { throw new CatalogException("Table " + tbl.getFullName() + " is not an HDFS table"); } HdfsTable hdfsTable = (HdfsTable) tbl; - List<List<String>> partitionsNotInHms = hdfsTable.getPathsWithoutPartitions(); + List<List<String>> partitionsNotInHms = hdfsTable + .getPathsWithoutPartitions(debugAction); if (partitionsNotInHms.isEmpty()) return; List<Partition> hmsPartitions = Lists.newArrayList(); @@ -4327,7 +4335,7 @@ public class CatalogOpExecutor { // 2: If no need for a full table reload then fetch partition level // writeIds and reload only the ones that changed. updatedThriftTable = catalog_ - .reloadTable(tbl, req.refresh_updated_hms_partitions, cmdString); + .reloadTable(tbl, req, cmdString); } } else { // Table was loaded from scratch, so it's already "refreshed". 
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 69301f9..3c0f57f 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -667,11 +667,21 @@ public class Frontend { clientRequest.getRedacted_stmt() : clientRequest.getStmt()); ddl.getDdl_params().setHeader(header); ddl.getDdl_params().setSync_ddl(ddl.isSync_ddl()); + // forward debug_actions to the catalogd + if (result.getQuery_options().isSetDebug_action()) { + ddl.getDdl_params() + .setDebug_action(result.getQuery_options().getDebug_action()); + } } if (ddl.getOp_type() == TCatalogOpType.RESET_METADATA) { ddl.getReset_metadata_params().setSync_ddl(ddl.isSync_ddl()); ddl.getReset_metadata_params().setRefresh_updated_hms_partitions( result.getQuery_options().isRefresh_updated_hms_partitions()); + // forward debug_actions to the catalogd + if (result.getQuery_options().isSetDebug_action()) { + ddl.getReset_metadata_params() + .setDebug_action(result.getQuery_options().getDebug_action()); + } } } diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java new file mode 100644 index 0000000..3c48940 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.util; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.base.Strings; +import java.util.List; +import java.util.Random; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is the DebugAction equivalent from the backend (see DebugActionImpl in + * debug-util.cc). This is useful to execute certain debug actions (like Sleep, Jitter) + * which can be executed from the code. The debug actions are passed to the CatalogService + * using a query option (debug_action). + */ +public class DebugUtils { + + private static final Logger LOG = LoggerFactory.getLogger(DebugUtils.class); + private static final Random random = new Random(); + + // debug action label for introducing HDFS listing delay during listFiles or statuses. + public static final String REFRESH_HDFS_LISTING_DELAY + = "catalogd_refresh_hdfs_listing_delay"; + + // debug action label for introducing delay in alter table recover partitions command. + public static final String RECOVER_PARTITIONS_DELAY = "catalogd_table_recover_delay"; + + // debug action label for introducing delay in update stats command. + public static final String UPDATE_STATS_DELAY = "catalogd_update_stats_delay"; + + /** + * Given list of debug actions, execute the debug action pertaining to the given label. + * The debugActions string is of the format specified for the query_option/configuration + * debug_actions. It is generally of the format + * LABEL:ACTION@ACTION_PARAMS|LABEL:ACTION@ACTION_PARAMS. 
+ * For example, if the debug action configuration is: + * CATALOGD_HDFS_LISTING_DELAY:SLEEP@100|CATALOGD_HMS_RPC_DELAY:JITTER@100@0.2 + * Then when a label "CATALOGD_HDFS_LISTING_DELAY" is provided, this method will sleep + * for 100 milli-seconds. If the label CATALOGD_HMS_RPC_DELAY is provided, this method + * will sleep for a random value between 0-99 milli-seconds with a probability of 0.2. + * + * @param debugActions the debug actions with the format given in the description + * above. + * @param label the label of action which needs to be executed. + */ + public static void executeDebugAction(String debugActions, String label) { + if (Strings.isNullOrEmpty(debugActions)) { + return; + } + List<String> actions = Splitter.on('|').splitToList(debugActions); + for (String action : actions) { + List<String> components = Splitter.on(':').splitToList(action); + if (components.isEmpty()) continue; + if (!components.get(0).equalsIgnoreCase(label)) continue; + // found the debug action for the given label + // get the debug action params + Preconditions.checkState(components.size() > 1, + "Invalid debug action " + action); + List<String> actionParams = Splitter.on('@').splitToList(components.get(1)); + Preconditions.checkState(actionParams.size() > 1, + "Illegal debug action format found in " + debugActions + " for label" + + label); + switch (actionParams.get(0)) { + case "SLEEP": + // the debug action params should be of the format SLEEP@<millis> + Preconditions.checkState(actionParams.size() == 2); + try { + int timeToSleepMs = Integer.parseInt(actionParams.get(1).trim()); + LOG.trace("Sleeping for {} msec to execute debug action {}", + timeToSleepMs, label); + Thread.sleep(timeToSleepMs); + } catch (NumberFormatException ex) { + LOG.error("Invalid number format in debug action {}", action); + } catch (InterruptedException e) { + LOG.warn("Sleep interrupted for the debug action {}", label); + } + break; + case "JITTER": + // the JITTER debug action is of format
JITTER@<millis>[@<probability>] + Preconditions.checkState(actionParams.size() <= 3); + try { + int maxTimeToSleepMs = Integer.parseInt(actionParams.get(1).trim()); + boolean shouldExecute = true; + if (actionParams.size() == 3) { + shouldExecute = parseProbability(actionParams.get(2)); + } + if (!shouldExecute) { + continue; + } + long timeToSleepMs = random.nextInt(maxTimeToSleepMs); + LOG.trace("Sleeping for {} msec to execute debug action {}", + timeToSleepMs, action); + Thread.sleep(timeToSleepMs); + } catch (NumberFormatException ex) { + LOG.error("Invalid number format in debug action {}", action); + } catch (InterruptedException ex) { + LOG.warn("Sleep interrupted for the debug action {}", label); + } + break; + default: + LOG.error("Debug action {} is not implemented", actionParams.get(0)); + } + } + } + + + /** + * Parses the probability action parameter of a debug action. + * + * @return true if the action should be executed, else false. + */ + private static boolean parseProbability(String probability) { + double p = Double.parseDouble(probability.trim()); + if (p <= 0 || p > 1.0) { + return false; + } + return random.nextDouble() < p; + } +} diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java index 27e1ab1..50e7f85 100644 --- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java @@ -911,7 +911,7 @@ public class MetastoreEventsProcessorTest { } List<String> filesCopied = new ArrayList<>(); RemoteIterator<?
extends FileStatus> it = FileSystemUtil - .listStatus(srcFs, src, true); + .listStatus(srcFs, src, true, null); while (it.hasNext()) { FileStatus status = it.next(); if (status.isDirectory()) continue; diff --git a/fe/src/test/java/org/apache/impala/util/DebugUtilsTest.java b/fe/src/test/java/org/apache/impala/util/DebugUtilsTest.java new file mode 100644 index 0000000..9a19c84 --- /dev/null +++ b/fe/src/test/java/org/apache/impala/util/DebugUtilsTest.java @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.util; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for the debug actions implementation. 
+ */ +public class DebugUtilsTest { + + @Test + public void testSleepDebugAction() { + DebugUtils.executeDebugAction("TEST_SLEEP_ACTION:SLEEP@1", "test_sleep_action"); + long startTime = System.currentTimeMillis(); + DebugUtils + .executeDebugAction("TEST_SLEEP_ACTION:SLEEP@100|SOME_OTHER_ACTION:SLEEP@10", + "SOME_OTHER_ACTION"); + long endTime = System.currentTimeMillis(); + // make sure you are executing the right sleep action + Assert.assertTrue(endTime - startTime < 100 && endTime - startTime >= 10); + // make sure that code doesn't throw if label is not found + DebugUtils.executeDebugAction("TEST_SLEEP_ACTION:SLEEP@1", "INVALID_LABEL"); + // make sure that code doesn't throw if there is a unsupported action type + DebugUtils.executeDebugAction("TEST_SLEEP_ACTION:NOT_FOUND@1", "TEST_SLEEP_ACTION"); + } + + @Test(expected = Exception.class) + public void testSleepDebugActionNegative() throws Exception { + DebugUtils.executeDebugAction("TEST_SLEEP_ACTION:SLEEP10", "TEST_SLEEP_ACTION"); + DebugUtils.executeDebugAction("TEST_SLEEP_ACTION|SLEEP10", "TEST_SLEEP_ACTION"); + DebugUtils.executeDebugAction("TEST_SLEEP_ACTION@SLEEP:10", "TEST_SLEEP_ACTION"); + } + + @Test + public void testJitter() { + DebugUtils.executeDebugAction("TEST_JITTER_ACTION:JITTER@1", "test_jitter_action"); + long startTime = System.currentTimeMillis(); + DebugUtils.executeDebugAction( + "SOME_OTHER_ACTION:SLEEP@100|TEST_JITTER_ACTION:JITTER@10@0.2", + "test_jitter_action"); + long endTime = System.currentTimeMillis(); + Assert.assertTrue(endTime - startTime < 100); + } + + @Test(expected = Exception.class) + public void testJitterNegative() throws Exception { + DebugUtils.executeDebugAction("TEST_JITTER_ACTION@JITTER:1", "test_jitter_action"); + DebugUtils.executeDebugAction("TEST_JITTER_ACTION:JITTER", "test_jitter_action"); + } +} diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index ac11ed3..5d1a1fa 100644 --- a/tests/common/impala_test_suite.py +++ 
b/tests/common/impala_test_suite.py @@ -842,6 +842,17 @@ class ImpalaTestSuite(BaseTestSuite): def execute_query(self, query, query_options=None): return self.__execute_query(self.client, query, query_options) + def exec_and_time(self, query, query_options=None, impalad=0): + """Executes a given query on the given impalad and returns the time taken in + milliseconds as seen by the client.""" + client = self.create_client_for_nth_impalad(impalad) + if query_options is not None: + client.set_configuration(query_options) + start_time = int(round(time.time() * 1000)) + client.execute(query) + end_time = int(round(time.time() * 1000)) + return end_time - start_time + def execute_query_using_client(self, client, query, vector): self.change_database(client, vector.get_value('table_format')) query_options = vector.get_value('exec_option') diff --git a/tests/metadata/test_catalogd_debug_actions.py b/tests/metadata/test_catalogd_debug_actions.py new file mode 100644 index 0000000..0cf3c8b --- /dev/null +++ b/tests/metadata/test_catalogd_debug_actions.py @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+ +import pytest +from tests.common.impala_test_suite import ImpalaTestSuite + + +class TestDebugActions(ImpalaTestSuite): + + @pytest.mark.execute_serially + def test_catalogd_debug_actions(self, unique_database): + self.client.execute("refresh tpcds.store_sales") + self.client.execute( + "create table {0}.test like functional.alltypes".format(unique_database)) + self.client.execute("insert into {0}.test partition (year,month) " + "select * from functional.alltypes".format(unique_database)) + self.client.execute("compute stats {0}.test".format(unique_database)) + self.__run_debug_action("refresh tpcds.store_sales", + debug_action="catalogd_refresh_hdfs_listing_delay:SLEEP@50", delta=2000) + self.__run_debug_action("refresh tpcds.store_sales", + debug_action="catalogd_refresh_hdfs_listing_delay:JITTER@50@0.5", delta=2000) + self.__run_debug_action( + "alter table {0}.test recover partitions".format(unique_database), + debug_action="catalogd_table_recover_delay:SLEEP@3000", delta=2000) + # the variance of compute stats statement could itself be within few hundred + # millisecs hence adding additional delay of 4000 doesn't necessarily slow down the + # query by 4000 ms always. + self.__run_debug_action("compute stats {0}.test".format(unique_database), + debug_action="catalogd_update_stats_delay:SLEEP@4000", delta=3000) + + def __run_debug_action(self, query, debug_action, delta): + """Makes sure that setting the given debug_action indeed causes the query + to run slower.""" + time_taken_before = self.exec_and_time(query) + time_taken_after = self.exec_and_time(query, {"debug_action": debug_action}) + assert (time_taken_after - time_taken_before) > delta