[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r227297357

## File path: processing/src/main/java/org/apache/druid/query/QueryMetrics.java ##
@@ -92,7 +94,7 @@
  * 100% guarantee of compatibility, because methods could not only be added to QueryMetrics, existing methods could also
  * be changed or removed.
  *
- * QueryMetrics is designed for use from a single thread, implementations shouldn't care about thread-safety.
+ * All implementations of QueryMetrics should be Thread-safe.

Review comment: Added more statements on the constraints of using QueryMetrics from multiple threads.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
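The DefaultQueryMetrics diffs in this thread enforce two constraints: a dimension value may not be changed once it is set, and a metric may be reported only once. For comparison only, the sketch below shows how those constraints could be enforced without an explicit lock, using `ConcurrentHashMap.putIfAbsent`. This is a swapped-in technique, not what the PR implements. The PR guards plain `HashMap`s with a single `lock` object. The class and method names here are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical lock-free holder for comparison; not Druid's DefaultQueryMetrics.
class LockFreeMetrics
{
  private final ConcurrentHashMap<String, String> dims = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, Number> metrics = new ConcurrentHashMap<>();

  // Re-setting a dimension to the same value is tolerated; a different value is rejected.
  void setDimension(String name, String value)
  {
    // putIfAbsent is atomic: it returns the existing value, or null if ours was stored
    String previous = dims.putIfAbsent(name, value);
    if (previous != null && !previous.equals(value)) {
      throw new IllegalStateException("Changing dimension values is not allowed: " + name);
    }
  }

  // Each metric may be reported exactly once.
  void reportMetric(String name, Number value)
  {
    if (metrics.putIfAbsent(name, value) != null) {
      throw new IllegalStateException("Metric reported more than once: " + name);
    }
  }

  Map<String, String> dimensions()
  {
    return Collections.unmodifiableMap(new HashMap<>(dims));
  }

  Map<String, Number> metricValues()
  {
    return Collections.unmodifiableMap(new HashMap<>(metrics));
  }
}
```

`putIfAbsent` is atomic per key. If two threads race on the same dimension with the same value, both succeed, and a conflicting value is always detected. Compared with a single lock, this approach gives up an atomic view across several maps at once.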
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r227255825

## File path: processing/src/main/java/org/apache/druid/query/QueryMetrics.java ##
@@ -92,7 +94,7 @@
  * 100% guarantee of compatibility, because methods could not only be added to QueryMetrics, existing methods could also
  * be changed or removed.
  *
- * QueryMetrics is designed for use from a single thread, implementations shouldn't care about thread-safety.
+ * All implementations of QueryMetrics should be Thread-safe.
  *
  *
  * Adding new methods to QueryMetrics

Review comment: If needed, we can do the renaming in a separate PR. I want to keep this PR small and easy to review.
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r227251555

## File path: processing/src/main/java/org/apache/druid/query/QueryPlus.java ##
@@ -100,23 +100,11 @@ private QueryPlus(Query query, QueryMetrics queryMetrics, String identity)
     }
   }
-  /**
-   * Returns a QueryPlus object without the components which are unsafe for concurrent use from multiple threads,
-   * therefore couldn't be passed down in concurrent or async {@link QueryRunner}s.
-   *
-   * Currently the only unsafe component is {@link QueryMetrics}, i. e. {@code withoutThreadUnsafeState()} call is
-   * equivalent to {@link #withoutQueryMetrics()}.
-   */
-  public QueryPlus withoutThreadUnsafeState()
-  {
-    return withoutQueryMetrics();
-  }
-
   /**
    * Returns the same QueryPlus object, if it doesn't have {@link QueryMetrics} ({@link #getQueryMetrics()} returns

Review comment: Added the motivation to the comments.
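The javadoc in the context lines says that `withoutQueryMetrics()` returns the same QueryPlus object if it has no QueryMetrics. The rest of that sentence is cut off here. The sketch below models that contract on a hypothetical immutable holder. The "otherwise return a copy without metrics" branch is an assumption, and `QueryHolder` and `withoutMetrics` are illustrative names, not Druid's API.

```java
import java.util.Objects;

// Hypothetical immutable holder; names are illustrative, not Druid's QueryPlus API.
final class QueryHolder
{
  private final String query;
  private final Object metrics; // stand-in for a QueryMetrics instance; may be null

  QueryHolder(String query, Object metrics)
  {
    this.query = Objects.requireNonNull(query, "query");
    this.metrics = metrics;
  }

  String getQuery()
  {
    return query;
  }

  Object getMetrics()
  {
    return metrics;
  }

  // Same object when there is nothing to strip (no allocation), otherwise a copy without metrics.
  QueryHolder withoutMetrics()
  {
    return metrics == null ? this : new QueryHolder(query, null);
  }
}
```

Because the holder is immutable, returning `this` is safe. Callers never observe a difference between "the same object" and "an equal copy".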
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r227246539

## File path: processing/src/main/java/org/apache/druid/query/QueryPlus.java ##
@@ -68,7 +68,7 @@ private QueryPlus(Query query, QueryMetrics queryMetrics, String identity)
   /**
    * Returns the same QueryPlus object with the identity replaced. This new identity will affect future calls to
-   * {@link #withoutQueryMetrics()} but will not affect any currently-existing queryMetrics.
+   * {@link #withQueryMetrics(QueryToolChest)} ()} but will not affect any currently-existing queryMetrics.

Review comment: Fixed.
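The corrected javadoc says that a new identity affects future calls to `withQueryMetrics(QueryToolChest)` but does not affect any currently existing queryMetrics. The sketch below shows that behavior, assuming the metrics object captures the identity once, when it is created. `IdentityHolder`, `MetricsStub`, and their methods are hypothetical stand-ins for QueryPlus and QueryMetrics.

```java
// Hypothetical metrics stand-in: the identity is captured once, at construction.
final class MetricsStub
{
  final String identity;

  MetricsStub(String identity)
  {
    this.identity = identity;
  }
}

// Hypothetical immutable holder standing in for QueryPlus; not Druid's API.
final class IdentityHolder
{
  private final String identity;
  private final MetricsStub metrics; // may be null

  IdentityHolder(String identity, MetricsStub metrics)
  {
    this.identity = identity;
    this.metrics = metrics;
  }

  MetricsStub metrics()
  {
    return metrics;
  }

  // Replaces the identity; an already attached metrics object is carried over untouched.
  IdentityHolder withIdentity(String newIdentity)
  {
    return new IdentityHolder(newIdentity, metrics);
  }

  // Creates metrics only if absent, using the identity current at this moment.
  IdentityHolder withMetrics()
  {
    return metrics != null ? this : new IdentityHolder(identity, new MetricsStub(identity));
  }
}
```

The order of calls matters. `withMetrics().withIdentity("bob")` keeps the old identity inside the metrics, while `withIdentity("bob").withMetrics()` picks up the new one.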
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r227245253

## File path: processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java ##
@@ -24,50 +24,89 @@
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.filter.Filter;
 import org.joda.time.Interval;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 /**
- * DefaultQueryMetrics is unsafe for use from multiple threads. It fails with RuntimeException on access not from the
- * thread where it was constructed. To "transfer" DefaultQueryMetrics from one thread to another {@link #ownerThread}
- * field should be updated.
+ * A basic thread-safe implementation of the {@link QueryMetrics} interface.
+ *
+ * Note for inheritence: Subclass should override {@link #makeCopy()} method to return
+ * an instance of that subclass. See {@link org.apache.druid.query.groupby.DefaultGroupByQueryMetrics}
+ * for an example.
  */
 public class DefaultQueryMetrics> implements QueryMetrics
 {
-  protected final ObjectMapper jsonMapper;
-  protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
-  protected final Map metrics = new HashMap<>();
+  private static final EmittingLogger log = new EmittingLogger(DefaultQueryMetrics.class);
-  /** Non final to give subclasses ability to reassign it. */
-  protected Thread ownerThread = Thread.currentThread();
+  protected final ObjectMapper jsonMapper;
+  protected final Object lock = new Object();
+  @GuardedBy("lock") private final Map singleValueDims = new HashMap<>();
+  @GuardedBy("lock") private final Map multiValueDims = new HashMap<>();
+  @GuardedBy("lock") private final Map metrics = new HashMap<>();
   public DefaultQueryMetrics(ObjectMapper jsonMapper)
   {
     this.jsonMapper = jsonMapper;
   }
-  protected void checkModifiedFromOwnerThread()
+  // copy constructor, used by makeCopy()
+  public DefaultQueryMetrics(DefaultQueryMetrics that)
   {
-    if (Thread.currentThread() != ownerThread) {
-      throw new IllegalStateException(
-          "DefaultQueryMetrics must not be modified from multiple threads. If it is needed to gather dimension or "
-          + "metric information from multiple threads or from an async thread, this information should explicitly be "
-          + "passed between threads (e. g. using Futures), or this DefaultQueryMetrics's ownerThread should be "
-          + "reassigned explicitly");
-    }
+    this.jsonMapper = that.jsonMapper;
+    this.singleValueDims.putAll(that.singleValueDims);
+    this.multiValueDims.putAll(that.multiValueDims);
+    this.metrics.putAll(that.metrics);
   }
   protected void setDimension(String dimension, String value)
   {
-    checkModifiedFromOwnerThread();
-    builder.setDimension(dimension, value);
+    synchronized (lock) {
+      String oldValue = singleValueDims.put(dimension, value);
+      if (oldValue != null && !oldValue.equals(value)) {

Review comment: I don't think there are cases where the same dimension is set from multiple threads. But the overhead of prohibiting such usage seems a bit high to me: you would need to maintain the owner thread for each dimension. I think the checks in reportMetric and emit are sufficient to catch most bugs.
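The new class javadoc asks subclasses to override `makeCopy()` so that copies keep the subclass type. The diff also adds a copy constructor for `makeCopy()` to use. The simplified sketch below shows that pattern. `BaseMetrics`, `GroupByStyleMetrics`, and `numDimensions` are hypothetical names, not Druid classes. One deliberate difference: the sketch's copy constructor holds the source instance's lock while copying, which the quoted diff does not show.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified analogue of the lock-guarded design; not Druid's code.
class BaseMetrics
{
  protected final Object lock = new Object();
  private final Map<String, String> dims = new HashMap<>();

  BaseMetrics()
  {
  }

  // Copy constructor: snapshot the source's state while holding the source's lock.
  BaseMetrics(BaseMetrics that)
  {
    synchronized (that.lock) {
      this.dims.putAll(that.dims);
    }
  }

  // Subclasses override this so that copies keep the subclass type.
  BaseMetrics makeCopy()
  {
    return new BaseMetrics(this);
  }

  protected void setDimension(String name, String value)
  {
    synchronized (lock) {
      dims.put(name, value);
    }
  }

  String getDimension(String name)
  {
    synchronized (lock) {
      return dims.get(name);
    }
  }
}

// Hypothetical subclass in the spirit of the javadoc's DefaultGroupByQueryMetrics pointer.
class GroupByStyleMetrics extends BaseMetrics
{
  GroupByStyleMetrics()
  {
  }

  GroupByStyleMetrics(GroupByStyleMetrics that)
  {
    super(that);
  }

  // Query-type-specific setter that goes through the base class's locked method.
  void numDimensions(int n)
  {
    setDimension("numDimensions", String.valueOf(n));
  }

  @Override
  GroupByStyleMetrics makeCopy()
  {
    // Without this override, a copy would silently become a plain BaseMetrics
    return new GroupByStyleMetrics(this);
  }
}
```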
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r227243219

## File path: processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java ##
@@ -24,50 +24,89 @@
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.filter.Filter;
 import org.joda.time.Interval;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 /**
- * DefaultQueryMetrics is unsafe for use from multiple threads. It fails with RuntimeException on access not from the
- * thread where it was constructed. To "transfer" DefaultQueryMetrics from one thread to another {@link #ownerThread}
- * field should be updated.
+ * A basic thread-safe implementation of the {@link QueryMetrics} interface.
+ *
+ * Note for inheritence: Subclass should override {@link #makeCopy()} method to return
+ * an instance of that subclass. See {@link org.apache.druid.query.groupby.DefaultGroupByQueryMetrics}
+ * for an example.
  */
 public class DefaultQueryMetrics> implements QueryMetrics
 {
-  protected final ObjectMapper jsonMapper;
-  protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
-  protected final Map metrics = new HashMap<>();
+  private static final EmittingLogger log = new EmittingLogger(DefaultQueryMetrics.class);
-  /** Non final to give subclasses ability to reassign it. */
-  protected Thread ownerThread = Thread.currentThread();
+  protected final ObjectMapper jsonMapper;
+  protected final Object lock = new Object();
+  @GuardedBy("lock") private final Map singleValueDims = new HashMap<>();
+  @GuardedBy("lock") private final Map multiValueDims = new HashMap<>();
+  @GuardedBy("lock") private final Map metrics = new HashMap<>();
   public DefaultQueryMetrics(ObjectMapper jsonMapper)
   {
     this.jsonMapper = jsonMapper;
   }
-  protected void checkModifiedFromOwnerThread()
+  // copy constructor, used by makeCopy()
+  public DefaultQueryMetrics(DefaultQueryMetrics that)
   {
-    if (Thread.currentThread() != ownerThread) {
-      throw new IllegalStateException(
-          "DefaultQueryMetrics must not be modified from multiple threads. If it is needed to gather dimension or "
-          + "metric information from multiple threads or from an async thread, this information should explicitly be "
-          + "passed between threads (e. g. using Futures), or this DefaultQueryMetrics's ownerThread should be "
-          + "reassigned explicitly");
-    }
+    this.jsonMapper = that.jsonMapper;
+    this.singleValueDims.putAll(that.singleValueDims);
+    this.multiValueDims.putAll(that.multiValueDims);
+    this.metrics.putAll(that.metrics);
   }
   protected void setDimension(String dimension, String value)
   {
-    checkModifiedFromOwnerThread();
-    builder.setDimension(dimension, value);
+    synchronized (lock) {
+      String oldValue = singleValueDims.put(dimension, value);
+      if (oldValue != null && !oldValue.equals(value)) {

Review comment: It's possible to set dimensions from multiple threads. For example, MetricsEmittingQueryRunner adds a custom dimension in one thread ([#L91](https://github.com/apache/incubator-druid/blob/3ae563263a23000560749071d262727d47296856/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryRunner.java#L91)) and another dimension possibly in a different thread ([#L112](https://github.com/apache/incubator-druid/blob/3ae563263a23000560749071d262727d47296856/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryRunner.java#L112)). It's also possible for one thread to set the same dimension more than once. For instance, the [query runner](https://github.com/apache/incubator-druid/blob/359576a80bccaea4168e31323e931a806b26159b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java#L295) for historical segment processing contains two MetricsEmittingQueryRunners, which set the "segment" and "status" dimensions twice.
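This comment describes two situations: dimensions set from different threads, and the same dimension set twice by nested runners. A small hypothetical registry can reproduce both. Repeating a dimension with the same value is accepted, and a different value is rejected. The class, the dimension values, and the use of `CompletableFuture.runAsync` to stand in for a runner on another thread are all assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; names and dimension values are illustrative, not Druid's.
class DimensionRegistry
{
  private final Object lock = new Object();
  private final Map<String, String> dims = new HashMap<>();

  // Same value again: accepted. Different value: rejected, and the old value is kept.
  void setDimension(String name, String value)
  {
    synchronized (lock) {
      String old = dims.get(name);
      if (old != null && !old.equals(value)) {
        throw new IllegalStateException("Changing dimension values is not allowed: " + name);
      }
      dims.put(name, value);
    }
  }

  Map<String, String> snapshot()
  {
    synchronized (lock) {
      return new HashMap<>(dims);
    }
  }

  // Two nested "runners" share one registry; the inner one runs on another thread.
  static Map<String, String> simulateNestedRunners(DimensionRegistry metrics)
  {
    metrics.setDimension("segment", "segment-1");     // outer runner, caller thread
    CompletableFuture.runAsync(() -> {
      metrics.setDimension("segment", "segment-1");   // inner runner repeats the same value
      metrics.setDimension("status", "success");
    }).join();                                        // wait for the inner runner
    metrics.setDimension("status", "success");        // outer runner, after the inner one
    return metrics.snapshot();
  }
}
```

The sketch checks the old value before writing, so a rejected call leaves the stored value intact. By contrast, the `put`-then-compare form in the diff has already replaced the value by the time it throws.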
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r227223526

## File path: sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java ##
@@ -165,6 +173,13 @@ public SegmentDescriptor apply(final PartitionChunk chunk)
           ),
           toolChest
       );
+    return CPUTimeMetricQueryRunner.safeBuild(

Review comment: It helps reproduce #4826 in DruidAvaticaHandlerTest#testMaxRowsPerFrame.
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r227222948

## File path: sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java ##
@@ -467,7 +468,8 @@ public static SpecificSegmentsQuerySegmentWalker createMockWalker(
                                        .rows(FORBIDDEN_ROWS)
                                        .buildMMappedIndex();
-    return new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
+    return new SpecificSegmentsQuerySegmentWalker(conglomerate, new NoopServiceEmitter())
+        .add(
         DataSegment.builder()

Review comment: Fixed.
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r227222702

## File path: processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java ##
@@ -239,11 +237,12 @@ public AggregateResult call()
                       @SuppressWarnings("unused")
                       Releaser grouperReleaser = grouperHolder.increment()
                   ) {
-                    final AggregateResult retVal = input.run(queryPlusForRunners, responseContext)
-                                                        .accumulate(
-                                                            AggregateResult.ok(),
-                                                            accumulator
-                                                        );
+                    final AggregateResult retVal = input
+                        .run(queryPlusForRunners, responseContext)
+                        .accumulate(
+                            AggregateResult.ok(),

Review comment: Done.
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r227222402

## File path: processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java ##
@@ -122,11 +122,9 @@ public GroupByMergingQueryRunnerV2(
         CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION,
         false
     );
-    final QueryPlus queryPlusForRunners = queryPlus
-        .withQuery(
-            query.withOverriddenContext(ImmutableMap.of(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, true))
-        )
-        .withoutThreadUnsafeState();
+    final QueryPlus queryPlusForRunners = queryPlus.withQuery(
+        query.withOverriddenContext(ImmutableMap.of(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, true))

Review comment: Done.
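Because QueryMetrics is now thread-safe, this diff stops calling `.withoutThreadUnsafeState()`. As a result, whatever QueryMetrics `queryPlusForRunners` carries now reaches the runners that run as concurrent tasks (the `call()` method in the previous hunk). The sketch below shows that sharing pattern, assuming a thread-safe stand-in that only accumulates a row count. `SharedMetricsMerge` and `RowCounter` are hypothetical and unrelated to Druid's actual merging code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; not GroupByMergingQueryRunnerV2.
class SharedMetricsMerge
{
  // Stand-in for a thread-safe metrics object: it only accumulates a row count.
  static final class RowCounter
  {
    private final AtomicLong rows = new AtomicLong();

    void addRows(long n)
    {
      rows.addAndGet(n);
    }

    long rows()
    {
      return rows.get();
    }
  }

  // Sums every "segment" on a thread pool; every task reports into the same RowCounter.
  static long mergeInParallel(List<List<Integer>> segments, RowCounter shared)
  {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      List<Callable<Long>> tasks = new ArrayList<>();
      for (List<Integer> segment : segments) {
        tasks.add(() -> {
          long sum = 0;
          for (int v : segment) {
            sum += v;
          }
          shared.addRows(segment.size()); // runs on a pool thread, not the caller's
          return sum;
        });
      }
      long total = 0;
      for (Future<Long> f : pool.invokeAll(tasks)) { // invokeAll waits for every task
        total += f.get();
      }
      return total;
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
    finally {
      pool.shutdown();
    }
  }
}
```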
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r227221300

## File path: processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java ##
@@ -23,51 +23,64 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.filter.Filter;
 import org.joda.time.Interval;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-/**
- * DefaultQueryMetrics is unsafe for use from multiple threads. It fails with RuntimeException on access not from the
- * thread where it was constructed. To "transfer" DefaultQueryMetrics from one thread to another {@link #ownerThread}
- * field should be updated.
- */
 public class DefaultQueryMetrics> implements QueryMetrics
 {
   protected final ObjectMapper jsonMapper;
-  protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
-  protected final Map metrics = new HashMap<>();
+  protected final Object lock = new Object();
+  @GuardedBy("lock") private final Map singleValueDims = new HashMap<>();
+  @GuardedBy("lock") private final Map multiValueDims = new HashMap<>();
+  @GuardedBy("lock") private final Map metrics = new HashMap<>();
+  @GuardedBy("lock") private Thread ownerThread;
-  /** Non final to give subclasses ability to reassign it. */
-  protected Thread ownerThread = Thread.currentThread();
   public DefaultQueryMetrics(ObjectMapper jsonMapper)
   {
     this.jsonMapper = jsonMapper;
   }
-  protected void checkModifiedFromOwnerThread()
+  protected void setDimension(String dimension, String value)
   {
-    if (Thread.currentThread() != ownerThread) {
-      throw new IllegalStateException(
-          "DefaultQueryMetrics must not be modified from multiple threads. If it is needed to gather dimension or "
-          + "metric information from multiple threads or from an async thread, this information should explicitly be "
-          + "passed between threads (e. g. using Futures), or this DefaultQueryMetrics's ownerThread should be "
-          + "reassigned explicitly");
+    synchronized (lock) {
+      String oldValue = singleValueDims.put(dimension, value);
+      if (oldValue != null && !oldValue.equals(value)) {
+        throw new ISE("Changing dimension values in QueryMetrics is not allowed: %s", dimension);
+      }
     }
   }
-  protected void setDimension(String dimension, String value)
+  protected void setDimensions(String dimension, String[] values)
   {
-    checkModifiedFromOwnerThread();
-    builder.setDimension(dimension, value);
+    synchronized (lock) {
+      String[] oldValues = multiValueDims.put(dimension, values);
+      if (oldValues != null && !Arrays.equals(oldValues, values)) {
+        throw new ISE("Changing dimension values in QueryMetrics is not allowed: %s", dimension);
+      }
+    }
+  }
+
+  protected QueryMetrics reportMetric(String metricName, Number value)
+  {
+    synchronized (lock) {
+      if (metrics.put(metricName, value) != null) {

Review comment: Done.
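The `reportMetric` hunk ends at `if (metrics.put(metricName, value) != null) {`, which suggests that reporting the same metric twice is treated as an error. The hypothetical sketch below applies that rule. It also adds an `emit()` that snapshots the map under the lock and formats it outside the lock. The string output stands in for real metric events and is an assumption. The sketch returns `this` from `reportMetric`, in line with the quoted `reportMetric` returning `QueryMetrics`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; emit() renders strings instead of sending real metric events.
class OnceOnlyMetrics
{
  private final Object lock = new Object();
  private final Map<String, Number> metrics = new HashMap<>();

  // Rejects a second report of the same metric; returns this for chaining.
  OnceOnlyMetrics reportMetric(String name, Number value)
  {
    synchronized (lock) {
      if (metrics.containsKey(name)) {
        throw new IllegalStateException("Metric reported more than once: " + name);
      }
      metrics.put(name, value);
    }
    return this;
  }

  // Copies the metrics under the lock, then formats them outside it.
  List<String> emit()
  {
    Map<String, Number> snapshot;
    synchronized (lock) {
      snapshot = new TreeMap<>(metrics); // sorted by metric name
    }
    List<String> events = new ArrayList<>();
    for (Map.Entry<String, Number> e : snapshot.entrySet()) {
      events.add(e.getKey() + "=" + e.getValue());
    }
    return events;
  }
}
```

Keeping the formatting (or, in a real system, the network send) outside the `synchronized` block keeps the critical section short. Threads that are still reporting are then never blocked behind emission.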
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r227221067

## File path: processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java ##
@@ -23,51 +23,64 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.filter.Filter;
 import org.joda.time.Interval;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-/**
- * DefaultQueryMetrics is unsafe for use from multiple threads. It fails with RuntimeException on access not from the
- * thread where it was constructed. To "transfer" DefaultQueryMetrics from one thread to another {@link #ownerThread}
- * field should be updated.
- */
 public class DefaultQueryMetrics> implements QueryMetrics
 {
   protected final ObjectMapper jsonMapper;
-  protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
-  protected final Map metrics = new HashMap<>();
+  protected final Object lock = new Object();
+  @GuardedBy("lock") private final Map singleValueDims = new HashMap<>();
+  @GuardedBy("lock") private final Map multiValueDims = new HashMap<>();
+  @GuardedBy("lock") private final Map metrics = new HashMap<>();
+  @GuardedBy("lock") private Thread ownerThread;
-  /** Non final to give subclasses ability to reassign it. */
-  protected Thread ownerThread = Thread.currentThread();
   public DefaultQueryMetrics(ObjectMapper jsonMapper)
   {
     this.jsonMapper = jsonMapper;
   }
-  protected void checkModifiedFromOwnerThread()
+  protected void setDimension(String dimension, String value)
   {
-    if (Thread.currentThread() != ownerThread) {
-      throw new IllegalStateException(
-          "DefaultQueryMetrics must not be modified from multiple threads. If it is needed to gather dimension or "
-          + "metric information from multiple threads or from an async thread, this information should explicitly be "
-          + "passed between threads (e. g. using Futures), or this DefaultQueryMetrics's ownerThread should be "
-          + "reassigned explicitly");
+    synchronized (lock) {
+      String oldValue = singleValueDims.put(dimension, value);
+      if (oldValue != null && !oldValue.equals(value)) {
+        throw new ISE("Changing dimension values in QueryMetrics is not allowed: %s", dimension);
+      }
     }
   }
-  protected void setDimension(String dimension, String value)
+  protected void setDimensions(String dimension, String[] values)
   {
-    checkModifiedFromOwnerThread();
-    builder.setDimension(dimension, value);
+    synchronized (lock) {
+      String[] oldValues = multiValueDims.put(dimension, values);

Review comment: Done.
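`setDimensions` compares the old and new values with `Arrays.equals` rather than `equals`. Java arrays inherit identity-based `equals()` from `Object`, so two arrays with the same contents would otherwise count as a change. The hypothetical sketch below shows that check. The defensive `clone()` calls are an addition of the sketch, because the quoted diff stores the caller's array as-is.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a multi-value dimension check; not Druid's code.
class MultiValueDims
{
  private final Object lock = new Object();
  private final Map<String, String[]> dims = new HashMap<>();

  void setDimensions(String name, String[] values)
  {
    // Copy first so later changes to the caller's array cannot alter stored state
    String[] copy = values.clone();
    synchronized (lock) {
      String[] old = dims.get(name);
      // Arrays.equals compares element by element; old.equals(copy) would compare identity
      if (old != null && !Arrays.equals(old, copy)) {
        throw new IllegalStateException("Changing dimension values is not allowed: " + name);
      }
      dims.put(name, copy);
    }
  }

  // Returns a copy, or null if the dimension was never set.
  String[] get(String name)
  {
    synchronized (lock) {
      String[] stored = dims.get(name);
      return stored == null ? null : stored.clone();
    }
  }
}
```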
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r227221018

## File path: processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java ##
@@ -23,51 +23,64 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.filter.Filter;
 import org.joda.time.Interval;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-/**
- * DefaultQueryMetrics is unsafe for use from multiple threads. It fails with RuntimeException on access not from the
- * thread where it was constructed. To "transfer" DefaultQueryMetrics from one thread to another {@link #ownerThread}
- * field should be updated.
- */
 public class DefaultQueryMetrics> implements QueryMetrics
 {
   protected final ObjectMapper jsonMapper;
-  protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
-  protected final Map metrics = new HashMap<>();
+  protected final Object lock = new Object();
+  @GuardedBy("lock") private final Map singleValueDims = new HashMap<>();
+  @GuardedBy("lock") private final Map multiValueDims = new HashMap<>();
+  @GuardedBy("lock") private final Map metrics = new HashMap<>();
+  @GuardedBy("lock") private Thread ownerThread;
-  /** Non final to give subclasses ability to reassign it. */
-  protected Thread ownerThread = Thread.currentThread();
   public DefaultQueryMetrics(ObjectMapper jsonMapper)
   {
     this.jsonMapper = jsonMapper;
   }
-  protected void checkModifiedFromOwnerThread()
+  protected void setDimension(String dimension, String value)
   {
-    if (Thread.currentThread() != ownerThread) {
-      throw new IllegalStateException(
-          "DefaultQueryMetrics must not be modified from multiple threads. If it is needed to gather dimension or "
-          + "metric information from multiple threads or from an async thread, this information should explicitly be "
-          + "passed between threads (e. g. using Futures), or this DefaultQueryMetrics's ownerThread should be "
-          + "reassigned explicitly");
+    synchronized (lock) {
+      String oldValue = singleValueDims.put(dimension, value);

Review comment: Done.
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r227218108 ## File path: processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java ## @@ -23,51 +23,64 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import org.apache.druid.collections.bitmap.BitmapFactory; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.query.filter.Filter; import org.joda.time.Interval; +import javax.annotation.concurrent.GuardedBy; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -/** - * DefaultQueryMetrics is unsafe for use from multiple threads. It fails with RuntimeException on access not from the - * thread where it was constructed. To "transfer" DefaultQueryMetrics from one thread to another {@link #ownerThread} - * field should be updated. - */ public class DefaultQueryMetrics> implements QueryMetrics { protected final ObjectMapper jsonMapper; - protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); - protected final Map metrics = new HashMap<>(); + protected final Object lock = new Object(); + @GuardedBy("lock") private final Map singleValueDims = new HashMap<>(); + @GuardedBy("lock") private final Map multiValueDims = new HashMap<>(); + @GuardedBy("lock") private final Map metrics = new HashMap<>(); + @GuardedBy("lock") private Thread ownerThread; Review comment: Subclasses are supposed to use the setDimension/setDimensions/reportMetric methods (which apply the proper synchronization and checks) instead of manipulating these fields directly.
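The rule in this comment (subclasses go through the protected setters, never the lock-guarded maps) can be sketched as follows. This is a minimal, self-contained illustration under assumptions, not Druid's DefaultQueryMetrics: `SketchQueryMetrics`, `SketchGroupByMetrics`, the snapshot methods and the dimension/metric names are all hypothetical, and `@GuardedBy` is replaced by comments so the block needs no external dependency.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical base class: the maps are private, so the only way to change
// them is through the protected methods, which always take the lock.
class SketchQueryMetrics
{
  protected final Object lock = new Object();
  // guarded by lock
  private final Map<String, String> singleValueDims = new HashMap<>();
  // guarded by lock
  private final Map<String, Number> metrics = new HashMap<>();

  protected void setDimension(String dimension, String value)
  {
    synchronized (lock) {
      singleValueDims.put(dimension, value);
    }
  }

  protected SketchQueryMetrics reportMetric(String metricName, Number value)
  {
    synchronized (lock) {
      metrics.put(metricName, value);
      return this;
    }
  }

  // Return copies so callers never hold a reference to a guarded map.
  Map<String, String> dimensionsSnapshot()
  {
    synchronized (lock) {
      return new HashMap<>(singleValueDims);
    }
  }

  Map<String, Number> metricsSnapshot()
  {
    synchronized (lock) {
      return new HashMap<>(metrics);
    }
  }
}

// Hypothetical subclass: it cannot see the private maps, so it inherits the
// synchronization simply by calling the protected methods.
class SketchGroupByMetrics extends SketchQueryMetrics
{
  void numDimensions(int n)
  {
    setDimension("numDimensions", String.valueOf(n));
  }

  void mergeTime(long millis)
  {
    reportMetric("query/mergeTime", millis);
  }
}
```

Making the maps `private` rather than `protected` is what turns the comment's convention into something the compiler enforces.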
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r227217110 ## File path: processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java ## @@ -260,13 +272,6 @@ public void identity(String identity) return reportMetric(metricName, TimeUnit.NANOSECONDS.toMillis(timeNs)); } - protected QueryMetrics reportMetric(String metricName, Number value) Review comment: Not removed; I just moved it below setDimensions().
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r223288053 ## File path: processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java ## @@ -114,10 +113,10 @@ public Void call() { try { if (bySegment) { -input.run(threadSafeQueryPlus, responseContext) +input.run(queryPlus.withQueryMetricsCopied(), responseContext) Review comment: OK, I'll revert the changes related to the QueryPlus class. However, since QueryMetrics is now thread-safe, the old name withoutThreadUnsafeState() is out of date, so I will use withoutQueryMetrics() instead.
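The point of the rename is that the method now describes what it actually drops. Below is a minimal sketch of that idea on an immutable holder. `SketchQueryPlus` is hypothetical and only loosely modeled on QueryPlus; it is not Druid's class, and its fields and accessors are assumptions.

```java
// Hypothetical immutable holder in the spirit of QueryPlus (not Druid's class).
final class SketchQueryPlus<Q, M>
{
  private final Q query;
  private final M queryMetrics; // may be null when no metrics are attached

  SketchQueryPlus(Q query, M queryMetrics)
  {
    this.query = query;
    this.queryMetrics = queryMetrics;
  }

  Q getQuery()
  {
    return query;
  }

  M getQueryMetrics()
  {
    return queryMetrics;
  }

  SketchQueryPlus<Q, M> withQueryMetrics(M metrics)
  {
    return new SketchQueryPlus<>(query, metrics);
  }

  // Returns a copy without the metrics object, so a runner that receives the
  // copy cannot report into the caller's QueryMetrics. The original is unchanged.
  SketchQueryPlus<Q, M> withoutQueryMetrics()
  {
    return queryMetrics == null ? this : new SketchQueryPlus<>(query, null);
  }
}
```

Because the holder is immutable, stripping the metrics never affects the instance the caller still holds.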
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r223255506 ## File path: processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java ## @@ -289,12 +320,26 @@ public void identity(String identity) } @Override - public void emit(ServiceEmitter emitter) + public void emit(final ServiceEmitter emitter) Review comment: After checking the code again, I found that emit() is always called from a single thread, which could be considered the owner thread for the QueryMetrics instance. Thanks for your explanation!
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r223245747 ## File path: processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java ## @@ -24,50 +24,89 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.collections.bitmap.BitmapFactory; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.query.filter.Filter; import org.joda.time.Interval; +import javax.annotation.concurrent.GuardedBy; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; /** - * DefaultQueryMetrics is unsafe for use from multiple threads. It fails with RuntimeException on access not from the - * thread where it was constructed. To "transfer" DefaultQueryMetrics from one thread to another {@link #ownerThread} - * field should be updated. + * A basic thread-safe implementation of the {@link QueryMetrics} interface. + * + * Note for inheritence: Subclass should override {@link #makeCopy()} method to return + * an instance of that subclass. See {@link org.apache.druid.query.groupby.DefaultGroupByQueryMetrics} + * for an example. */ public class DefaultQueryMetrics> implements QueryMetrics { - protected final ObjectMapper jsonMapper; - protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); - protected final Map metrics = new HashMap<>(); + private static final EmittingLogger log = new EmittingLogger(DefaultQueryMetrics.class); - /** Non final to give subclasses ability to reassign it. 
*/ - protected Thread ownerThread = Thread.currentThread(); + protected final ObjectMapper jsonMapper; + protected final Object lock = new Object(); + @GuardedBy("lock") private final Map singleValueDims = new HashMap<>(); + @GuardedBy("lock") private final Map multiValueDims = new HashMap<>(); + @GuardedBy("lock") private final Map metrics = new HashMap<>(); public DefaultQueryMetrics(ObjectMapper jsonMapper) { this.jsonMapper = jsonMapper; } - protected void checkModifiedFromOwnerThread() + // copy constructor, used by makeCopy() + public DefaultQueryMetrics(DefaultQueryMetrics that) { -if (Thread.currentThread() != ownerThread) { - throw new IllegalStateException( - "DefaultQueryMetrics must not be modified from multiple threads. If it is needed to gather dimension or " - + "metric information from multiple threads or from an async thread, this information should explicitly be " - + "passed between threads (e. g. using Futures), or this DefaultQueryMetrics's ownerThread should be " - + "reassigned explicitly"); -} +this.jsonMapper = that.jsonMapper; +this.singleValueDims.putAll(that.singleValueDims); +this.multiValueDims.putAll(that.multiValueDims); +this.metrics.putAll(that.metrics); } protected void setDimension(String dimension, String value) { -checkModifiedFromOwnerThread(); -builder.setDimension(dimension, value); +synchronized (lock) { + String oldValue = singleValueDims.put(dimension, value); + if (oldValue != null && !oldValue.equals(value)) { + handleIllegalModification(singleValueDims.getOrDefault(DruidMetrics.ID, ""), dimension); + } +} + } + + protected void setDimensions(String dimension, String[] values) + { +synchronized (lock) { + String[] oldValues = multiValueDims.put(dimension, values); + if (oldValues != null && !Arrays.equals(oldValues, values)) { + handleIllegalModification(singleValueDims.getOrDefault(DruidMetrics.ID, ""), dimension); + } +} + } + + protected QueryMetrics reportMetric(String metricName, Number value) + { +synchronized 
(lock) { + Number oldValue = metrics.put(metricName, value); + if (oldValue != null && !oldValue.equals(value)) { + handleIllegalModification(singleValueDims.getOrDefault(DruidMetrics.ID, ""), metricName); + } + return this; +} + } + + private void handleIllegalModification(String queryId, String entryName) + { +Exception e = new Exception("stack trace"); +log.makeAlert(e, "\"%s\" in QueryMetrics got modified to another value", entryName) Review comment: OK, will throw IllegalStateException instead
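Throwing instead of alerting could look roughly like the sketch below. It is an assumption-laden illustration, not the code that was committed: `StrictMetrics` and its message text are hypothetical. One deliberate difference from the diff: it uses `putIfAbsent` and checks before overwriting, so a rejected write leaves the stored value unchanged. Equal values are still accepted, consistent with the discussion about dimensions elsewhere in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a conflicting write fails fast with IllegalStateException
// instead of raising an alert and letting the query continue.
class StrictMetrics
{
  private final Object lock = new Object();
  // guarded by lock
  private final Map<String, String> dims = new HashMap<>();

  void setDimension(String dimension, String value)
  {
    synchronized (lock) {
      // putIfAbsent keeps the first value, so a rejected write changes nothing.
      String old = dims.putIfAbsent(dimension, value);
      if (old != null && !old.equals(value)) {
        throw new IllegalStateException(
            String.format("\"%s\" in QueryMetrics was already set to \"%s\", refusing \"%s\"", dimension, old, value)
        );
      }
    }
  }

  String getDimension(String dimension)
  {
    synchronized (lock) {
      return dims.get(dimension);
    }
  }
}
```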
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r223245669 ## File path: processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java ## @@ -24,50 +24,89 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.collections.bitmap.BitmapFactory; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.query.filter.Filter; import org.joda.time.Interval; +import javax.annotation.concurrent.GuardedBy; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; /** - * DefaultQueryMetrics is unsafe for use from multiple threads. It fails with RuntimeException on access not from the - * thread where it was constructed. To "transfer" DefaultQueryMetrics from one thread to another {@link #ownerThread} - * field should be updated. + * A basic thread-safe implementation of the {@link QueryMetrics} interface. + * + * Note for inheritence: Subclass should override {@link #makeCopy()} method to return + * an instance of that subclass. See {@link org.apache.druid.query.groupby.DefaultGroupByQueryMetrics} + * for an example. */ public class DefaultQueryMetrics> implements QueryMetrics { - protected final ObjectMapper jsonMapper; - protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); - protected final Map metrics = new HashMap<>(); + private static final EmittingLogger log = new EmittingLogger(DefaultQueryMetrics.class); - /** Non final to give subclasses ability to reassign it. 
*/ - protected Thread ownerThread = Thread.currentThread(); + protected final ObjectMapper jsonMapper; + protected final Object lock = new Object(); + @GuardedBy("lock") private final Map singleValueDims = new HashMap<>(); + @GuardedBy("lock") private final Map multiValueDims = new HashMap<>(); + @GuardedBy("lock") private final Map metrics = new HashMap<>(); public DefaultQueryMetrics(ObjectMapper jsonMapper) { this.jsonMapper = jsonMapper; } - protected void checkModifiedFromOwnerThread() + // copy constructor, used by makeCopy() + public DefaultQueryMetrics(DefaultQueryMetrics that) { -if (Thread.currentThread() != ownerThread) { - throw new IllegalStateException( - "DefaultQueryMetrics must not be modified from multiple threads. If it is needed to gather dimension or " - + "metric information from multiple threads or from an async thread, this information should explicitly be " - + "passed between threads (e. g. using Futures), or this DefaultQueryMetrics's ownerThread should be " - + "reassigned explicitly"); -} +this.jsonMapper = that.jsonMapper; +this.singleValueDims.putAll(that.singleValueDims); +this.multiValueDims.putAll(that.multiValueDims); +this.metrics.putAll(that.metrics); } protected void setDimension(String dimension, String value) { -checkModifiedFromOwnerThread(); -builder.setDimension(dimension, value); +synchronized (lock) { + String oldValue = singleValueDims.put(dimension, value); + if (oldValue != null && !oldValue.equals(value)) { Review comment: I agree that allowing equal metric values could swallow mistakes such as multiple threads reporting the same metric, so I will prohibit such usage. But I think we should allow equal dimension values, because there are valid use cases such as nested MetricsEmittingQueryRunners. What do you think?
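The rule agreed for metrics (report each metric at most once, even with an equal value) could be sketched like this. `ReportOnceMetrics` is a hypothetical name and the message text is made up; this illustrates the policy described in the comment, not the final Druid implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a second report of the same metric is rejected even if
// the value is equal, because it usually means two code paths (or threads)
// both believe they own that metric.
class ReportOnceMetrics
{
  private final Object lock = new Object();
  // guarded by lock
  private final Map<String, Number> metrics = new HashMap<>();

  ReportOnceMetrics reportMetric(String metricName, Number value)
  {
    synchronized (lock) {
      if (metrics.containsKey(metricName)) {
        throw new IllegalStateException("metric \"" + metricName + "\" was already reported");
      }
      metrics.put(metricName, value);
      return this;
    }
  }

  Number getMetric(String metricName)
  {
    synchronized (lock) {
      return metrics.get(metricName);
    }
  }
}
```

Dimensions would keep the more lenient "equal value is fine" check, since nested runners legitimately set the same dimension more than once.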
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r221683976 ## File path: processing/src/main/java/org/apache/druid/query/QueryMetrics.java ## @@ -92,7 +92,7 @@ * 100% guarantee of compatibility, because methods could not only be added to QueryMetrics, existing methods could also * be changed or removed. * - * QueryMetrics is designed for use from a single thread, implementations shouldn't care about thread-safety. + * All implementations of QueryMetrics should be thread-safety. Review comment: good catch
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r221683930 ## File path: processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java ## @@ -24,50 +24,89 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.collections.bitmap.BitmapFactory; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.query.filter.Filter; import org.joda.time.Interval; +import javax.annotation.concurrent.GuardedBy; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; /** - * DefaultQueryMetrics is unsafe for use from multiple threads. It fails with RuntimeException on access not from the - * thread where it was constructed. To "transfer" DefaultQueryMetrics from one thread to another {@link #ownerThread} - * field should be updated. + * A basic thread-safe implementation of the {@link QueryMetrics} interface. + * + * Note for inheritence: Subclass should override {@link #makeCopy()} method to return + * an instance of that subclass. See {@link org.apache.druid.query.groupby.DefaultGroupByQueryMetrics} + * for an example. */ public class DefaultQueryMetrics> implements QueryMetrics { - protected final ObjectMapper jsonMapper; - protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); - protected final Map metrics = new HashMap<>(); + private static final EmittingLogger log = new EmittingLogger(DefaultQueryMetrics.class); - /** Non final to give subclasses ability to reassign it. 
*/ - protected Thread ownerThread = Thread.currentThread(); + protected final ObjectMapper jsonMapper; + protected final Object lock = new Object(); + @GuardedBy("lock") private final Map singleValueDims = new HashMap<>(); + @GuardedBy("lock") private final Map multiValueDims = new HashMap<>(); + @GuardedBy("lock") private final Map metrics = new HashMap<>(); public DefaultQueryMetrics(ObjectMapper jsonMapper) { this.jsonMapper = jsonMapper; } - protected void checkModifiedFromOwnerThread() + // copy constructor, used by makeCopy() + public DefaultQueryMetrics(DefaultQueryMetrics that) { -if (Thread.currentThread() != ownerThread) { - throw new IllegalStateException( - "DefaultQueryMetrics must not be modified from multiple threads. If it is needed to gather dimension or " - + "metric information from multiple threads or from an async thread, this information should explicitly be " - + "passed between threads (e. g. using Futures), or this DefaultQueryMetrics's ownerThread should be " - + "reassigned explicitly"); -} +this.jsonMapper = that.jsonMapper; +this.singleValueDims.putAll(that.singleValueDims); +this.multiValueDims.putAll(that.multiValueDims); +this.metrics.putAll(that.metrics); } protected void setDimension(String dimension, String value) { -checkModifiedFromOwnerThread(); -builder.setDimension(dimension, value); +synchronized (lock) { + String oldValue = singleValueDims.put(dimension, value); + if (oldValue != null && !oldValue.equals(value)) { + handleIllegalModification(singleValueDims.getOrDefault(DruidMetrics.ID, ""), dimension); + } +} + } + + protected void setDimensions(String dimension, String[] values) + { +synchronized (lock) { + String[] oldValues = multiValueDims.put(dimension, values); + if (oldValues != null && !Arrays.equals(oldValues, values)) { + handleIllegalModification(singleValueDims.getOrDefault(DruidMetrics.ID, ""), dimension); + } +} + } + + protected QueryMetrics reportMetric(String metricName, Number value) + { +synchronized 
(lock) { + Number oldValue = metrics.put(metricName, value); + if (oldValue != null && !oldValue.equals(value)) { + handleIllegalModification(singleValueDims.getOrDefault(DruidMetrics.ID, ""), metricName); + } + return this; +} + } + + private void handleIllegalModification(String queryId, String entryName) + { +Exception e = new Exception("stack trace"); +log.makeAlert(e, "\"%s\" in QueryMetrics got modified to another value", entryName) Review comment: I didn't throw an exception here because I think bugs in metrics handling shouldn't fail the whole query. Not throwing lets users continue to query with only the metrics "broken", which may be acceptable in some use cases. I admit that this may be controversial and would like to hear other committers' opinions.
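The lenient behavior argued for here (record the conflict with a stack trace and keep the query running) can be sketched as below. This is a stand-in under assumptions: `LenientMetrics` and `conflictCount()` are hypothetical, and it uses `java.util.logging` in place of Druid's `EmittingLogger.makeAlert`. Another comment in this thread (discussion_r223245747) says the author switched to throwing IllegalStateException instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical "log and continue" variant: a conflicting write is logged with a
// stack trace, the new value wins, and the query is not failed.
class LenientMetrics
{
  private static final Logger LOG = Logger.getLogger(LenientMetrics.class.getName());

  private final Object lock = new Object();
  // guarded by lock
  private final Map<String, Number> metrics = new HashMap<>();
  // guarded by lock
  private int conflicts = 0;

  void reportMetric(String metricName, Number value)
  {
    synchronized (lock) {
      Number old = metrics.put(metricName, value);
      if (old != null && !old.equals(value)) {
        conflicts++;
        // The Exception exists only to capture the call site, as in the diff.
        LOG.log(
            Level.WARNING,
            String.format("\"%s\" in QueryMetrics got modified to another value", metricName),
            new Exception("stack trace")
        );
      }
    }
  }

  int conflictCount()
  {
    synchronized (lock) {
      return conflicts;
    }
  }

  Number getMetric(String metricName)
  {
    synchronized (lock) {
      return metrics.get(metricName);
    }
  }
}
```

The trade-off is visible in the sketch: the query survives, but the emitted metric silently reflects the last writer.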
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r221677644 ## File path: processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java ## @@ -289,12 +320,26 @@ public void identity(String identity) } @Override - public void emit(ServiceEmitter emitter) + public void emit(final ServiceEmitter emitter) Review comment: What are the benefits of maintaining QueryMetrics's owner thread? I think one benefit of making QueryMetrics thread-safe is that users can create it in one thread and use it in another without explicitly transferring ownership.
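Creating the object on one thread and using it on another without an ownership hand-off might look like the following sketch. `SharedMetrics` and `CrossThreadDemo` are hypothetical, and `"query/cpu/time"` is only an illustrative metric name. Every access goes through the same lock, so no `ownerThread` field has to be reassigned.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical thread-safe metrics holder: every access takes the same lock.
class SharedMetrics
{
  private final Object lock = new Object();
  // guarded by lock
  private final Map<String, Number> metrics = new HashMap<>();

  void reportMetric(String metricName, Number value)
  {
    synchronized (lock) {
      metrics.put(metricName, value);
    }
  }

  Map<String, Number> snapshot()
  {
    synchronized (lock) {
      return new HashMap<>(metrics);
    }
  }
}

class CrossThreadDemo
{
  // Created on the calling thread, written on a pool thread, read back on the
  // calling thread. No ownership transfer step is needed.
  static Map<String, Number> run()
  {
    SharedMetrics metrics = new SharedMetrics();
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      Future<?> done = pool.submit(() -> metrics.reportMetric("query/cpu/time", 123L));
      done.get(); // wait for the worker to finish
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    catch (ExecutionException e) {
      throw new RuntimeException(e.getCause());
    }
    finally {
      pool.shutdown();
    }
    return metrics.snapshot();
  }
}
```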
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r221675798 ## File path: processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java ## @@ -24,50 +24,89 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.collections.bitmap.BitmapFactory; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.query.filter.Filter; import org.joda.time.Interval; +import javax.annotation.concurrent.GuardedBy; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; /** - * DefaultQueryMetrics is unsafe for use from multiple threads. It fails with RuntimeException on access not from the - * thread where it was constructed. To "transfer" DefaultQueryMetrics from one thread to another {@link #ownerThread} - * field should be updated. + * A basic thread-safe implementation of the {@link QueryMetrics} interface. + * + * Note for inheritence: Subclass should override {@link #makeCopy()} method to return + * an instance of that subclass. See {@link org.apache.druid.query.groupby.DefaultGroupByQueryMetrics} + * for an example. */ public class DefaultQueryMetrics> implements QueryMetrics { - protected final ObjectMapper jsonMapper; - protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); - protected final Map metrics = new HashMap<>(); + private static final EmittingLogger log = new EmittingLogger(DefaultQueryMetrics.class); - /** Non final to give subclasses ability to reassign it. 
*/ - protected Thread ownerThread = Thread.currentThread(); + protected final ObjectMapper jsonMapper; + protected final Object lock = new Object(); + @GuardedBy("lock") private final Map singleValueDims = new HashMap<>(); + @GuardedBy("lock") private final Map multiValueDims = new HashMap<>(); + @GuardedBy("lock") private final Map metrics = new HashMap<>(); public DefaultQueryMetrics(ObjectMapper jsonMapper) { this.jsonMapper = jsonMapper; } - protected void checkModifiedFromOwnerThread() + // copy constructor, used by makeCopy() + public DefaultQueryMetrics(DefaultQueryMetrics that) { -if (Thread.currentThread() != ownerThread) { - throw new IllegalStateException( - "DefaultQueryMetrics must not be modified from multiple threads. If it is needed to gather dimension or " - + "metric information from multiple threads or from an async thread, this information should explicitly be " - + "passed between threads (e. g. using Futures), or this DefaultQueryMetrics's ownerThread should be " - + "reassigned explicitly"); -} +this.jsonMapper = that.jsonMapper; +this.singleValueDims.putAll(that.singleValueDims); +this.multiValueDims.putAll(that.multiValueDims); +this.metrics.putAll(that.metrics); } protected void setDimension(String dimension, String value) { -checkModifiedFromOwnerThread(); -builder.setDimension(dimension, value); +synchronized (lock) { Review comment: I used ConcurrentHashMap at first but switched to this coarse-grained lock for two reasons. 1. Lock contention on a QueryMetrics object should be very low: normally it is used by a single thread, and concurrent runners create a separate QueryMetrics for each child runner. 2. Imagine a thread adds a dimension and a metric. For the subsequent emit() to reflect both the newly added dimension and the metric, making each Map concurrent is not enough; we need synchronization across these fields.
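Point 2 is about atomicity across maps: two independent ConcurrentHashMaps could let emit() see the new dimension without the new metric. The sketch below shows one lock guarding both maps, so a concurrent `record()` is either fully visible to `emitSnapshot()` or not visible at all. `CoarseLockMetrics`, `record()` and the string event format are hypothetical simplifications, not Druid's emit().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: one lock guards both maps, so a dimension and a metric
// added together are always observed together.
class CoarseLockMetrics
{
  private final Object lock = new Object();
  // both maps guarded by lock
  private final Map<String, String> dims = new HashMap<>();
  private final Map<String, Number> metrics = new HashMap<>();

  // Adds a dimension and a metric as a single atomic step.
  void record(String dimension, String dimValue, String metricName, Number value)
  {
    synchronized (lock) {
      dims.put(dimension, dimValue);
      metrics.put(metricName, value);
    }
  }

  // Builds one event string per metric from a single consistent view of both
  // maps (sorted with TreeMap only to make the output deterministic).
  List<String> emitSnapshot()
  {
    synchronized (lock) {
      Map<String, String> dimsCopy = new TreeMap<>(dims);
      List<String> events = new ArrayList<>();
      for (Map.Entry<String, Number> e : new TreeMap<>(metrics).entrySet()) {
        events.add(e.getKey() + "=" + e.getValue() + " " + dimsCopy);
      }
      return events;
    }
  }
}
```

With only one thread normally touching the object (point 1), the uncontended lock adds little cost.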
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r221665413 ## File path: processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java ## @@ -24,50 +24,89 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.collections.bitmap.BitmapFactory; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.query.filter.Filter; import org.joda.time.Interval; +import javax.annotation.concurrent.GuardedBy; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; /** - * DefaultQueryMetrics is unsafe for use from multiple threads. It fails with RuntimeException on access not from the - * thread where it was constructed. To "transfer" DefaultQueryMetrics from one thread to another {@link #ownerThread} - * field should be updated. + * A basic thread-safe implementation of the {@link QueryMetrics} interface. + * + * Note for inheritence: Subclass should override {@link #makeCopy()} method to return + * an instance of that subclass. See {@link org.apache.druid.query.groupby.DefaultGroupByQueryMetrics} + * for an example. */ public class DefaultQueryMetrics> implements QueryMetrics { - protected final ObjectMapper jsonMapper; - protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); - protected final Map metrics = new HashMap<>(); + private static final EmittingLogger log = new EmittingLogger(DefaultQueryMetrics.class); - /** Non final to give subclasses ability to reassign it. 
*/ - protected Thread ownerThread = Thread.currentThread(); + protected final ObjectMapper jsonMapper; + protected final Object lock = new Object(); + @GuardedBy("lock") private final Map singleValueDims = new HashMap<>(); + @GuardedBy("lock") private final Map multiValueDims = new HashMap<>(); + @GuardedBy("lock") private final Map metrics = new HashMap<>(); public DefaultQueryMetrics(ObjectMapper jsonMapper) { this.jsonMapper = jsonMapper; } - protected void checkModifiedFromOwnerThread() + // copy constructor, used by makeCopy() + public DefaultQueryMetrics(DefaultQueryMetrics that) { -if (Thread.currentThread() != ownerThread) { - throw new IllegalStateException( - "DefaultQueryMetrics must not be modified from multiple threads. If it is needed to gather dimension or " - + "metric information from multiple threads or from an async thread, this information should explicitly be " - + "passed between threads (e. g. using Futures), or this DefaultQueryMetrics's ownerThread should be " - + "reassigned explicitly"); -} +this.jsonMapper = that.jsonMapper; +this.singleValueDims.putAll(that.singleValueDims); +this.multiValueDims.putAll(that.multiValueDims); +this.metrics.putAll(that.metrics); } protected void setDimension(String dimension, String value) { -checkModifiedFromOwnerThread(); -builder.setDimension(dimension, value); +synchronized (lock) { + String oldValue = singleValueDims.put(dimension, value); + if (oldValue != null && !oldValue.equals(value)) { Review comment: ServerManager.buildAndDecorateQueryRunner reassigns the segment id in each MetricsEmittingQueryRunner. Do you think we should fix that or just tolerate it? I think allowing equal values does no harm and is convenient in such cases.
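Why tolerating equal values is convenient can be shown with nested decorators, each tagging the same metrics object with the segment it runs on. `TolerantDims`, `SketchRunner` and `SegmentTaggingRunner` are hypothetical stand-ins for QueryMetrics and nested MetricsEmittingQueryRunners; they are not Druid's classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical metrics holder: re-setting a dimension to an equal value is
// allowed, but a different value is rejected.
class TolerantDims
{
  private final Object lock = new Object();
  // guarded by lock
  private final Map<String, String> dims = new HashMap<>();

  void setDimension(String dimension, String value)
  {
    synchronized (lock) {
      String old = dims.putIfAbsent(dimension, value);
      if (old != null && !old.equals(value)) {
        throw new IllegalStateException("\"" + dimension + "\" changed from " + old + " to " + value);
      }
    }
  }

  String getDimension(String dimension)
  {
    synchronized (lock) {
      return dims.get(dimension);
    }
  }
}

interface SketchRunner
{
  void run(TolerantDims metrics);
}

// Hypothetical decorator: every layer tags the metrics with its segment id
// before delegating, so nested layers set the same dimension repeatedly.
class SegmentTaggingRunner implements SketchRunner
{
  private final String segmentId;
  private final SketchRunner delegate;

  SegmentTaggingRunner(String segmentId, SketchRunner delegate)
  {
    this.segmentId = segmentId;
    this.delegate = delegate;
  }

  @Override
  public void run(TolerantDims metrics)
  {
    metrics.setDimension("segment", segmentId);
    delegate.run(metrics);
  }
}
```

With a strict "set at most once" rule, the second layer would fail even though both layers agree on the value.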
[GitHub] gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics
gaodayue commented on a change in pull request #6402: Thread-safe QueryMetrics URL: https://github.com/apache/incubator-druid/pull/6402#discussion_r221660862 ## File path: processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java ## @@ -114,10 +113,10 @@ public Void call() { try { if (bySegment) { -input.run(threadSafeQueryPlus, responseContext) +input.run(queryPlus.withQueryMetricsCopied(), responseContext) Review comment: Yes, that could happen if the client of QueryMetrics forgets to emit. But I think the previous approach has the same problem, and this kind of bug is easy to find and fix.
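The copy-per-child approach behind `withQueryMetricsCopied()` can be sketched with a copy constructor and `makeCopy()`. `CopyableMetrics` and its methods are hypothetical. One choice differs from the copy constructor in the diff: this sketch holds the source's lock while copying, so a concurrent writer is never observed half-way. The sketch also shows the failure mode discussed here: whatever the child reports stays in the child, so if nobody emits the child, those metrics are lost.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-child copies (not Druid's DefaultQueryMetrics).
class CopyableMetrics
{
  private final Object lock = new Object();
  // guarded by lock
  private final Map<String, String> dims = new HashMap<>();
  // guarded by lock
  private final Map<String, Number> metrics = new HashMap<>();

  CopyableMetrics()
  {
  }

  // Copy constructor: reads the source under the source's lock. The new
  // object is not yet shared, so its own maps need no locking here.
  CopyableMetrics(CopyableMetrics that)
  {
    synchronized (that.lock) {
      dims.putAll(that.dims);
      metrics.putAll(that.metrics);
    }
  }

  CopyableMetrics makeCopy()
  {
    return new CopyableMetrics(this);
  }

  void setDimension(String dimension, String value)
  {
    synchronized (lock) {
      dims.put(dimension, value);
    }
  }

  void reportMetric(String metricName, Number value)
  {
    synchronized (lock) {
      metrics.put(metricName, value);
    }
  }

  Map<String, String> dimsSnapshot()
  {
    synchronized (lock) {
      return new HashMap<>(dims);
    }
  }

  Map<String, Number> metricsSnapshot()
  {
    synchronized (lock) {
      return new HashMap<>(metrics);
    }
  }
}
```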