This is an automated email from the ASF dual-hosted git repository.
richardstartin pushed a commit to branch tracing-spi
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/tracing-spi by this push:
new d53bff0574 make active recording available implicitly
d53bff0574 is described below
commit d53bff05740d4c85fa6042d533275dd11d011d1d
Author: richardstartin <[email protected]>
AuthorDate: Tue Apr 5 16:43:48 2022 +0100
make active recording available implicitly
---
...tsTest.java => OperatorExecutionStatsTest.java} | 2 +-
.../apache/pinot/core/operator/BaseOperator.java | 7 +----
.../query/scheduler/resources/ResourceManager.java | 9 ++----
.../pinot/core/util/trace/DefaultTracer.java | 36 +++++++++++++++++++---
.../apache/pinot/core/util/trace/TracedThread.java | 31 +++++++++++++++++--
.../pinot/core/util/trace/TracedThreadFactory.java | 27 ++++++++++++++--
.../pinot/spi/trace/{Scope.java => Execution.java} | 2 +-
.../apache/pinot/spi/trace/OperatorExecution.java | 2 +-
.../spi/trace/{Scope.java => TraceContext.java} | 12 ++++++--
.../java/org/apache/pinot/spi/trace/Tracer.java | 5 +++
10 files changed, 105 insertions(+), 28 deletions(-)
diff --git
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/OperatorExecutionStatsTest.java
similarity index 99%
rename from
pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java
rename to
pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/OperatorExecutionStatsTest.java
index 476bea869e..41a4d3f1f3 100644
---
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ExecutionStatsTest.java
+++
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/OperatorExecutionStatsTest.java
@@ -29,7 +29,7 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
-public class ExecutionStatsTest {
+public class OperatorExecutionStatsTest {
private JsonNode _mockBrokerResponse;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
index 369f936e95..bfebb0a95d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java
@@ -21,7 +21,6 @@ package org.apache.pinot.core.operator;
import org.apache.pinot.core.common.Block;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.spi.exception.EarlyTerminationException;
-import org.apache.pinot.spi.trace.ExecutionRecording;
import org.apache.pinot.spi.trace.OperatorExecution;
import org.apache.pinot.spi.trace.Tracing;
@@ -37,14 +36,10 @@ public abstract class BaseOperator<T extends Block>
implements Operator<T> {
throw new EarlyTerminationException();
}
try (OperatorExecution execution =
Tracing.getTracer().startOperatorExecution(getClass())) {
- return getNextBlock(execution);
+ return getNextBlock();
}
}
// Make it protected because we should always call nextBlock()
protected abstract T getNextBlock();
-
- protected T getNextBlock(ExecutionRecording recording) {
- return getNextBlock();
- }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
index d1de9e8c88..c73db98537 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
@@ -21,12 +21,12 @@ package org.apache.pinot.core.query.scheduler.resources;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
+import org.apache.pinot.core.util.trace.TracedThreadFactory;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,15 +80,12 @@ public abstract class ResourceManager {
LOGGER.info("Initializing with {} query runner threads and {} worker
threads", _numQueryRunnerThreads,
_numQueryWorkerThreads);
// pqr -> pinot query runner (to give short names)
- ThreadFactory queryRunnerFactory =
- new
ThreadFactoryBuilder().setDaemon(false).setPriority(QUERY_RUNNER_THREAD_PRIORITY).setNameFormat("pqr-%d")
- .build();
+ ThreadFactory queryRunnerFactory = new
TracedThreadFactory(QUERY_RUNNER_THREAD_PRIORITY, false, "pqr-%d");
_queryRunners =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(_numQueryRunnerThreads,
queryRunnerFactory));
// pqw -> pinot query workers
- ThreadFactory queryWorkersFactory =
- new
ThreadFactoryBuilder().setDaemon(false).setPriority(Thread.NORM_PRIORITY).setNameFormat("pqw-%d").build();
+ ThreadFactory queryWorkersFactory = new
TracedThreadFactory(Thread.NORM_PRIORITY, false, "pqw-%d");
_queryWorkers =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(_numQueryWorkerThreads,
queryWorkersFactory));
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/DefaultTracer.java
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/DefaultTracer.java
index 91522f2591..ed53b49580 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/DefaultTracer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/DefaultTracer.java
@@ -18,10 +18,14 @@
*/
package org.apache.pinot.core.util.trace;
+import java.util.ArrayDeque;
+import java.util.Deque;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.trace.ExecutionRecording;
import org.apache.pinot.spi.trace.FilterType;
import org.apache.pinot.spi.trace.OperatorExecution;
import org.apache.pinot.spi.trace.Phase;
+import org.apache.pinot.spi.trace.TraceContext;
import org.apache.pinot.spi.trace.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +34,7 @@ import org.slf4j.LoggerFactory;
public class DefaultTracer implements Tracer {
private static final Logger LOGGER =
LoggerFactory.getLogger(DefaultTracer.class);
+ private static final ThreadLocal<Deque<ExecutionRecording>> STACK =
ThreadLocal.withInitial(ArrayDeque::new);
private static class NoOpExecution implements OperatorExecution {
@@ -72,9 +77,11 @@ public class DefaultTracer implements Tracer {
private final long _startTimeMillis = System.currentTimeMillis();
private final Class<?> _operator;
+ private final Runnable _onClose;
- public MillisExecution(Class<?> operator) {
+ public MillisExecution(Class<?> operator, Runnable onClose) {
_operator = operator;
+ _onClose = onClose;
}
@Override
@@ -84,17 +91,38 @@ public class DefaultTracer implements Tracer {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Time spent in {}: {}", operatorName, duration);
}
- TraceContext.logTime(operatorName, duration);
+ org.apache.pinot.core.util.trace.TraceContext.logTime(operatorName,
duration);
+ _onClose.run();
}
}
@Override
public void register(long requestId) {
- TraceContext.register(requestId);
+ org.apache.pinot.core.util.trace.TraceContext.register(requestId);
}
@Override
public OperatorExecution startOperatorExecution(Class<?> operatorClass) {
- return TraceContext.traceEnabled() ? new MillisExecution(operatorClass) :
NO_OP_SPAN;
+ if (org.apache.pinot.core.util.trace.TraceContext.traceEnabled()) {
+ Deque<ExecutionRecording> stack = getStack();
+ MillisExecution execution = new MillisExecution(operatorClass,
stack::removeLast);
+ stack.addLast(execution);
+ return execution;
+ }
+ return NO_OP_SPAN;
+ }
+
+ @Override
+ public ExecutionRecording activeRecording() {
+ Deque<ExecutionRecording> stack = getStack();
+ return stack.isEmpty() ? NO_OP_SPAN : stack.peekLast();
+ }
+
+ private Deque<ExecutionRecording> getStack() {
+ Thread thread = Thread.currentThread();
+ if (thread instanceof TraceContext) {
+ return ((TraceContext) thread).getRecordings();
+ }
+ return STACK.get();
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TracedThread.java
similarity index 55%
copy from pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
copy to
pinot-core/src/main/java/org/apache/pinot/core/util/trace/TracedThread.java
index 4ff56d0b6c..f3f78a5382 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TracedThread.java
@@ -16,10 +16,35 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.spi.trace;
+package org.apache.pinot.core.util.trace;
-public interface Scope extends AutoCloseable {
+import java.util.ArrayDeque;
+import java.util.Deque;
+import org.apache.pinot.spi.trace.ExecutionRecording;
+import org.apache.pinot.spi.trace.TraceContext;
+
+
+final class TracedThread extends Thread implements TraceContext {
+
+ private long _traceId = Long.MIN_VALUE;
+ private final Deque<ExecutionRecording> _stack = new ArrayDeque<>();
+
+ public TracedThread(Runnable target) {
+ super(target);
+ }
+
+ @Override
+ public void setTraceId(long traceId) {
+ _traceId = traceId;
+ }
+
+ @Override
+ public Deque<ExecutionRecording> getRecordings() {
+ return _stack;
+ }
@Override
- void close();
+ public long getTraceId() {
+ return _traceId;
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TracedThreadFactory.java
similarity index 51%
copy from pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
copy to
pinot-core/src/main/java/org/apache/pinot/core/util/trace/TracedThreadFactory.java
index 4ff56d0b6c..dda7bf7876 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TracedThreadFactory.java
@@ -16,10 +16,31 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.spi.trace;
+package org.apache.pinot.core.util.trace;
-public interface Scope extends AutoCloseable {
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+public final class TracedThreadFactory implements ThreadFactory {
+
+ private final int _priority;
+ private final boolean _daemon;
+ private final String _nameFormat;
+ private final AtomicInteger _count = new AtomicInteger();
+
+ public TracedThreadFactory(int priority, boolean daemon, String nameFormat) {
+ _priority = priority;
+ _daemon = daemon;
+ _nameFormat = nameFormat;
+ }
@Override
- void close();
+ public Thread newThread(Runnable task) {
+ Thread thread = new TracedThread(task);
+ thread.setPriority(_priority);
+ thread.setDaemon(_daemon);
+ thread.setName(String.format(_nameFormat, _count.getAndIncrement()));
+ return thread;
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Execution.java
similarity index 94%
copy from pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
copy to pinot-spi/src/main/java/org/apache/pinot/spi/trace/Execution.java
index 4ff56d0b6c..40835ab32c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Execution.java
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.spi.trace;
-public interface Scope extends AutoCloseable {
+public interface Execution extends AutoCloseable {
@Override
void close();
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/OperatorExecution.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/OperatorExecution.java
index ec2eafd135..ff8a330e46 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/OperatorExecution.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/OperatorExecution.java
@@ -18,5 +18,5 @@
*/
package org.apache.pinot.spi.trace;
-public interface OperatorExecution extends Scope, ExecutionRecording {
+public interface OperatorExecution extends Execution, ExecutionRecording {
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/TraceContext.java
similarity index 84%
rename from pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/trace/TraceContext.java
index 4ff56d0b6c..0105a01ecc 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Scope.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/TraceContext.java
@@ -18,8 +18,14 @@
*/
package org.apache.pinot.spi.trace;
-public interface Scope extends AutoCloseable {
+import java.util.Deque;
- @Override
- void close();
+
+public interface TraceContext {
+
+ long getTraceId();
+
+ void setTraceId(long traceId);
+
+ Deque<ExecutionRecording> getRecordings();
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
index 92a77b4985..ee0d89adf2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
@@ -27,4 +27,9 @@ public interface Tracer {
void register(long requestId);
OperatorExecution startOperatorExecution(Class<?> clazz);
+
+ /**
+ * @return the active execution
+ */
+ ExecutionRecording activeRecording();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]