gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r221675798
 
 

 ##########
 File path: 
processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java
 ##########
 @@ -24,50 +24,89 @@
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.filter.Filter;
 import org.joda.time.Interval;
 
+import javax.annotation.concurrent.GuardedBy;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
- * DefaultQueryMetrics is unsafe for use from multiple threads. It fails with 
RuntimeException on access not from the
- * thread where it was constructed. To "transfer" DefaultQueryMetrics from one 
thread to another {@link #ownerThread}
- * field should be updated.
+ * A basic thread-safe implementation of the {@link QueryMetrics} interface.
+ *
+ * <p><b>Note for inheritance:</b> Subclasses should override {@link 
#makeCopy()} method to return
+ * an instance of that subclass. See {@link 
org.apache.druid.query.groupby.DefaultGroupByQueryMetrics}
+ * for an example.
  */
 public class DefaultQueryMetrics<QueryType extends Query<?>> implements 
QueryMetrics<QueryType>
 {
-  protected final ObjectMapper jsonMapper;
-  protected final ServiceMetricEvent.Builder builder = new 
ServiceMetricEvent.Builder();
-  protected final Map<String, Number> metrics = new HashMap<>();
+  private static final EmittingLogger log = new 
EmittingLogger(DefaultQueryMetrics.class);
 
-  /** Non final to give subclasses ability to reassign it. */
-  protected Thread ownerThread = Thread.currentThread();
+  protected final ObjectMapper jsonMapper;
+  protected final Object lock = new Object();
+  @GuardedBy("lock") private final Map<String, String> singleValueDims = new 
HashMap<>();
+  @GuardedBy("lock") private final Map<String, String[]> multiValueDims = new 
HashMap<>();
+  @GuardedBy("lock") private final Map<String, Number> metrics = new 
HashMap<>();
 
   public DefaultQueryMetrics(ObjectMapper jsonMapper)
   {
     this.jsonMapper = jsonMapper;
   }
 
-  protected void checkModifiedFromOwnerThread()
+  // copy constructor, used by makeCopy()
+  public DefaultQueryMetrics(DefaultQueryMetrics that)
   {
-    if (Thread.currentThread() != ownerThread) {
-      throw new IllegalStateException(
-          "DefaultQueryMetrics must not be modified from multiple threads. If 
it is needed to gather dimension or "
-          + "metric information from multiple threads or from an async thread, 
this information should explicitly be "
-          + "passed between threads (e. g. using Futures), or this 
DefaultQueryMetrics's ownerThread should be "
-          + "reassigned explicitly");
-    }
+    this.jsonMapper = that.jsonMapper;
+    this.singleValueDims.putAll(that.singleValueDims);
+    this.multiValueDims.putAll(that.multiValueDims);
+    this.metrics.putAll(that.metrics);
   }
 
   protected void setDimension(String dimension, String value)
   {
-    checkModifiedFromOwnerThread();
-    builder.setDimension(dimension, value);
+    synchronized (lock) {
 
 Review comment:
   I used ConcurrentHashMap at first but switched to this coarse-grained lock 
because:
   1. Lock contention on a QueryMetrics object should be very low. Normally it is 
used by a single thread; concurrent runners create a separate QueryMetrics 
for each child runner.
   2. Imagine a thread adds a dimension and a metric. For the 
subsequent emit() to reflect both the newly added dimension and metric, making each 
Map concurrent is not enough; we need synchronization across these fields.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to