This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch add_log_event in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 794f597a674500cc73d19b3eb9806484084c781a Author: JackieTien97 <[email protected]> AuthorDate: Wed Sep 14 15:43:56 2022 +0800 Change Thread Name Format --- .../mpp/execution/exchange/LocalSourceHandle.java | 5 +- .../execution/exchange/MPPDataExchangeManager.java | 22 ++-- .../db/mpp/execution/exchange/SinkHandle.java | 10 +- .../db/mpp/execution/exchange/SourceHandle.java | 120 ++++++++++----------- .../fragment/FragmentInstanceExecution.java | 2 +- .../fragment/FragmentInstanceManager.java | 4 +- .../fragment/FragmentInstanceStateMachine.java | 2 +- .../execution/schedule/AbstractDriverThread.java | 4 +- .../db/mpp/execution/schedule/DriverScheduler.java | 2 +- .../mpp/execution/schedule/DriverTaskThread.java | 2 +- .../org/apache/iotdb/db/mpp/plan/Coordinator.java | 4 +- .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 12 +-- .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 5 - .../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 3 +- .../db/mpp/plan/execution/QueryExecution.java | 16 +-- .../db/mpp/plan/planner/LocalExecutionPlanner.java | 2 +- .../scheduler/FixedRateFragInsStateTracker.java | 4 +- .../scheduler/FragmentInstanceDispatcherImpl.java | 6 +- .../mpprest/impl/GrafanaApiServiceImpl.java | 2 +- .../protocol/mpprest/impl/RestApiServiceImpl.java | 3 +- .../service/thrift/impl/ClientRPCServiceImpl.java | 4 +- .../impl/DataNodeInternalRPCServiceImpl.java | 2 +- .../org/apache/iotdb/db/utils/SetThreadName.java | 44 ++++++++ 23 files changed, 159 insertions(+), 121 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java index c8182acf8d..6c17791efd 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java @@ -20,11 +20,11 @@ package org.apache.iotdb.db.mpp.execution.exchange; import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import com.google.common.util.concurrent.ListenableFuture; -import io.airlift.concurrent.SetThreadName; import org.apache.commons.lang3.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,8 +61,7 @@ public class LocalSourceHandle implements ISourceHandle { this.queue = Validate.notNull(queue); this.queue.setSourceHandle(this); this.sourceHandleListener = Validate.notNull(sourceHandleListener); - this.threadName = - createFullIdFrom(localFragmentInstanceId, localPlanNodeId + "." + "SourceHandle"); + this.threadName = createFullIdFrom(localFragmentInstanceId, localPlanNodeId); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java index c9634a75a9..628fe7ebb6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService; import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent; import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent; @@ -33,7 +34,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockResponse; import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent; import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde; -import io.airlift.concurrent.SetThreadName; import org.apache.commons.lang3.Validate; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -78,7 +78,11 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { @Override public TGetDataBlockResponse getDataBlock(TGetDataBlockRequest req) throws TException { try (SetThreadName fragmentInstanceName = - new SetThreadName(createFullIdFrom(req.sourceFragmentInstanceId, "SinkHandle"))) { + new SetThreadName( + createFullId( + req.sourceFragmentInstanceId.queryId, + req.sourceFragmentInstanceId.fragmentId, + req.sourceFragmentInstanceId.instanceId))) { logger.debug( "[ProcessGetTsBlockRequest] sequence ID in [{}, {})", req.getStartSequenceId(), @@ -104,9 +108,13 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { } @Override - public void onAcknowledgeDataBlockEvent(TAcknowledgeDataBlockEvent e) throws TException { + public void onAcknowledgeDataBlockEvent(TAcknowledgeDataBlockEvent e) { try (SetThreadName fragmentInstanceName = - new SetThreadName(createFullIdFrom(e.sourceFragmentInstanceId, "SinkHandle"))) { + new SetThreadName( + createFullId( + e.sourceFragmentInstanceId.queryId, + e.sourceFragmentInstanceId.fragmentId, + e.sourceFragmentInstanceId.instanceId))) { logger.debug( "Acknowledge data block event received, for data blocks whose sequence ID in [{}, {}) from {}.", e.getStartSequenceId(), @@ -130,8 +138,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { @Override public void onNewDataBlockEvent(TNewDataBlockEvent e) throws TException { try (SetThreadName fragmentInstanceName = - new SetThreadName( - createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId + ".SourceHandle"))) { + new SetThreadName(createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId))) { logger.debug( "New data block event received, for plan node {} of {} from {}.", e.getTargetPlanNodeId(), @@ -169,8 +176,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager { @Override public void onEndOfDataBlockEvent(TEndOfDataBlockEvent e) throws TException { try (SetThreadName fragmentInstanceName = - new SetThreadName( - createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId + ".SourceHandle"))) { + new SetThreadName(createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId))) { logger.debug( "End of data block event received, for plan node {} of {} from {}.", e.getTargetPlanNodeId(), diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java index 63fb42784c..387dc64679 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceCl import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener; import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent; @@ -34,7 +35,6 @@ import org.apache.iotdb.tsfile.utils.Pair; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; -import io.airlift.concurrent.SetThreadName; import org.apache.commons.lang3.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +48,7 @@ import java.util.Map.Entry; import java.util.concurrent.ExecutorService; import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; -import static org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.createFullIdFrom; +import static org.apache.iotdb.db.mpp.common.FragmentInstanceId.createFullId; import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; public class SinkHandle implements ISinkHandle { @@ -113,7 +113,11 @@ public class SinkHandle implements ISinkHandle { this.sinkHandleListener = Validate.notNull(sinkHandleListener); this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager; this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS; - this.threadName = createFullIdFrom(localFragmentInstanceId, "SinkHandle"); + this.threadName = + createFullId( + localFragmentInstanceId.queryId, + localFragmentInstanceId.fragmentId, + localFragmentInstanceId.instanceId); this.blocked = localMemoryManager .getQueryPool() diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java index 9b8d6c548f..0b4f2c4a11 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceCl import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener; import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockRequest; @@ -35,7 +36,6 @@ import org.apache.iotdb.tsfile.utils.Pair; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import io.airlift.concurrent.SetThreadName; import org.apache.commons.lang3.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,8 +111,7 @@ public class SourceHandle implements ISourceHandle { this.bufferRetainedSizeInBytes = 0L; this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager; this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS; - this.threadName = - createFullIdFrom(localFragmentInstanceId, localPlanNodeId + "." + "SourceHandle"); + this.threadName = createFullIdFrom(localFragmentInstanceId, localPlanNodeId); } @Override @@ -148,70 +147,67 @@ public class SourceHandle implements ISourceHandle { } private synchronized void trySubmitGetDataBlocksTask() { - try (SetThreadName sourceHandleName = new SetThreadName(threadName)) { - if (aborted || closed) { - return; - } - if (blockedOnMemory != null && !blockedOnMemory.isDone()) { - return; - } - - final int startSequenceId = nextSequenceId; - int endSequenceId = nextSequenceId; - long reservedBytes = 0L; - Pair<ListenableFuture<Void>, Boolean> pair = null; - long blockedSize = 0L; - while (sequenceIdToDataBlockSize.containsKey(endSequenceId)) { - Long bytesToReserve = sequenceIdToDataBlockSize.get(endSequenceId); - if (bytesToReserve == null) { - throw new IllegalStateException("Data block size is null."); - } - pair = - localMemoryManager - .getQueryPool() - .reserve(localFragmentInstanceId.getQueryId(), bytesToReserve); - bufferRetainedSizeInBytes += bytesToReserve; - endSequenceId += 1; - reservedBytes += bytesToReserve; - if (!pair.right) { - blockedSize = bytesToReserve; - break; - } - } + if (aborted || closed) { + return; + } + if (blockedOnMemory != null && !blockedOnMemory.isDone()) { + return; + } - if (pair == null) { - // Next data block not generated yet. Do nothing. - return; + final int startSequenceId = nextSequenceId; + int endSequenceId = nextSequenceId; + long reservedBytes = 0L; + Pair<ListenableFuture<Void>, Boolean> pair = null; + long blockedSize = 0L; + while (sequenceIdToDataBlockSize.containsKey(endSequenceId)) { + Long bytesToReserve = sequenceIdToDataBlockSize.get(endSequenceId); + if (bytesToReserve == null) { + throw new IllegalStateException("Data block size is null."); } - nextSequenceId = endSequenceId; - + pair = + localMemoryManager + .getQueryPool() + .reserve(localFragmentInstanceId.getQueryId(), bytesToReserve); + bufferRetainedSizeInBytes += bytesToReserve; + endSequenceId += 1; + reservedBytes += bytesToReserve; if (!pair.right) { - endSequenceId--; - reservedBytes -= blockedSize; - // The future being not completed indicates, - // 1. Memory has been reserved for blocks in [startSequenceId, endSequenceId). - // 2. Memory reservation for block whose sequence ID equals endSequenceId - 1 is blocked. - // 3. Have not reserve memory for the rest of blocks. - // - // startSequenceId endSequenceId - 1 endSequenceId - // |-------- reserved --------|--- blocked ---|--- not reserved ---| - - // Schedule another call of trySubmitGetDataBlocksTask for the rest of blocks. - blockedOnMemory = pair.left; - final int blockedSequenceId = endSequenceId; - final long blockedRetainedSize = blockedSize; - blockedOnMemory.addListener( - () -> - executorService.submit( - new GetDataBlocksTask( - blockedSequenceId, blockedSequenceId + 1, blockedRetainedSize)), - executorService); + blockedSize = bytesToReserve; + break; } + } - if (endSequenceId > startSequenceId) { - executorService.submit( - new GetDataBlocksTask(startSequenceId, endSequenceId, reservedBytes)); - } + if (pair == null) { + // Next data block not generated yet. Do nothing. + return; + } + nextSequenceId = endSequenceId; + + if (!pair.right) { + endSequenceId--; + reservedBytes -= blockedSize; + // The future being not completed indicates, + // 1. Memory has been reserved for blocks in [startSequenceId, endSequenceId). + // 2. Memory reservation for block whose sequence ID equals endSequenceId - 1 is blocked. + // 3. Have not reserve memory for the rest of blocks. + // + // startSequenceId endSequenceId - 1 endSequenceId + // |-------- reserved --------|--- blocked ---|--- not reserved ---| + + // Schedule another call of trySubmitGetDataBlocksTask for the rest of blocks. + blockedOnMemory = pair.left; + final int blockedSequenceId = endSequenceId; + final long blockedRetainedSize = blockedSize; + blockedOnMemory.addListener( + () -> + executorService.submit( + new GetDataBlocksTask( + blockedSequenceId, blockedSequenceId + 1, blockedRetainedSize)), + executorService); + } + + if (endSequenceId > startSequenceId) { + executorService.submit(new GetDataBlocksTask(startSequenceId, endSequenceId, reservedBytes)); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java index df99f50d58..678f5cadce 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java @@ -22,9 +22,9 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.execution.driver.IDriver; import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle; import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler; +import org.apache.iotdb.db.utils.SetThreadName; import com.google.common.collect.ImmutableList; -import io.airlift.concurrent.SetThreadName; import io.airlift.stats.CounterStat; import static java.util.Objects.requireNonNull; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java index f27aedb217..16bf91ba36 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java @@ -30,8 +30,8 @@ import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler; import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler; import org.apache.iotdb.db.mpp.plan.planner.LocalExecutionPlanner; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; +import org.apache.iotdb.db.utils.SetThreadName; -import io.airlift.concurrent.SetThreadName; import io.airlift.stats.CounterStat; import io.airlift.units.Duration; import org.slf4j.Logger; @@ -186,7 +186,7 @@ public class FragmentInstanceManager { /** Cancels a FragmentInstance. */ public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) { - logger.debug("cancelTask"); + logger.debug("[CancelFI]"); requireNonNull(instanceId, "taskId is null"); FragmentInstanceContext context = instanceContext.remove(instanceId); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java index eef52841e1..d19e2c7f35 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java @@ -21,11 +21,11 @@ package org.apache.iotdb.db.mpp.execution.fragment; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.execution.StateMachine; import org.apache.iotdb.db.mpp.execution.StateMachine.StateChangeListener; +import org.apache.iotdb.db.utils.SetThreadName; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; -import io.airlift.concurrent.SetThreadName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java index c5fc410f4c..148b688e0d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java @@ -20,8 +20,8 @@ package org.apache.iotdb.db.mpp.execution.schedule; import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue; import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask; +import org.apache.iotdb.db.utils.SetThreadName; -import io.airlift.concurrent.SetThreadName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +63,7 @@ public abstract class AbstractDriverThread extends Thread implements Closeable { new SetThreadName(next.getFragmentInstance().getInfo().getFullId())) { execute(next); } catch (Throwable t) { - logger.error("execute failed", t); + logger.error("[ExecuteFailed]", t); if (next != null) { next.setAbortCause(FragmentInstanceAbortedException.BY_INTERNAL_ERROR_SCHEDULED); scheduler.toAborted(next); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java index f3d33b3df2..93659d0661 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java @@ -34,9 +34,9 @@ import org.apache.iotdb.db.mpp.execution.schedule.queue.L2PriorityQueue; import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask; import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskID; import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTaskStatus; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; -import io.airlift.concurrent.SetThreadName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java index e56560d0ac..52b890797e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverTaskThread.java @@ -23,10 +23,10 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.execution.driver.IDriver; import org.apache.iotdb.db.mpp.execution.schedule.queue.IndexedBlockingQueue; import org.apache.iotdb.db.mpp.execution.schedule.task.DriverTask; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.db.utils.stats.CpuTimer; import com.google.common.util.concurrent.ListenableFuture; -import io.airlift.concurrent.SetThreadName; import io.airlift.units.Duration; import java.util.concurrent.Executor; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java index d4f8ac8e9a..4dfd933b40 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java @@ -38,8 +38,8 @@ import org.apache.iotdb.db.mpp.plan.execution.QueryExecution; import org.apache.iotdb.db.mpp.plan.execution.config.ConfigExecution; import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement; import org.apache.iotdb.db.mpp.plan.statement.Statement; +import org.apache.iotdb.db.utils.SetThreadName; -import io.airlift.concurrent.SetThreadName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,7 +120,7 @@ public class Coordinator { QueryId globalQueryId = queryIdGenerator.createNextQueryId(); try (SetThreadName queryName = new SetThreadName(globalQueryId.getId())) { if (sql != null && sql.length() > 0) { - LOGGER.info("start executing sql: {}", sql); + LOGGER.info("[QueryStart] sql: {}", sql); } MPPQueryContext queryContext = new MPPQueryContext( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java index a5dace95a8..47ab3a348d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java @@ -137,10 +137,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> this.schemaFetcher = schemaFetcher; } - private String getLogHeader() { - return String.format("Query[%s]:", context.getQueryId()); - } - @Override public Analysis visitNode(StatementNode node, MPPQueryContext context) { throw new UnsupportedOperationException( @@ -169,9 +165,9 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> analysis.setStatement(queryStatement); // request schema fetch API - logger.info("{} fetch query schema...", getLogHeader()); + logger.info("[StartFetchSchema]"); ISchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree); - logger.info("{} fetch schema done", getLogHeader()); + logger.info("[EndFetchSchema]"); // If there is no leaf node in the schema tree, the query should be completed immediately if (schemaTree.isEmpty()) { if (queryStatement.isLastQuery()) { @@ -1355,9 +1351,9 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> if (showTimeSeriesStatement.isOrderByHeat()) { patternTree.constructTree(); // request schema fetch API - logger.info("{} fetch query schema...", getLogHeader()); + logger.info("[StartFetchSchema]"); ISchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree); - logger.info("{} fetch schema done", getLogHeader()); + logger.info("[EndFetchSchema]]"); List<MeasurementPath> allSelectedPath = schemaTree.getAllMeasurement(); Set<Expression> sourceExpressions = diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java index 567030ff51..9697bf29e2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java @@ -22,13 +22,8 @@ package org.apache.iotdb.db.mpp.plan.analyze; import org.apache.iotdb.db.mpp.common.MPPQueryContext; import org.apache.iotdb.db.mpp.plan.statement.Statement; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** Analyze the statement and generate Analysis. */ public class Analyzer { - private static final Logger logger = LoggerFactory.getLogger(Analyzer.class); - private final MPPQueryContext context; private final IPartitionFetcher partitionFetcher; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java index 77ff8e9abb..c3dbb79aea 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java @@ -41,6 +41,7 @@ import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesS import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement; import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement; import org.apache.iotdb.db.query.control.SessionManager; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; @@ -54,8 +55,6 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import io.airlift.concurrent.SetThreadName; - import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java index e9d6857ae1..c15bffa607 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java @@ -56,13 +56,13 @@ import org.apache.iotdb.db.mpp.plan.statement.Statement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import io.airlift.concurrent.SetThreadName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,7 +153,7 @@ public class QueryExecution implements IQueryExecution { if (state == QueryState.FAILED || state == QueryState.ABORTED || state == QueryState.CANCELED) { - logger.info("release resource because Query State is: {}", state); + logger.info("[ReleaseQueryResource] state is: {}", state); releaseResource(); } } @@ -162,7 +162,7 @@ public class QueryExecution implements IQueryExecution { public void start() { if (skipExecute()) { - logger.info("execution of query will be skipped. Transit to RUNNING immediately."); + logger.info("[SkipExecute]"); constructResultForMemorySource(); stateMachine.transitionToRunning(); return; @@ -184,14 +184,14 @@ public class QueryExecution implements IQueryExecution { private ExecutionResult retry() { if (retryCount >= MAX_RETRY_COUNT) { - logger.error("reach max retry count. transit query to failed"); + logger.warn("[ReachMaxRetryCount]"); stateMachine.transitionToFailed(); return getStatus(); } logger.warn("error when executing query. {}", stateMachine.getFailureMessage()); // stop and clean up resources the QueryExecution used this.stopAndCleanup(); - logger.info("wait {}ms before retry...", RETRY_INTERVAL_IN_MS); + logger.info("[WaitBeforeRetry] wait {}ms.", RETRY_INTERVAL_IN_MS); try { Thread.sleep(RETRY_INTERVAL_IN_MS); } catch (InterruptedException e) { @@ -199,7 +199,7 @@ public class QueryExecution implements IQueryExecution { Thread.currentThread().interrupt(); } retryCount++; - logger.info("start to retry. Retry count is: {}", retryCount); + logger.info("[Retry] retry count is: {}", retryCount); stateMachine.transitionToQueued(); // force invalid PartitionCache partitionFetcher.invalidAllCache(); @@ -326,7 +326,7 @@ public class QueryExecution implements IQueryExecution { while (true) { try { if (resultHandle.isAborted()) { - logger.info("resultHandle for client is aborted"); + logger.warn("[ResultHandleAborted]"); stateMachine.transitionToAborted(); if (stateMachine.getFailureStatus() != null) { throw new IoTDBException( @@ -339,7 +339,7 @@ public class QueryExecution implements IQueryExecution { // Once the resultHandle is finished, we should transit the state of this query to // FINISHED. // So that the corresponding cleanup work could be triggered. - logger.info("resultHandle for client is finished"); + logger.warn("[ResultHandleFinished]"); stateMachine.transitionToFinished(); return Optional.empty(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java index 6f896a6e9e..cc0d7e7dfd 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java @@ -32,10 +32,10 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.timer.ITimeSliceAllocator; import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.read.filter.basic.Filter; -import io.airlift.concurrent.SetThreadName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java index 08563d9361..5c50b80e9b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java @@ -27,8 +27,8 @@ import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.execution.QueryStateMachine; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; +import org.apache.iotdb.db.utils.SetThreadName; -import io.airlift.concurrent.SetThreadName; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,7 +100,7 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker { instanceStateMap.computeIfAbsent( instance.getId(), k -> new InstanceStateMetrics(instance.isRoot())); if (needPrintState(metrics.lastState, state, metrics.durationToLastPrintInMS)) { - logger.info("State is {}", state); + logger.info("[PrintFIState] state is {}", state); metrics.reset(state); } else { metrics.addDuration(STATE_FETCH_INTERVAL_IN_MS); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java index e2b5a88591..e1d01c9753 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -39,6 +39,7 @@ import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance; import org.apache.iotdb.mpp.rpc.thrift.TPlanNode; import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq; @@ -48,7 +49,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; -import io.airlift.concurrent.SetThreadName; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,7 +109,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { } catch (FragmentInstanceDispatchException e) { return new FragInstanceDispatchResult(e.getFailureStatus()); } catch (Throwable t) { - logger.error("cannot dispatch FI for read operation", t); + logger.error("[DispatchFailed]", t); return new FragInstanceDispatchResult( RpcUtils.getStatus( TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())); @@ -126,7 +126,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { } catch (FragmentInstanceDispatchException e) { return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus())); } catch (Throwable t) { - logger.error("cannot dispatch FI for write operation", t); + logger.error("[DispatchFailed]", t); return immediateFuture( new FragInstanceDispatchResult( RpcUtils.getStatus( diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/GrafanaApiServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/GrafanaApiServiceImpl.java index 9978d558d7..8822e72ac3 100644 --- a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/GrafanaApiServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/GrafanaApiServiceImpl.java @@ -43,10 +43,10 @@ import org.apache.iotdb.db.protocol.mpprest.model.ExecutionStatus; import org.apache.iotdb.db.protocol.mpprest.model.ExpressionRequest; import org.apache.iotdb.db.protocol.mpprest.model.SQL; import org.apache.iotdb.db.query.control.SessionManager; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.rpc.TSStatusCode; import com.google.common.base.Joiner; -import io.airlift.concurrent.SetThreadName; import org.apache.commons.lang3.StringUtils; import javax.ws.rs.core.Response; diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/RestApiServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/RestApiServiceImpl.java index 34dbe2ffcd..f53624e332 100644 --- a/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/RestApiServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mpprest/impl/RestApiServiceImpl.java @@ -42,10 +42,9 @@ import org.apache.iotdb.db.protocol.mpprest.model.ExecutionStatus; import org.apache.iotdb.db.protocol.mpprest.model.InsertTabletRequest; import org.apache.iotdb.db.protocol.mpprest.model.SQL; import org.apache.iotdb.db.query.control.SessionManager; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.rpc.TSStatusCode; -import io.airlift.concurrent.SetThreadName; - import javax.ws.rs.core.Response; import javax.ws.rs.core.SecurityContext; diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java index 2619c8a797..defe25b4c1 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java @@ -62,6 +62,7 @@ import org.apache.iotdb.db.service.metrics.MetricService; import org.apache.iotdb.db.service.metrics.enums.Operation; import org.apache.iotdb.db.sync.SyncService; import org.apache.iotdb.db.utils.QueryDataSetUtils; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.metrics.config.MetricConfigDescriptor; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.rpc.ConfigNodeConnectionException; @@ -110,7 +111,6 @@ import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.column.Column; -import io.airlift.concurrent.SetThreadName; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1595,7 +1595,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); if (queryExecution != null) { try (SetThreadName threadName = new SetThreadName(queryExecution.getQueryId())) { - LOGGER.info("stop and clean up"); + LOGGER.info("[CleanUpQuery]]"); queryExecution.stopAndCleanup(); COORDINATOR.removeQueryExecution(queryId); } diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java index 81362ca88f..1dc70958b7 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -79,6 +79,7 @@ import org.apache.iotdb.db.service.metrics.MetricService; import org.apache.iotdb.db.service.metrics.enums.Metric; import org.apache.iotdb.db.service.metrics.enums.Tag; import org.apache.iotdb.db.trigger.service.TriggerManagementService; +import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.metrics.config.MetricConfigDescriptor; import org.apache.iotdb.metrics.type.Gauge; import org.apache.iotdb.metrics.utils.MetricLevel; @@ -118,7 +119,6 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.exception.NotImplementedException; import com.google.common.collect.ImmutableList; -import io.airlift.concurrent.SetThreadName; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SetThreadName.java b/server/src/main/java/org/apache/iotdb/db/utils/SetThreadName.java new file mode 100644 index 0000000000..3dca499262 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/utils/SetThreadName.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.utils; + +import java.io.Closeable; + +import static java.util.Objects.requireNonNull; + +public class SetThreadName implements Closeable { + private final String originalThreadName; + + public SetThreadName(String suffix) { + requireNonNull(suffix, "suffix is null"); + originalThreadName = Thread.currentThread().getName(); + int index = originalThreadName.indexOf("$"); + if (index < 0) { + Thread.currentThread().setName(String.format("%s$%s", originalThreadName, suffix)); + } else { + Thread.currentThread() + .setName(String.format("%s$%s", originalThreadName.substring(0, index), suffix)); + } + } + + @Override + public void close() { + Thread.currentThread().setName(originalThreadName); + } +}
