PHOENIX-1286 Remove hadoop2 compat modules

There was some reflection and wrapping done in the metrics/tracing tools to support working with both Hadoop1 and Hadoop2 (though Hadoop1 support was never completed). This extra code is removed now that we no longer intend to support Hadoop1.
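In practical terms, dropping the compat layer means the tracing sink is wired up directly through the standard hadoop-metrics2 property files instead of Phoenix's own MetricsWriter/MetricsManager abstractions. As a rough sketch of what that configuration might look like (the "tracing" instance name is illustrative and not part of this patch; the prefix/class pattern follows the PhoenixMetricsSink javadoc included in the diff below):

  # client side: prefix must match Metrics.METRICS_SYSTEM_NAME ("phoenix")
  phoenix.sink.tracing.class=org.apache.phoenix.trace.PhoenixMetricsSink

  # server side (HBase region servers): prefix "hbase"
  hbase.sink.tracing.class=org.apache.phoenix.trace.PhoenixMetricsSink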
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b48ca7b5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b48ca7b5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b48ca7b5

Branch: refs/heads/4.0
Commit: b48ca7b5c3c97fe00c7e89978fb387d7013be320
Parents: b2c5ffa
Author: Jesse Yates <jya...@apache.org>
Authored: Mon Sep 22 15:00:00 2014 -0700
Committer: Jesse Yates <jya...@apache.org>
Committed: Mon Oct 27 13:56:29 2014 -0700

----------------------------------------------------------------------
 phoenix-assembly/pom.xml                         |  10 +-
 phoenix-core/pom.xml                             |  23 +-
 .../apache/phoenix/trace/BaseTracingTestIT.java  | 112 ++++---
 .../phoenix/trace/DisableableMetricsWriter.java  |  27 +-
 .../trace/Hadoop1TracingTestEnabler.java         |  84 ------
 .../apache/phoenix/trace/PhoenixMetricImpl.java  |  44 ---
 .../phoenix/trace/PhoenixMetricRecordImpl.java   |  71 -----
 .../trace/PhoenixTableMetricsWriterIT.java       |  28 +-
 .../apache/phoenix/trace/PhoenixTagImpl.java     |  22 +-
 .../phoenix/trace/PhoenixTraceReaderIT.java      |  61 ++--
 .../phoenix/trace/PhoenixTracingEndToEndIT.java  |  59 ++--
 .../apache/phoenix/trace/TracingTestUtil.java    |  14 +
 .../org/apache/phoenix/hbase/index/Indexer.java  |   4 +-
 .../org/apache/phoenix/metrics/MetricInfo.java   |  51 ++++
 .../org/apache/phoenix/metrics/Metrics.java      |  66 ++++
 .../apache/phoenix/trace/MetricsInfoImpl.java    |  63 ++++
 .../phoenix/trace/PhoenixMetricsSink.java        | 298 +++++++++++++++++++
 .../trace/PhoenixTableMetricsWriter.java         | 278 -----------------
 .../apache/phoenix/trace/TraceMetricSource.java  | 188 ++++++++++++
 .../org/apache/phoenix/trace/TraceReader.java    |  12 +-
 .../org/apache/phoenix/trace/TracingUtils.java   |  63 ++++
 .../org/apache/phoenix/trace/util/Tracing.java   |   5 +-
 .../metrics2/impl/ExposedMetricCounterLong.java  |  36 +++
 .../metrics2/impl/ExposedMetricsRecordImpl.java  |  42 +++
 .../metrics2/lib/ExposedMetricsInfoImpl.java     |  34 +++
 .../org/apache/phoenix/metrics/LoggingSink.java  |  60 ++++
 .../phoenix/trace/TraceMetricsSourceTest.java    |  96 ++++++
 phoenix-hadoop-compat/pom.xml                    |  89 ------
 .../org/apache/phoenix/metrics/MetricInfo.java   |  51 ----
 .../org/apache/phoenix/metrics/Metrics.java      |  80 -----
 .../apache/phoenix/metrics/MetricsManager.java   |  58 ----
 .../apache/phoenix/metrics/MetricsWriter.java    |  31 --
 .../phoenix/metrics/PhoenixAbstractMetric.java   |  30 --
 .../phoenix/metrics/PhoenixMetricTag.java        |  27 --
 .../phoenix/metrics/PhoenixMetricsRecord.java    |  35 ---
 .../phoenix/trace/PhoenixSpanReceiver.java       |  26 --
 .../phoenix/trace/TestableMetricsWriter.java     |  30 --
 .../org/apache/phoenix/trace/TracingCompat.java  |  89 ------
 .../org/apache/phoenix/metrics/LoggingSink.java  |  56 ----
 .../phoenix/metrics/TracingTestCompat.java       |  45 ---
 phoenix-hadoop2-compat/pom.xml                   |  77 -----
 .../phoenix/metrics/MetricsManagerImpl.java      |  71 -----
 .../apache/phoenix/trace/MetricsInfoImpl.java    |  63 ----
 .../phoenix/trace/PhoenixMetricsSink.java        | 191 ------------
 .../apache/phoenix/trace/TraceMetricSource.java  | 197 ------------
 .../org.apache.phoenix.metrics.MetricsManager    |   1 -
 ...org.apache.phoenix.trace.PhoenixSpanReceiver  |   1 -
 ...g.apache.phoenix.trace.TestableMetricsWriter  |   1 -
 .../metrics2/impl/ExposedMetricCounterLong.java  |  35 ---
 .../metrics2/impl/ExposedMetricsRecordImpl.java  |  43 ---
 .../metrics2/lib/ExposedMetricsInfoImpl.java     |  32 --
 .../phoenix/trace/PhoenixMetricsWriterTest.java  | 142 ---------
 .../phoenix/trace/TraceMetricsSourceTest.java    |  96 ------
 .../org/apache/phoenix/trace/TracingTest.java
| 34 --- pom.xml | 27 -- 55 files changed, 1156 insertions(+), 2353 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-assembly/pom.xml b/phoenix-assembly/pom.xml index fe02636..b0cbac4 100644 --- a/phoenix-assembly/pom.xml +++ b/phoenix-assembly/pom.xml @@ -138,14 +138,6 @@ </dependency> <dependency> <groupId>org.apache.phoenix</groupId> - <artifactId>phoenix-hadoop-compat</artifactId> - </dependency> - <dependency> - <groupId>org.apache.phoenix</groupId> - <artifactId>phoenix-hadoop2-compat</artifactId> - </dependency> - <dependency> - <groupId>org.apache.phoenix</groupId> <artifactId>phoenix-flume</artifactId> </dependency> <dependency> @@ -153,4 +145,4 @@ <artifactId>phoenix-pig</artifactId> </dependency> </dependencies> -</project> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-core/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml index 539c38b..194ed58 100644 --- a/phoenix-core/pom.xml +++ b/phoenix-core/pom.xml @@ -205,17 +205,6 @@ </build> <dependencies> - <!-- Intra project dependencies --> - <dependency> - <groupId>org.apache.phoenix</groupId> - <artifactId>phoenix-hadoop-compat</artifactId> - </dependency> - <dependency> - <groupId>org.apache.phoenix</groupId> - <artifactId>phoenix-hadoop-compat</artifactId> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> <!-- Make sure we have all the antlr dependencies --> <dependency> <groupId>org.antlr</groupId> @@ -409,15 +398,5 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-minicluster</artifactId> </dependency> - <dependency> - <groupId>org.apache.phoenix</groupId> - <artifactId>phoenix-hadoop2-compat</artifactId> - </dependency> - <dependency> - <groupId>org.apache.phoenix</groupId> - <artifactId>phoenix-hadoop2-compat</artifactId> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> </dependencies> -</project> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java index 0f8a666..f504d12 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java @@ -17,25 +17,18 @@ */ package org.apache.phoenix.trace; -import static org.apache.phoenix.util.PhoenixRuntime.ANNOTATION_ATTRIB_PREFIX; -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.util.Collections; -import java.util.Map; -import java.util.Properties; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.MetricsTag; +import org.apache.hadoop.metrics2.impl.ExposedMetricCounterLong; +import 
org.apache.hadoop.metrics2.impl.ExposedMetricsRecordImpl; +import org.apache.hadoop.metrics2.lib.ExposedMetricsInfoImpl; import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; import org.apache.phoenix.end2end.HBaseManagedTimeTest; import org.apache.phoenix.metrics.MetricInfo; -import org.apache.phoenix.metrics.Metrics; -import org.apache.phoenix.metrics.PhoenixAbstractMetric; -import org.apache.phoenix.metrics.PhoenixMetricTag; -import org.apache.phoenix.metrics.PhoenixMetricsRecord; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.trace.util.Tracing; @@ -45,6 +38,14 @@ import org.apache.phoenix.util.PropertiesUtil; import org.junit.Before; import org.junit.experimental.categories.Category; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.*; + +import static org.apache.phoenix.util.PhoenixRuntime.ANNOTATION_ATTRIB_PREFIX; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; + /** * Base test for tracing tests - helps manage getting tracing/non-tracing * connections, as well as any supporting utils. @@ -53,36 +54,17 @@ import org.junit.experimental.categories.Category; public class BaseTracingTestIT extends BaseHBaseManagedTimeIT { private static final Log LOG = LogFactory.getLog(BaseTracingTestIT.class); - /** - * Hadoop1 doesn't yet support tracing (need metrics library support) so we just skip those - * tests for the moment - * @return <tt>true</tt> if the test should exit because some necessary classes are missing, or - * <tt>false</tt> if the tests can continue normally - */ - static boolean shouldEarlyExitForHadoop1Test() { - try { - // get a receiver for the spans - TracingCompat.newTraceMetricSource(); - // which also needs to a source for the metrics system - Metrics.getManager(); - return false; - } catch (RuntimeException e) { - LOG.error("Shouldn't run test because can't instantiate necessary metrics/tracing classes!"); - } - - return true; - } - @Before public void resetTracingTableIfExists() throws Exception { Connection conn = getConnectionWithoutTracing(); conn.setAutoCommit(true); try { - conn.createStatement().executeUpdate("DELETE FROM " + QueryServicesOptions.DEFAULT_TRACING_STATS_TABLE_NAME); + conn.createStatement().executeUpdate( + "DELETE FROM " + QueryServicesOptions.DEFAULT_TRACING_STATS_TABLE_NAME); } catch (TableNotFoundException ignore) { } } - + public static Connection getConnectionWithoutTracing() throws SQLException { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); return getConnectionWithoutTracing(props); @@ -93,18 +75,19 @@ public class BaseTracingTestIT extends BaseHBaseManagedTimeIT { conn.setAutoCommit(false); return conn; } - - public static Connection getTracingConnection() throws Exception { - return getTracingConnection(Collections.<String, String>emptyMap(), null); + + public static Connection getTracingConnection() throws Exception { + return getTracingConnection(Collections.<String, String>emptyMap(), null); } - public static Connection getTracingConnection(Map<String, String> customAnnotations, String tenantId) throws Exception { + public static Connection getTracingConnection(Map<String, String> customAnnotations, + String tenantId) throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); for (Map.Entry<String, String> annot : customAnnotations.entrySet()) { - props.put(ANNOTATION_ATTRIB_PREFIX + annot.getKey(), annot.getValue()); + 
props.put(ANNOTATION_ATTRIB_PREFIX + annot.getKey(), annot.getValue()); } if (tenantId != null) { - props.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + props.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); } return getConnectionWithTracingFrequency(props, Tracing.Frequency.ALWAYS); } @@ -115,34 +98,49 @@ public class BaseTracingTestIT extends BaseHBaseManagedTimeIT { return DriverManager.getConnection(getUrl(), props); } - public static PhoenixMetricsRecord createRecord(long traceid, long parentid, long spanid, + public static MetricsRecord createRecord(long traceid, long parentid, long spanid, String desc, long startTime, long endTime, String hostname, String... tags) { - PhoenixMetricRecordImpl record = - new PhoenixMetricRecordImpl(TracingCompat.getTraceMetricName(traceid), desc); - PhoenixAbstractMetric span = new PhoenixMetricImpl(MetricInfo.SPAN.traceName, spanid); - record.addMetric(span); - PhoenixAbstractMetric parent = new PhoenixMetricImpl(MetricInfo.PARENT.traceName, parentid); - record.addMetric(parent); + List<AbstractMetric> metrics = new ArrayList<AbstractMetric>(); + AbstractMetric span = new ExposedMetricCounterLong(asInfo(MetricInfo + .SPAN.traceName), + spanid); + metrics.add(span); - PhoenixAbstractMetric start = new PhoenixMetricImpl(MetricInfo.START.traceName, startTime); - record.addMetric(start); + AbstractMetric parent = new ExposedMetricCounterLong(asInfo(MetricInfo.PARENT.traceName), + parentid); + metrics.add(parent); - PhoenixAbstractMetric end = new PhoenixMetricImpl(MetricInfo.END.traceName, endTime); - record.addMetric(end); + AbstractMetric start = new ExposedMetricCounterLong(asInfo(MetricInfo.START.traceName), + startTime); + metrics.add(start); + AbstractMetric + end = + new ExposedMetricCounterLong(asInfo(MetricInfo.END.traceName), endTime); + metrics.add(end); + + List<MetricsTag> tagsList = new ArrayList<MetricsTag>(); int tagCount = 0; for (String annotation : tags) { - PhoenixMetricTag tag = + MetricsTag tag = new PhoenixTagImpl(MetricInfo.ANNOTATION.traceName, Integer.toString(tagCount++), annotation); - record.addTag(tag); + tagsList.add(tag); } String hostnameValue = "host-name.value"; - PhoenixMetricTag hostnameTag = + MetricsTag hostnameTag = new PhoenixTagImpl(MetricInfo.HOSTNAME.traceName, "", hostnameValue); - record.addTag(hostnameTag); + tagsList.add(hostnameTag); + MetricsRecord record = + new ExposedMetricsRecordImpl(new ExposedMetricsInfoImpl(TracingUtils + .getTraceMetricName(traceid), desc), System.currentTimeMillis(), + tagsList, metrics); return record; } + + private static MetricsInfo asInfo(String name) { + return new ExposedMetricsInfoImpl(name, ""); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java index a054bf2..875717c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java @@ -17,31 +17,32 @@ */ package org.apache.phoenix.trace; -import java.sql.SQLException; -import java.util.concurrent.atomic.AtomicBoolean; - +import org.apache.commons.configuration.SubsetConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import 
org.apache.phoenix.metrics.MetricsWriter; -import org.apache.phoenix.metrics.PhoenixMetricsRecord; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.MetricsSink; + +import java.sql.SQLException; +import java.util.concurrent.atomic.AtomicBoolean; /** * */ -public class DisableableMetricsWriter implements MetricsWriter { +public class DisableableMetricsWriter implements MetricsSink { private static final Log LOG = LogFactory.getLog(DisableableMetricsWriter.class); - private PhoenixTableMetricsWriter writer; + private PhoenixMetricsSink writer; private AtomicBoolean disabled = new AtomicBoolean(false); - public DisableableMetricsWriter(PhoenixTableMetricsWriter writer) { + public DisableableMetricsWriter(PhoenixMetricsSink writer) { this.writer = writer; } @Override - public void initialize() { + public void init(SubsetConfiguration config) { if (this.disabled.get()) return; - writer.initialize(); + writer.init(config); } @Override @@ -55,9 +56,9 @@ public class DisableableMetricsWriter implements MetricsWriter { } @Override - public void addMetrics(PhoenixMetricsRecord record) { + public void putMetrics(MetricsRecord record) { if (this.disabled.get()) return; - writer.addMetrics(record); + writer.putMetrics(record); } public void disable() { @@ -77,7 +78,7 @@ public class DisableableMetricsWriter implements MetricsWriter { } } - public PhoenixTableMetricsWriter getDelegate() { + public PhoenixMetricsSink getDelegate() { return this.writer; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-core/src/it/java/org/apache/phoenix/trace/Hadoop1TracingTestEnabler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/Hadoop1TracingTestEnabler.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/Hadoop1TracingTestEnabler.java deleted file mode 100644 index 9a592d3..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/trace/Hadoop1TracingTestEnabler.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.phoenix.trace; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -import org.junit.runner.notification.RunNotifier; -import org.junit.runners.BlockJUnit4ClassRunner; -import org.junit.runners.model.FrameworkMethod; -import org.junit.runners.model.InitializationError; - -/** - * Test runner to run classes that depend on Hadoop1 compatibility that may not be present for the - * feature - */ -public class Hadoop1TracingTestEnabler extends BlockJUnit4ClassRunner { - - public Hadoop1TracingTestEnabler(Class<?> klass) throws InitializationError { - super(klass); - } - - @Override - public void runChild(FrameworkMethod method, RunNotifier notifier) { - // if the class is already disabled, then we can disable on the class level, otherwise we - // just check the per-method - Hadoop1Disabled condition = - getTestClass().getJavaClass().getAnnotation(Hadoop1Disabled.class); - if (condition == null) { - condition = method - .getAnnotation(Hadoop1Disabled.class); - } - - // if this has the flag, then we want to disable it if hadoop1 is not enabled for that - // feature - if (condition != null && getEnabled(condition.value())) { - super.runChild(method, notifier); - } else { - notifier.fireTestIgnored(describeChild(method)); - } - } - - /** - * Simple check that just uses if-else logic. We can move to something more complex, policy - * based later when this gets more complex. - * @param feature name of the feature to check - * @return <tt>true</tt> if the test method is enabled for the given feature, <tt>false</tt> - * otherwise - */ - private boolean getEnabled(String feature) { - if (feature.equals("tracing")) { - return !BaseTracingTestIT.shouldEarlyExitForHadoop1Test(); - } - return true; - } - - /** - * Marker that a class/method should be disabled if hadoop1 features are not enabled. It takes a - * value for the Hadoop1 feature on which this class/method depends, for instance "tracing" is - * not supported in Hadoop1 (yet). - */ - @Target({ ElementType.TYPE, ElementType.METHOD }) - @Retention(RetentionPolicy.RUNTIME) - public static @interface Hadoop1Disabled { - String value(); - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricImpl.java deleted file mode 100644 index 985504f..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricImpl.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.trace; - -import org.apache.phoenix.metrics.PhoenixAbstractMetric; - -/** - * Simple metric implementation for testing - */ -public class PhoenixMetricImpl implements PhoenixAbstractMetric { - - private String name; - private Number value; - - public PhoenixMetricImpl(String name, Number value) { - this.name = name; - this.value = value; - } - - @Override - public String getName() { - return name; - } - - @Override - public Number value() { - return value; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricRecordImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricRecordImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricRecordImpl.java deleted file mode 100644 index 45cabf0..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricRecordImpl.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.phoenix.trace; - -import java.util.Collection; -import java.util.List; - -import org.apache.phoenix.metrics.PhoenixAbstractMetric; -import org.apache.phoenix.metrics.PhoenixMetricTag; -import org.apache.phoenix.metrics.PhoenixMetricsRecord; - -import com.google.common.collect.Lists; - -/** - * - */ -public class PhoenixMetricRecordImpl implements PhoenixMetricsRecord { - - private String name; - private String description; - private final List<PhoenixAbstractMetric> metrics = Lists.newArrayList(); - private final List<PhoenixMetricTag> tags = Lists.newArrayList(); - - public PhoenixMetricRecordImpl(String name, String description) { - this.name = name; - this.description = description; - } - - public void addMetric(PhoenixAbstractMetric metric) { - this.metrics.add(metric); - } - - public void addTag(PhoenixMetricTag tag) { - this.tags.add(tag); - } - - @Override - public String name() { - return this.name; - } - - @Override - public String description() { - return this.description; - } - - @Override - public Iterable<PhoenixAbstractMetric> metrics() { - return metrics; - } - - @Override - public Collection<PhoenixMetricTag> tags() { - return tags; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java index ecac21b..533b6f8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java @@ -17,25 +17,21 @@ */ package org.apache.phoenix.trace; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.sql.Connection; -import java.util.Collection; - -import org.apache.phoenix.metrics.PhoenixMetricsRecord; +import org.apache.hadoop.metrics2.MetricsRecord; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.trace.Hadoop1TracingTestEnabler.Hadoop1Disabled; import org.apache.phoenix.trace.TraceReader.SpanInfo; import org.apache.phoenix.trace.TraceReader.TraceHolder; import org.junit.Test; -import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; /** * Test that the logging sink stores the expected metrics/stats */ -@RunWith(Hadoop1TracingTestEnabler.class) -@Hadoop1Disabled("tracing") public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT { /** @@ -45,7 +41,7 @@ public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT { */ @Test public void testCreatesTable() throws Exception { - PhoenixTableMetricsWriter sink = new PhoenixTableMetricsWriter(); + PhoenixMetricsSink sink = new PhoenixMetricsSink(); Connection conn = getConnectionWithoutTracing(); sink.initForTesting(conn); @@ -69,13 +65,13 @@ public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT { /** * Simple metrics writing and reading check, that uses the standard wrapping in the - * {@link PhoenixMetricsWriter} + * {@link PhoenixMetricsSink} * @throws Exception on failure */ @Test public void writeMetrics() throws Exception { // hook up a phoenix sink - PhoenixTableMetricsWriter sink = new 
PhoenixTableMetricsWriter(); + PhoenixMetricsSink sink = new PhoenixMetricsSink(); Connection conn = getConnectionWithoutTracing(); sink.initForTesting(conn); @@ -88,12 +84,12 @@ public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT { long endTime = 13; String annotation = "test annotation for a span"; String hostnameValue = "host-name.value"; - PhoenixMetricsRecord record = + MetricsRecord record = createRecord(traceid, parentid, spanid, description, startTime, endTime, hostnameValue, annotation); // actually write the record to the table - sink.addMetrics(record); + sink.putMetrics(record); sink.flush(); // make sure we only get expected stat entry (matcing the trace id), otherwise we could the http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTagImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTagImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTagImpl.java index c8e2219..a911a2c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTagImpl.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTagImpl.java @@ -17,36 +17,22 @@ */ package org.apache.phoenix.trace; -import org.apache.phoenix.metrics.PhoenixMetricTag; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsTag; /** * Simple Tag implementation for testing */ -public class PhoenixTagImpl implements PhoenixMetricTag { +public class PhoenixTagImpl extends MetricsTag { private final String name; private final String description; private final String value; public PhoenixTagImpl(String name, String description, String value) { - super(); + super(new MetricsInfoImpl(name, description), value); this.name = name; this.description = description; this.value = value; } - - @Override - public String name() { - return name; - } - - @Override - public String description() { - return description; - } - - @Override - public String value() { - return value; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTraceReaderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTraceReaderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTraceReaderIT.java index f0a47bb..d75e281 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTraceReaderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTraceReaderIT.java @@ -17,40 +17,31 @@ */ package org.apache.phoenix.trace; -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.MetricsTag; import org.apache.phoenix.end2end.HBaseManagedTimeTest; import org.apache.phoenix.metrics.MetricInfo; -import org.apache.phoenix.metrics.PhoenixAbstractMetric; 
-import org.apache.phoenix.metrics.PhoenixMetricTag; -import org.apache.phoenix.metrics.PhoenixMetricsRecord; -import org.apache.phoenix.trace.Hadoop1TracingTestEnabler.Hadoop1Disabled; import org.apache.phoenix.trace.TraceReader.SpanInfo; import org.apache.phoenix.trace.TraceReader.TraceHolder; import org.cloudera.htrace.Span; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.*; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; /** * Test that the {@link TraceReader} will correctly read traces written by the - * {@link PhoenixTableMetricsWriter} + * {@link org.apache.phoenix.trace.PhoenixMetricsSink} */ -@RunWith(Hadoop1TracingTestEnabler.class) -@Hadoop1Disabled("tracing") @Category(HBaseManagedTimeTest.class) public class PhoenixTraceReaderIT extends BaseTracingTestIT { @@ -58,14 +49,14 @@ public class PhoenixTraceReaderIT extends BaseTracingTestIT { @Test public void singleSpan() throws Exception { - PhoenixTableMetricsWriter sink = new PhoenixTableMetricsWriter(); + PhoenixMetricsSink sink = new PhoenixMetricsSink(); Properties props = new Properties(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props); sink.initForTesting(conn); // create a simple metrics record long traceid = 987654; - PhoenixMetricsRecord record = + MetricsRecord record = createAndFlush(sink, traceid, Span.ROOT_SPAN_ID, 10, "root", 12, 13, "host-name.value", "test annotation for a span"); @@ -73,12 +64,12 @@ public class PhoenixTraceReaderIT extends BaseTracingTestIT { validateTraces(Collections.singletonList(record), conn, traceid); } - private PhoenixMetricsRecord createAndFlush(PhoenixTableMetricsWriter sink, long traceid, + private MetricsRecord createAndFlush(PhoenixMetricsSink sink, long traceid, long parentid, long spanid, String desc, long startTime, long endTime, String hostname, String... 
tags) { - PhoenixMetricsRecord record = + MetricsRecord record = createRecord(traceid, parentid, spanid, desc, startTime, endTime, hostname, tags); - sink.addMetrics(record); + sink.putMetrics(record); sink.flush(); return record; } @@ -91,14 +82,14 @@ public class PhoenixTraceReaderIT extends BaseTracingTestIT { @Test public void testMultipleSpans() throws Exception { // hook up a phoenix sink - PhoenixTableMetricsWriter sink = new PhoenixTableMetricsWriter(); + PhoenixMetricsSink sink = new PhoenixMetricsSink(); Connection conn = getConnectionWithoutTracing(); sink.initForTesting(conn); // create a simple metrics record long traceid = 12345; - List<PhoenixMetricsRecord> records = new ArrayList<PhoenixMetricsRecord>(); - PhoenixMetricsRecord record = + List<MetricsRecord> records = new ArrayList<MetricsRecord>(); + MetricsRecord record = createAndFlush(sink, traceid, Span.ROOT_SPAN_ID, 7777, "root", 10, 30, "hostname.value", "root-span tag"); records.add(record); @@ -128,7 +119,7 @@ public class PhoenixTraceReaderIT extends BaseTracingTestIT { validateTraces(records, conn, traceid); } - private void validateTraces(List<PhoenixMetricsRecord> records, Connection conn, long traceid) + private void validateTraces(List<MetricsRecord> records, Connection conn, long traceid) throws Exception { TraceReader reader = new TraceReader(conn); Collection<TraceHolder> traces = reader.readAll(1); @@ -145,13 +136,13 @@ public class PhoenixTraceReaderIT extends BaseTracingTestIT { * @param records * @param trace */ - private void validateTrace(List<PhoenixMetricsRecord> records, TraceHolder trace) { + private void validateTrace(List<MetricsRecord> records, TraceHolder trace) { // drop each span into a sorted list so we get the expected ordering Iterator<SpanInfo> spanIter = trace.spans.iterator(); - for (PhoenixMetricsRecord record : records) { + for (MetricsRecord record : records) { SpanInfo spanInfo = spanIter.next(); LOG.info("Checking span:\n" + spanInfo); - Iterator<PhoenixAbstractMetric> metricIter = record.metrics().iterator(); + Iterator<AbstractMetric> metricIter = record.metrics().iterator(); assertEquals("Got an unexpected span id", metricIter.next().value(), spanInfo.id); long parentId = (Long) metricIter.next().value(); if (parentId == Span.ROOT_SPAN_ID) { @@ -162,12 +153,12 @@ public class PhoenixTraceReaderIT extends BaseTracingTestIT { assertEquals("Got an unexpected start time", metricIter.next().value(), spanInfo.start); assertEquals("Got an unexpected end time", metricIter.next().value(), spanInfo.end); - Iterator<PhoenixMetricTag> tags = record.tags().iterator(); + Iterator<MetricsTag> tags = record.tags().iterator(); int annotationCount = 0; while (tags.hasNext()) { // hostname is a tag, so we differentiate it - PhoenixMetricTag tag = tags.next(); + MetricsTag tag = tags.next(); if (tag.name().equals(MetricInfo.HOSTNAME.traceName)) { assertEquals("Didn't store correct hostname value", tag.value(), spanInfo.hostname); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java index 87d80da..f4cf0d1 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java +++ 
b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java @@ -17,46 +17,37 @@ */ package org.apache.phoenix.trace; -import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.Collection; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - +import com.google.common.collect.ImmutableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.MetricsSource; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.end2end.HBaseManagedTimeTest; import org.apache.phoenix.metrics.Metrics; -import org.apache.phoenix.metrics.TracingTestCompat; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.trace.Hadoop1TracingTestEnabler.Hadoop1Disabled; import org.apache.phoenix.trace.TraceReader.SpanInfo; import org.apache.phoenix.trace.TraceReader.TraceHolder; -import org.cloudera.htrace.Sampler; -import org.cloudera.htrace.Span; -import org.cloudera.htrace.SpanReceiver; -import org.cloudera.htrace.Trace; -import org.cloudera.htrace.TraceScope; +import org.cloudera.htrace.*; import org.junit.After; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import com.google.common.collect.ImmutableMap; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Test that the logging sink stores the expected metrics/stats */ -@RunWith(Hadoop1TracingTestEnabler.class) -@Hadoop1Disabled("tracing") @Category(HBaseManagedTimeTest.class) public class PhoenixTracingEndToEndIT extends BaseTracingTestIT { @@ -69,15 +60,12 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT { @BeforeClass public static void setupMetrics() throws Exception { - if (shouldEarlyExitForHadoop1Test()) { - return; - } - PhoenixTableMetricsWriter pWriter = new PhoenixTableMetricsWriter(); + PhoenixMetricsSink pWriter = new PhoenixMetricsSink(); Connection conn = getConnectionWithoutTracing(); pWriter.initForTesting(conn); sink = new DisableableMetricsWriter(pWriter); - TracingTestCompat.registerSink(sink); + TracingTestUtil.registerSink(sink); } @After @@ -112,10 +100,10 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT { @Test public void testWriteSpans() throws Exception { // get a receiver for the spans - SpanReceiver receiver = TracingCompat.newTraceMetricSource(); + SpanReceiver receiver = new TraceMetricSource(); // which also needs to a source for the metrics system - Metrics.getManager().registerSource("testWriteSpans-source", "source for testWriteSpans", - receiver); + Metrics.initialize().register("testWriteSpans-source", "source for testWriteSpans", + (MetricsSource) receiver); // watch our sink so we know when commits happen CountDownLatch latch = new CountDownLatch(1); @@ -128,7 +116,7 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT { 
// add a child with some annotations Span child = span.child("child 1"); child.addTimelineAnnotation("timeline annotation"); - TracingCompat.addAnnotation(child, "test annotation", 10); + TracingUtils.addAnnotation(child, "test annotation", 10); child.stop(); // sleep a little bit to get some time difference @@ -230,10 +218,7 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT { if (traceInfo.contains(QueryServicesOptions.DEFAULT_TRACING_STATS_TABLE_NAME)) { return false; } - if (traceInfo.contains("Completing index")) { - return true; - } - return false; + return traceInfo.contains("Completing index"); } }); @@ -467,4 +452,4 @@ public class PhoenixTracingEndToEndIT extends BaseTracingTestIT { } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-core/src/it/java/org/apache/phoenix/trace/TracingTestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/TracingTestUtil.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/TracingTestUtil.java new file mode 100644 index 0000000..d502175 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/TracingTestUtil.java @@ -0,0 +1,14 @@ +package org.apache.phoenix.trace; + +import org.apache.hadoop.metrics2.MetricsSink; +import org.apache.phoenix.metrics.Metrics; + +/** + * + */ +public class TracingTestUtil { + + public static void registerSink(MetricsSink sink){ + Metrics.initialize().register("phoenix", "test sink gets logged", sink); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index d55dfbf..9c48a8d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -64,7 +64,7 @@ import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache; import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy; import org.apache.phoenix.hbase.index.write.recovery.TrackingParallelWriterIndexCommitter; -import org.apache.phoenix.trace.TracingCompat; +import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; import org.cloudera.htrace.Span; import org.cloudera.htrace.Trace; @@ -276,7 +276,7 @@ public class Indexer extends BaseRegionObserver { this.builder.getIndexUpdate(miniBatchOp, mutations.values()); current.addTimelineAnnotation("Built index updates, doing preStep"); - TracingCompat.addAnnotation(current, "index update count", indexUpdates.size()); + TracingUtils.addAnnotation(current, "index update count", indexUpdates.size()); // write them, either to WAL or the index tables doPre(indexUpdates, edit, durability); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-core/src/main/java/org/apache/phoenix/metrics/MetricInfo.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/metrics/MetricInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/metrics/MetricInfo.java new file mode 100644 index 0000000..e6ad976 --- /dev/null +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/metrics/MetricInfo.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.metrics; + +/** + * Metrics and their conversion from the trace name to the name we store in the stats table + */ +public enum MetricInfo { + + TRACE("", "trace_id"), + SPAN("span_id", "span_id"), + PARENT("parent_id", "parent_id"), + START("start_time", "start_time"), + END("end_time", "end_time"), + TAG("phoenix.tag", "t"), + ANNOTATION("phoenix.annotation", "a"), + HOSTNAME("Hostname", "hostname"), + DESCRIPTION("", "description"); + + public final String traceName; + public final String columnName; + + private MetricInfo(String traceName, String columnName) { + this.traceName = traceName; + this.columnName = columnName; + } + + public static String getColumnName(String traceName) { + for (MetricInfo info : MetricInfo.values()) { + if (info.traceName.equals(traceName)) { + return info.columnName; + } + } + throw new IllegalArgumentException("Unknown tracename: " + traceName); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-core/src/main/java/org/apache/phoenix/metrics/Metrics.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/metrics/Metrics.java b/phoenix-core/src/main/java/org/apache/phoenix/metrics/Metrics.java new file mode 100644 index 0000000..24950c4 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/metrics/Metrics.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.metrics; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; + +public class Metrics { + + private static final Log LOG = LogFactory.getLog(Metrics.class); + + private static volatile MetricsSystem manager = DefaultMetricsSystem.instance(); + + private static boolean initialized; + + /** This must match the prefix that we are using in the hadoop-metrics2 config on the client */ + public static final String METRICS_SYSTEM_NAME = "phoenix"; + public static MetricsSystem initialize() { + // if the jars aren't on the classpath, then we don't start the metrics system + if (manager == null) { + LOG.warn("Phoenix metrics could not be initialized - no MetricsManager found!"); + return null; + } + // only initialize the metrics system once + synchronized (Metrics.class) { + if (!initialized) { + LOG.info("Initializing metrics system: " + Metrics.METRICS_SYSTEM_NAME); + manager.init(Metrics.METRICS_SYSTEM_NAME); + initialized = true; + } + } + return manager; + } + + private static volatile boolean sinkInitialized = false; + + /** + * Mark that the metrics/tracing sink has been initialized + */ + public static void markSinkInitialized() { + sinkInitialized = true; + } + + public static void ensureConfigured() { + if (!sinkInitialized) { + LOG.warn("Phoenix metrics2/tracing sink was not started. Should be it be?"); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-core/src/main/java/org/apache/phoenix/trace/MetricsInfoImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/MetricsInfoImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/MetricsInfoImpl.java new file mode 100644 index 0000000..47c1dda --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/MetricsInfoImpl.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.trace; + +import com.google.common.base.Objects; +import static com.google.common.base.Preconditions.*; +import org.apache.hadoop.metrics2.MetricsInfo; + +/** + * Making implementing metric info a little easier + * <p> + * Just a copy of the same from Hadoop, but exposed for usage. 
+ */ +public class MetricsInfoImpl implements MetricsInfo { + private final String name, description; + + MetricsInfoImpl(String name, String description) { + this.name = checkNotNull(name, "name"); + this.description = checkNotNull(description, "description"); + } + + @Override public String name() { + return name; + } + + @Override public String description() { + return description; + } + + @Override public boolean equals(Object obj) { + if (obj instanceof MetricsInfo) { + MetricsInfo other = (MetricsInfo) obj; + return Objects.equal(name, other.name()) && + Objects.equal(description, other.description()); + } + return false; + } + + @Override public int hashCode() { + return Objects.hashCode(name, description); + } + + @Override public String toString() { + return Objects.toStringHelper(this) + .add("name", name).add("description", description) + .toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java new file mode 100644 index 0000000..265fc78 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.trace; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.SubsetConfiguration; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.MetricsSink; +import org.apache.hadoop.metrics2.MetricsTag; +import org.apache.phoenix.metrics.*; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.trace.util.Tracing; +import org.apache.phoenix.util.QueryUtil; + +import javax.annotation.Nullable; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.*; + +import static org.apache.phoenix.metrics.MetricInfo.*; +import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME; + +/** + * Write the metrics to a phoenix table. 
+ * Generally, this class is instantiated via hadoop-metrics2 property files. + * Specifically, you would create this class by adding the following to + * by + * This would actually be set as: <code> + * [prefix].sink.[some instance name].class=org.apache.phoenix.trace.PhoenixMetricsSink + * </code>, where <tt>prefix</tt> is either: + * <ol> + * <li>"phoenix", for the client</li> + * <li>"hbase", for the server</li> + * </ol> + * and + * <tt>some instance name</tt> is just any unique name, so properties can be differentiated if + * there are multiple sinks of the same type created + */ +public class PhoenixMetricsSink implements MetricsSink { + + private static final Log LOG = LogFactory.getLog(PhoenixMetricsSink.class); + + private static final String VARIABLE_VALUE = "?"; + + private static final Joiner COLUMN_JOIN = Joiner.on("."); + static final String TAG_FAMILY = "tags"; + /** + * Count of the number of tags we are storing for this row + */ + static final String TAG_COUNT = COLUMN_JOIN.join(TAG_FAMILY, "count"); + + static final String ANNOTATION_FAMILY = "annotations"; + static final String ANNOTATION_COUNT = COLUMN_JOIN.join(ANNOTATION_FAMILY, "count"); + + /** + * Join strings on a comma + */ + private static final Joiner COMMAS = Joiner.on(','); + + private Connection conn; + + private String table; + + public PhoenixMetricsSink() { + LOG.info("Writing tracing metrics to phoenix table"); + + } + + @Override + public void init(SubsetConfiguration config) { + Metrics.markSinkInitialized(); + LOG.info("Phoenix tracing writer started"); + } + + /** + * Initialize <tt>this</tt> only when we need it + */ + private void lazyInitialize() { + synchronized (this) { + if (this.conn != null) { + return; + } + try { + // create the phoenix connection + Properties props = new Properties(); + props.setProperty(QueryServices.TRACING_FREQ_ATTRIB, + Tracing.Frequency.NEVER.getKey()); + org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create(); + Connection conn = QueryUtil.getConnection(props, conf); + // enable bulk loading when we have enough data + conn.setAutoCommit(true); + + String tableName = + conf.get(QueryServices.TRACING_STATS_TABLE_NAME_ATTRIB, + QueryServicesOptions.DEFAULT_TRACING_STATS_TABLE_NAME); + + initializeInternal(conn, tableName); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private void initializeInternal(Connection conn, String tableName) throws SQLException { + this.conn = conn; + + // ensure that the target table already exists + createTable(conn, tableName); + } + + /** + * Used for <b>TESTING ONLY</b> + * Initialize the connection and setup the table to use the + * {@link org.apache.phoenix.query.QueryServicesOptions#DEFAULT_TRACING_STATS_TABLE_NAME} + * + * @param conn to store for upserts and to create the table (if necessary) + * @throws SQLException if any phoenix operation fails + */ + @VisibleForTesting + public void initForTesting(Connection conn) throws SQLException { + initializeInternal(conn, QueryServicesOptions.DEFAULT_TRACING_STATS_TABLE_NAME); + } + + /** + * Create a stats table with the given name. 
Stores the name for use later when creating upsert + * statements + * + * @param conn connection to use when creating the table + * @param table name of the table to create + * @throws SQLException if any phoenix operations fails + */ + private void createTable(Connection conn, String table) throws SQLException { + // only primary-key columns can be marked non-null + String ddl = + "create table if not exists " + table + "( " + + TRACE.columnName + " bigint not null, " + + PARENT.columnName + " bigint not null, " + + SPAN.columnName + " bigint not null, " + + DESCRIPTION.columnName + " varchar, " + + START.columnName + " bigint, " + + END.columnName + " bigint, " + + HOSTNAME.columnName + " varchar, " + + TAG_COUNT + " smallint, " + + ANNOTATION_COUNT + " smallint" + + " CONSTRAINT pk PRIMARY KEY (" + TRACE.columnName + ", " + + PARENT.columnName + ", " + SPAN.columnName + "))\n"; + PreparedStatement stmt = conn.prepareStatement(ddl); + stmt.execute(); + this.table = table; + } + + @Override + public void flush() { + try { + this.conn.commit(); + this.conn.rollback(); + } catch (SQLException e) { + LOG.error("Failed to commit changes to table", e); + } + } + + /** + * Add a new metric record to be written. + * + * @param record + */ + @Override + public void putMetrics(MetricsRecord record) { + // its not a tracing record, we are done. This could also be handled by filters, but safer + // to do it here, in case it gets misconfigured + if (!record.name().startsWith(TracingUtils.METRIC_SOURCE_KEY)) { + return; + } + + // don't initialize until we actually have something to write + lazyInitialize(); + + String stmt = "UPSERT INTO " + table + " ("; + // drop it into the queue of things that should be written + List<String> keys = new ArrayList<String>(); + List<Object> values = new ArrayList<Object>(); + // we need to keep variable values in a separate set since they may have spaces, which + // causes the parser to barf. 
Instead, we need to add them after the statement is prepared + List<String> variableValues = new ArrayList<String>(record.tags().size()); + keys.add(TRACE.columnName); + values.add( + Long.parseLong(record.name().substring(TracingUtils.METRIC_SOURCE_KEY.length()))); + + keys.add(DESCRIPTION.columnName); + values.add(VARIABLE_VALUE); + variableValues.add(record.description()); + + // add each of the metrics + for (AbstractMetric metric : record.metrics()) { + // name of the metric is also the column name to which we write + keys.add(MetricInfo.getColumnName(metric.name())); + values.add(metric.value()); + } + + // get the tags out so we can set them later (otherwise, need to be a single value) + int annotationCount = 0; + int tagCount = 0; + for (MetricsTag tag : record.tags()) { + if (tag.name().equals(ANNOTATION.traceName)) { + addDynamicEntry(keys, values, variableValues, ANNOTATION_FAMILY, tag, ANNOTATION, + annotationCount); + annotationCount++; + } else if (tag.name().equals(TAG.traceName)) { + addDynamicEntry(keys, values, variableValues, TAG_FAMILY, tag, TAG, tagCount); + tagCount++; + } else if (tag.name().equals(HOSTNAME.traceName)) { + keys.add(HOSTNAME.columnName); + values.add(VARIABLE_VALUE); + variableValues.add(tag.value()); + } else if (tag.name().equals("Context")) { + // ignored + } else { + LOG.error("Got an unexpected tag: " + tag); + } + } + + // add the tag count, now that we know it + keys.add(TAG_COUNT); + // ignore the hostname in the tags, if we know it + values.add(tagCount); + + keys.add(ANNOTATION_COUNT); + values.add(annotationCount); + + // compile the statement together + stmt += COMMAS.join(keys); + stmt += ") VALUES (" + COMMAS.join(values) + ")"; + + if (LOG.isTraceEnabled()) { + LOG.trace("Logging metrics to phoenix table via: " + stmt); + LOG.trace("With tags: " + variableValues); + } + try { + PreparedStatement ps = conn.prepareStatement(stmt); + // add everything that wouldn't/may not parse + int index = 1; + for (String tag : variableValues) { + ps.setString(index++, tag); + } + ps.execute(); + } catch (SQLException e) { + LOG.error("Could not write metric: \n" + record + " to prepared statement:\n" + stmt, + e); + } + } + + public static String getDynamicColumnName(String family, String column, int count) { + return COLUMN_JOIN.join(family, column) + count; + } + + private void addDynamicEntry(List<String> keys, List<Object> values, + List<String> variableValues, String family, MetricsTag tag, + MetricInfo metric, int count) { + // <family><.dynColumn><count> <VARCHAR> + keys.add(getDynamicColumnName(family, metric.columnName, count) + " VARCHAR"); + + // build the annotation value + String val = tag.description() + " - " + tag.value(); + values.add(VARIABLE_VALUE); + variableValues.add(val); + } + + @VisibleForTesting + public void clearForTesting() throws SQLException { + this.conn.rollback(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixTableMetricsWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixTableMetricsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixTableMetricsWriter.java deleted file mode 100644 index 7fcb92d..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixTableMetricsWriter.java +++ /dev/null @@ -1,278 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under 
one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.trace; - -import static org.apache.phoenix.metrics.MetricInfo.ANNOTATION; -import static org.apache.phoenix.metrics.MetricInfo.DESCRIPTION; -import static org.apache.phoenix.metrics.MetricInfo.END; -import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME; -import static org.apache.phoenix.metrics.MetricInfo.PARENT; -import static org.apache.phoenix.metrics.MetricInfo.SPAN; -import static org.apache.phoenix.metrics.MetricInfo.START; -import static org.apache.phoenix.metrics.MetricInfo.TAG; -import static org.apache.phoenix.metrics.MetricInfo.TRACE; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.phoenix.metrics.MetricInfo; -import org.apache.phoenix.metrics.MetricsWriter; -import org.apache.phoenix.metrics.PhoenixAbstractMetric; -import org.apache.phoenix.metrics.PhoenixMetricTag; -import org.apache.phoenix.metrics.PhoenixMetricsRecord; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.trace.util.Tracing; -import org.apache.phoenix.util.QueryUtil; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; - -/** - * Sink that writes phoenix metrics to a phoenix table - * <p> - * Each metric record should only correspond to a single completed span. 
Each span is only updated - * in the phoenix table <i>once</i> - */ -public class PhoenixTableMetricsWriter implements MetricsWriter { - - private static final String VARIABLE_VALUE = "?"; - - public static final Log LOG = LogFactory.getLog(PhoenixTableMetricsWriter.class); - - private static final Joiner COLUMN_JOIN = Joiner.on("."); - static final String TAG_FAMILY = "tags"; - /** Count of the number of tags we are storing for this row */ - static final String TAG_COUNT = COLUMN_JOIN.join(TAG_FAMILY, "count"); - - static final String ANNOTATION_FAMILY = "annotations"; - static final String ANNOTATION_COUNT = COLUMN_JOIN.join(ANNOTATION_FAMILY, "count"); - - /** Join strings on a comma */ - private static final Joiner COMMAS = Joiner.on(','); - - private Connection conn; - - private String table; - - @Override - public void initialize() { - LOG.info("Phoenix tracing writer started"); - } - - /** - * Initialize <tt>this</tt> only when we need it - */ - private void lazyInitialize() { - synchronized (this) { - if (this.conn != null) { - return; - } - try { - // create the phoenix connection - Properties props = new Properties(); - props.setProperty(QueryServices.TRACING_FREQ_ATTRIB, - Tracing.Frequency.NEVER.getKey()); - Configuration conf = HBaseConfiguration.create(); - Connection conn = QueryUtil.getConnection(props, conf); - // enable bulk loading when we have enough data - conn.setAutoCommit(true); - - String tableName = - conf.get(QueryServices.TRACING_STATS_TABLE_NAME_ATTRIB, - QueryServicesOptions.DEFAULT_TRACING_STATS_TABLE_NAME); - - initializeInternal(conn, tableName); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } - - private void initializeInternal(Connection conn, String tableName) throws SQLException { - this.conn = conn; - - // ensure that the target table already exists - createTable(conn, tableName); - } - - /** - * Used for <b>TESTING ONLY</b> - * <p> - * Initialize the connection and setup the table to use the - * {@link TracingCompat#DEFAULT_TRACING_STATS_TABLE_NAME} - * @param conn to store for upserts and to create the table (if necessary) - * @throws SQLException if any phoenix operation fails - */ - @VisibleForTesting - public void initForTesting(Connection conn) throws SQLException { - initializeInternal(conn, QueryServicesOptions.DEFAULT_TRACING_STATS_TABLE_NAME); - } - - /** - * Create a stats table with the given name. 
Stores the name for use later when creating upsert - * statements - * @param conn connection to use when creating the table - * @param table name of the table to create - * @throws SQLException if any phoenix operations fails - */ - private void createTable(Connection conn, String table) throws SQLException { - // only primary-key columns can be marked non-null - String ddl = - "create table if not exists " + table + "( " + - TRACE.columnName + " bigint not null, " + - PARENT.columnName + " bigint not null, " + - SPAN.columnName + " bigint not null, " + - DESCRIPTION.columnName + " varchar, " + - START.columnName + " bigint, " + - END.columnName + " bigint, " + - HOSTNAME.columnName + " varchar, " + - TAG_COUNT + " smallint, " + - ANNOTATION_COUNT + " smallint" + - " CONSTRAINT pk PRIMARY KEY (" + TRACE.columnName + ", " - + PARENT.columnName + ", " + SPAN.columnName + "))\n"; - PreparedStatement stmt = conn.prepareStatement(ddl); - stmt.execute(); - this.table = table; - } - - @Override - public void flush() { - try { - this.conn.commit(); - this.conn.rollback(); - } catch (SQLException e) { - LOG.error("Failed to commit changes to table", e); - } - } - - /** - * Add a new metric record to be written. - * @param record - */ - @Override - public void addMetrics(PhoenixMetricsRecord record) { - // its not a tracing record, we are done. This could also be handled by filters, but safer - // to do it here, in case it gets misconfigured - if (!record.name().startsWith(TracingCompat.METRIC_SOURCE_KEY)) { - return; - } - - // don't initialize until we actually have something to write - lazyInitialize(); - - String stmt = "UPSERT INTO " + table + " ("; - // drop it into the queue of things that should be written - List<String> keys = new ArrayList<String>(); - List<Object> values = new ArrayList<Object>(); - // we need to keep variable values in a separate set since they may have spaces, which - // causes the parser to barf. 
Instead, we need to add them after the statement is prepared - List<String> variableValues = new ArrayList<String>(record.tags().size()); - keys.add(TRACE.columnName); - values.add(Long.parseLong(record.name().substring(TracingCompat.METRIC_SOURCE_KEY.length()))); - - keys.add(DESCRIPTION.columnName); - values.add(VARIABLE_VALUE); - variableValues.add(record.description()); - - // add each of the metrics - for (PhoenixAbstractMetric metric : record.metrics()) { - // name of the metric is also the column name to which we write - keys.add(MetricInfo.getColumnName(metric.getName())); - values.add(metric.value()); - } - - // get the tags out so we can set them later (otherwise, need to be a single value) - int annotationCount = 0; - int tagCount = 0; - for (PhoenixMetricTag tag : record.tags()) { - if (tag.name().equals(ANNOTATION.traceName)) { - addDynamicEntry(keys, values, variableValues, ANNOTATION_FAMILY, tag, ANNOTATION, - annotationCount); - annotationCount++; - } else if (tag.name().equals(TAG.traceName)) { - addDynamicEntry(keys, values, variableValues, TAG_FAMILY, tag, TAG, tagCount); - tagCount++; - } else if (tag.name().equals(HOSTNAME.traceName)) { - keys.add(HOSTNAME.columnName); - values.add(VARIABLE_VALUE); - variableValues.add(tag.value()); - } else if (tag.name().equals("Context")) { - // ignored - } else { - LOG.error("Got an unexpected tag: " + tag); - } - } - - // add the tag count, now that we know it - keys.add(TAG_COUNT); - // ignore the hostname in the tags, if we know it - values.add(tagCount); - - keys.add(ANNOTATION_COUNT); - values.add(annotationCount); - - // compile the statement together - stmt += COMMAS.join(keys); - stmt += ") VALUES (" + COMMAS.join(values) + ")"; - - if (LOG.isTraceEnabled()) { - LOG.trace("Logging metrics to phoenix table via: " + stmt); - LOG.trace("With tags: " + variableValues); - } - try { - PreparedStatement ps = conn.prepareStatement(stmt); - // add everything that wouldn't/may not parse - int index = 1; - for (String tag : variableValues) { - ps.setString(index++, tag); - } - ps.execute(); - } catch (SQLException e) { - LOG.error("Could not write metric: \n" + record + " to prepared statement:\n" + stmt, e); - } - } - - public static String getDynamicColumnName(String family, String column, int count) { - return COLUMN_JOIN.join(family, column) + count; - } - - private void addDynamicEntry(List<String> keys, List<Object> values, - List<String> variableValues, String family, PhoenixMetricTag tag, - MetricInfo metric, int count) { - // <family><.dynColumn><count> <VARCHAR> - keys.add(getDynamicColumnName(family, metric.columnName, count) + " VARCHAR"); - - // build the annotation value - String val = tag.description() + " - " + tag.value(); - values.add(VARIABLE_VALUE); - variableValues.add(val); - } - - public void clearForTesting() throws SQLException { - this.conn.rollback(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceMetricSource.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceMetricSource.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceMetricSource.java new file mode 100644 index 0000000..1b9e31a --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceMetricSource.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.trace;
+
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.metrics2.*;
+import org.apache.hadoop.metrics2.lib.Interns;
+import org.apache.phoenix.metrics.MetricInfo;
+import org.apache.phoenix.metrics.Metrics;
+import org.cloudera.htrace.HTraceConfiguration;
+import org.cloudera.htrace.Span;
+import org.cloudera.htrace.SpanReceiver;
+import org.cloudera.htrace.TimelineAnnotation;
+import org.cloudera.htrace.impl.MilliSpan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static org.apache.phoenix.metrics.MetricInfo.*;
+
+/**
+ * Sink for request traces ({@link SpanReceiver}) that pushes writes to {@link MetricsSource} in a
+ * format that we can more easily consume.
+ * <p>
+ * Rather than write directly to a phoenix table, we drop it into the metrics queue so we can more
+ * cleanly handle it asynchronously. Currently, {@link MilliSpan} submits the span in a
+ * synchronized block to all the receivers, which could have a lot of overhead if we are
+ * submitting to multiple receivers.
+ * <p>
+ * The format of the generated metrics is this:
+ * <ol>
+ * <li>All metrics from the same span have the same name (allowing correlation in the sink)</li>
+ * <li>The description of the metric describes what it contains. For instance,
+ * <ul>
+ * <li>{@link MetricInfo#PARENT} is the id of the parent of this span. (Root span is
+ * {@link Span#ROOT_SPAN_ID}).</li>
+ * <li>{@value MetricInfo#START} is the start time of the span</li>
+ * <li>{@value MetricInfo#END} is the end time of the span</li>
+ * </ul></li>
+ * <li>Each span's messages are contained in a {@link MetricsTag} with the same name as above and a
+ * generic counter for the number of messages (to differentiate messages and provide timeline
+ * ordering).</li>
+ * </ol>
+ * <p>
+ * <i>So why even submit to the metrics2 framework if we only have a single source?</i>
+ * <p>
+ * This allows us to make the updates in batches. We might have spans that finish before other
+ * spans (for instance in the same parent). By batching the updates we can lessen the overhead on
+ * the client, which is also busy doing 'real' work. <br>
+ * We could make our own queue and manage batching, filtering and dropping extra metrics, but
+ * that starts to get complicated fast (it's not as easy as it sounds), so we use metrics2 to
+ * abstract out that pipeline; it also gives us the flexibility to dump metrics to other sources.
+ * <p>
+ * This is a somewhat rough implementation - we do excessive locking for correctness,
+ * rather than trying to make it fast, for the moment.
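+ * <p>
+ * As a rough illustration (the ids and timestamps below are invented), a finished span surfaces
+ * as a single record whose name is the trace id appended to
+ * {@link TracingUtils#METRIC_SOURCE_KEY}, roughly:
+ * <pre>
+ *   name:     &lt;METRIC_SOURCE_KEY&gt;12345
+ *   counters: span=2, parent=1, start=1414443000000, end=1414443000050
+ *   tags:     one tag per timeline message, plus one per key/value annotation
+ * </pre>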
+ */ +public class TraceMetricSource implements SpanReceiver, MetricsSource { + + private static final String EMPTY_STRING = ""; + + private static final String CONTEXT = "tracing"; + + private List<Metric> spans = new ArrayList<Metric>(); + + public TraceMetricSource() { + + MetricsSystem manager = Metrics.initialize(); + + // Register this instance. + // For right now, we ignore the MBean registration issues that show up in DEBUG logs. Basically, + // we need a Jmx MBean compliant name. We'll get to a better name when we want that later + manager.register(CONTEXT, "Phoenix call tracing", this); + } + + @Override + public void receiveSpan(Span span) { + Metric builder = new Metric(span); + // add all the metrics for the span + builder.addCounter(Interns.info(SPAN.traceName, EMPTY_STRING), span.getSpanId()); + builder.addCounter(Interns.info(PARENT.traceName, EMPTY_STRING), span.getParentId()); + builder.addCounter(Interns.info(START.traceName, EMPTY_STRING), span.getStartTimeMillis()); + builder.addCounter(Interns.info(END.traceName, EMPTY_STRING), span.getStopTimeMillis()); + // add the tags to the span. They were written in order received so we mark them as such + for (TimelineAnnotation ta : span.getTimelineAnnotations()) { + builder.add(new MetricsTag(Interns.info(TAG.traceName, Long.toString(ta.getTime())), ta + .getMessage())); + } + + // add the annotations. We assume they are serialized as strings and integers, but that can + // change in the future + Map<byte[], byte[]> annotations = span.getKVAnnotations(); + for (Entry<byte[], byte[]> annotation : annotations.entrySet()) { + Pair<String, String> val = + TracingUtils.readAnnotation(annotation.getKey(), annotation.getValue()); + builder.add(new MetricsTag(Interns.info(ANNOTATION.traceName, val.getFirst()), val + .getSecond())); + } + + // add the span to the list we care about + synchronized (this) { + spans.add(builder); + } + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + // add a marker record so we know how many spans are used + // this is also necessary to ensure that we register the metrics source as an MBean (avoiding a + // runtime warning) + MetricsRecordBuilder marker = collector.addRecord(TracingUtils.METRICS_MARKER_CONTEXT); + marker.add(new MetricsTag(new MetricsInfoImpl("stat", "num spans"), Integer + .toString(spans.size()))); + + // actually convert the known spans into metric records as well + synchronized (this) { + for (Metric span : spans) { + MetricsRecordBuilder builder = collector.addRecord(new MetricsInfoImpl(TracingUtils + .getTraceMetricName(span.id), span.desc)); + builder.setContext(TracingUtils.METRICS_CONTEXT); + for (Pair<MetricsInfo, Long> metric : span.counters) { + builder.addCounter(metric.getFirst(), metric.getSecond()); + } + for (MetricsTag tag : span.tags) { + builder.add(tag); + } + } + // reset the spans so we don't keep a big chunk of memory around + spans = new ArrayList<Metric>(); + } + } + + @Override + public void close() throws IOException { + // noop + } + + @Override + public void configure(HTraceConfiguration conf) { + // noop + } + + private static class Metric { + + List<Pair<MetricsInfo, Long>> counters = new ArrayList<Pair<MetricsInfo, Long>>(); + List<MetricsTag> tags = new ArrayList<MetricsTag>(); + private String id; + private String desc; + + public Metric(Span span) { + this.id = Long.toString(span.getTraceId()); + this.desc = span.getDescription(); + } + + /** + * @param metricsInfoImpl + * @param startTimeMillis + */ + public void 
addCounter(MetricsInfo metricsInfoImpl, long startTimeMillis) { + counters.add(new Pair<MetricsInfo, Long>(metricsInfoImpl, startTimeMillis)); + } + + /** + * @param metricsTag + */ + public void add(MetricsTag metricsTag) { + tags.add(metricsTag); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b48ca7b5/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java index 3d6eb9b..f3fc81d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java @@ -40,7 +40,7 @@ import com.google.common.base.Joiner; import com.google.common.primitives.Longs; /** - * Read the traces written to phoenix tables by the {@link PhoenixTableMetricsWriter}. + * Read the traces written to phoenix tables by the {@link PhoenixMetricsSink}. */ public class TraceReader { @@ -54,8 +54,8 @@ public class TraceReader { comma.join(MetricInfo.TRACE.columnName, MetricInfo.PARENT.columnName, MetricInfo.SPAN.columnName, MetricInfo.DESCRIPTION.columnName, MetricInfo.START.columnName, MetricInfo.END.columnName, - MetricInfo.HOSTNAME.columnName, PhoenixTableMetricsWriter.TAG_COUNT, - PhoenixTableMetricsWriter.ANNOTATION_COUNT); + MetricInfo.HOSTNAME.columnName, PhoenixMetricsSink.TAG_COUNT, + PhoenixMetricsSink.ANNOTATION_COUNT); } private Connection conn; @@ -181,13 +181,13 @@ public class TraceReader { private Collection<? extends String> getTags(long traceid, long parent, long span, int count) throws SQLException { return getDynamicCountColumns(traceid, parent, span, count, - PhoenixTableMetricsWriter.TAG_FAMILY, MetricInfo.TAG.columnName); + PhoenixMetricsSink.TAG_FAMILY, MetricInfo.TAG.columnName); } private Collection<? extends String> getAnnotations(long traceid, long parent, long span, int count) throws SQLException { return getDynamicCountColumns(traceid, parent, span, count, - PhoenixTableMetricsWriter.ANNOTATION_FAMILY, MetricInfo.ANNOTATION.columnName); + PhoenixMetricsSink.ANNOTATION_FAMILY, MetricInfo.ANNOTATION.columnName); } private Collection<? extends String> getDynamicCountColumns(long traceid, long parent, @@ -199,7 +199,7 @@ public class TraceReader { // build the column strings, family.column<index> String[] parts = new String[count]; for (int i = 0; i < count; i++) { - parts[i] = PhoenixTableMetricsWriter.getDynamicColumnName(family, columnName, i); + parts[i] = PhoenixMetricsSink.getDynamicColumnName(family, columnName, i); } // join the columns together String columns = comma.join(parts);
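As a quick illustration of how the renamed helpers line up (a sketch only; the index values are arbitrary and not part of the patch): both the sink and the reader derive the per-row dynamic column names from the same helper, so the columns written by PhoenixMetricsSink are exactly the ones TraceReader asks for when reading the row back:

    // family.column<index>, e.g. the first two tag columns of a row whose tag count column stores 2
    String tag0 = PhoenixMetricsSink.getDynamicColumnName(
            PhoenixMetricsSink.TAG_FAMILY, MetricInfo.TAG.columnName, 0);
    String tag1 = PhoenixMetricsSink.getDynamicColumnName(
            PhoenixMetricsSink.TAG_FAMILY, MetricInfo.TAG.columnName, 1);
    // TraceReader.getTags() builds the same names via getDynamicCountColumns() when querying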