This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch percentile in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit cbcd8a8c23a3544aac3341affb641f2988951382 Author: Wu Sheng <[email protected]> AuthorDate: Tue Apr 28 11:01:33 2020 +0800 Support percentile function in the meter system. --- .../skywalking/oal/rt/parser/AnalysisResult.java | 2 +- .../oal/rt/parser/PersistenceColumns.java | 12 +- .../skywalking/oal/rt/parser/PersistenceField.java | 4 +- .../code-templates/metrics/deserialize.ftl | 2 +- .../server/core/analysis/meter/MeterSystem.java | 19 +- .../analysis/meter/function/HistogramFunction.java | 15 -- .../meter/function/PercentileFunction.java | 297 +++++++++++++++++++++ .../oap/server/core/analysis/metrics/IntList.java | 87 ++++++ .../meter/function/PercentileFunctionTest.java | 208 +++++++++++++++ .../provider/PrometheusFetcherProvider.java | 37 +++ .../elasticsearch/base/ColumnTypeEsMapping.java | 3 +- .../plugin/jdbc/h2/dao/H2TableInstaller.java | 4 +- .../plugin/jdbc/mysql/MySQLTableInstaller.java | 3 +- 13 files changed, 658 insertions(+), 35 deletions(-) diff --git a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/AnalysisResult.java b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/AnalysisResult.java index c0cbe70..31a975b 100644 --- a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/AnalysisResult.java +++ b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/AnalysisResult.java @@ -139,7 +139,7 @@ public class AnalysisResult { } else if (columnType.equals(long.class)) { serializeFields.addLongField(column.getFieldName()); } else if (StorageDataComplexObject.class.isAssignableFrom(columnType)) { - serializeFields.addObjectField(column.getFieldName()); + serializeFields.addObjectField(column.getFieldName(), columnType.getName()); } else { throw new IllegalStateException( "Unexpected field type [" + columnType.getSimpleName() + "] of persistence column [" + column diff --git a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceColumns.java b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceColumns.java index 0fab3aa..195e3f8 100644 --- a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceColumns.java +++ b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceColumns.java @@ -29,23 +29,23 @@ public class PersistenceColumns { private List<PersistenceField> objectFields = new LinkedList<>(); public void addStringField(String fieldName) { - stringFields.add(new PersistenceField(fieldName)); + stringFields.add(new PersistenceField(fieldName, "String")); } public void addLongField(String fieldName) { - longFields.add(new PersistenceField(fieldName)); + longFields.add(new PersistenceField(fieldName, "long")); } public void addDoubleField(String fieldName) { - doubleFields.add(new PersistenceField(fieldName)); + doubleFields.add(new PersistenceField(fieldName, "double")); } public void addIntField(String fieldName) { - intFields.add(new PersistenceField(fieldName)); + intFields.add(new PersistenceField(fieldName, "int")); } - public void addObjectField(String fieldName) { - objectFields.add(new PersistenceField(fieldName)); + public void addObjectField(String fieldName, String fieldType) { + objectFields.add(new PersistenceField(fieldName, fieldType)); } public List<PersistenceField> getStringFields() { diff --git a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceField.java b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceField.java index 015cebb..622e7e8 100644 --- a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceField.java +++ b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceField.java @@ -29,10 +29,12 @@ public class PersistenceField { private String fieldName; private String setter; private String getter; + private String fieldType; - public PersistenceField(String fieldName) { + public PersistenceField(String fieldName, String fieldType) { this.fieldName = fieldName; this.setter = ClassMethodUtil.toSetMethod(fieldName); this.getter = ClassMethodUtil.toGetMethod(fieldName); + this.fieldType = fieldType; } } diff --git a/oap-server/oal-rt/src/main/resources/code-templates/metrics/deserialize.ftl b/oap-server/oal-rt/src/main/resources/code-templates/metrics/deserialize.ftl index 2484151..455634f 100644 --- a/oap-server/oal-rt/src/main/resources/code-templates/metrics/deserialize.ftl +++ b/oap-server/oal-rt/src/main/resources/code-templates/metrics/deserialize.ftl @@ -16,7 +16,7 @@ public void deserialize(org.apache.skywalking.oap.server.core.remote.grpc.proto. </#list> <#list serializeFields.objectFields as field> - ${field.setter}(new org.apache.skywalking.oap.server.core.analysis.metrics.DataTable(remoteData.getDataObjectStrings(${field?index}))); + ${field.setter}(new ${field.fieldType}(remoteData.getDataObjectStrings(${field?index}))); </#list> } \ No newline at end of file diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java index 2d5173b..6ba9a47 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java @@ -159,13 +159,18 @@ public class MeterSystem implements Service { boolean foundDataType = false; String acceptance = null; for (final Type genericInterface : meterFunction.getGenericInterfaces()) { - ParameterizedType parameterizedType = (ParameterizedType) genericInterface; - if (parameterizedType.getRawType().getTypeName().equals(AcceptableValue.class.getName())) { - Type[] arguments = parameterizedType.getActualTypeArguments(); - if (arguments[0].equals(dataType)) { - foundDataType = true; - } else { - acceptance = arguments[0].getTypeName(); + if (genericInterface instanceof ParameterizedType) { + ParameterizedType parameterizedType = (ParameterizedType) genericInterface; + if (parameterizedType.getRawType().getTypeName().equals(AcceptableValue.class.getName())) { + Type[] arguments = parameterizedType.getActualTypeArguments(); + if (arguments[0].equals(dataType)) { + foundDataType = true; + } else { + acceptance = arguments[0].getTypeName(); + } + } + if (foundDataType) { + break; } } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java index 16f000a..f1150a6 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java @@ -27,7 +27,6 @@ import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.UnexpectedException; -import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic; import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity; import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; @@ -58,13 +57,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal @Setter @Column(columnName = DATASET, dataType = Column.ValueDataType.HISTOGRAM, storageOnly = true, defaultValue = 0) private DataTable dataset = new DataTable(30); - /** - * Service ID is required for sort query. - */ - @Setter - @Getter - @Column(columnName = InstanceTraffic.SERVICE_ID) - private String serviceId; @Override public void accept(final MeterEntity entity, final BucketedValues value) { @@ -76,7 +68,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal } this.entityId = entity.id(); - this.serviceId = entity.serviceId(); final long[] values = value.getValues(); for (int i = 0; i < values.length; i++) { @@ -110,7 +101,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal HistogramFunction metrics = (HistogramFunction) createNew(); metrics.setEntityId(getEntityId()); metrics.setTimeBucket(toTimeBucketInHour()); - metrics.setServiceId(getServiceId()); metrics.setDataset(getDataset()); return metrics; } @@ -120,7 +110,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal HistogramFunction metrics = (HistogramFunction) createNew(); metrics.setEntityId(getEntityId()); metrics.setTimeBucket(toTimeBucketInDay()); - metrics.setServiceId(getServiceId()); metrics.setDataset(getDataset()); return metrics; } @@ -135,7 +124,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal this.setTimeBucket(remoteData.getDataLongs(0)); this.setEntityId(remoteData.getDataStrings(0)); - this.setServiceId(remoteData.getDataStrings(1)); this.setDataset(new DataTable(remoteData.getDataObjectStrings(0))); } @@ -146,7 +134,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal remoteBuilder.addDataLongs(getTimeBucket()); remoteBuilder.addDataStrings(entityId); - remoteBuilder.addDataStrings(serviceId); remoteBuilder.addDataObjectStrings(dataset.toStorageData()); @@ -175,7 +162,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal }; metrics.setDataset(new DataTable((String) dbMap.get(DATASET))); metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue()); - metrics.setServiceId((String) dbMap.get(InstanceTraffic.SERVICE_ID)); metrics.setEntityId((String) dbMap.get(ENTITY_ID)); return metrics; } @@ -185,7 +171,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal Map<String, Object> map = new HashMap<>(); map.put(DATASET, storageData.getDataset()); map.put(TIME_BUCKET, storageData.getTimeBucket()); - map.put(InstanceTraffic.SERVICE_ID, storageData.getServiceId()); map.put(ENTITY_ID, storageData.getEntityId()); return map; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java new file mode 100644 index 0000000..01d767b --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.meter.function; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.UnexpectedException; +import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity; +import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable; +import org.apache.skywalking.oap.server.core.analysis.metrics.IntList; +import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; +import org.apache.skywalking.oap.server.core.analysis.metrics.MultiIntValuesHolder; +import org.apache.skywalking.oap.server.core.analysis.metrics.PercentileMetrics; +import org.apache.skywalking.oap.server.core.query.type.Bucket; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; +import org.apache.skywalking.oap.server.core.storage.StorageBuilder; +import org.apache.skywalking.oap.server.core.storage.annotation.Column; + +/** + * PercentileFunction is the implementation of {@link PercentileMetrics} in the meter system. The major difference is + * the PercentileFunction accepts the {@link PercentileArgument} as input rather than every single request. + */ +@MeterFunction(functionName = "percentile") +@Slf4j +@EqualsAndHashCode(of = { + "entityId", + "timeBucket" +}) +public abstract class PercentileFunction extends Metrics implements AcceptableValue<PercentileFunction.PercentileArgument>, MultiIntValuesHolder { + public static final String DATASET = "dataset"; + public static final String RANKS = "ranks"; + public static final String VALUE = "value"; + + @Setter + @Getter + @Column(columnName = ENTITY_ID) + private String entityId; + @Getter + @Setter + @Column(columnName = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, storageOnly = true) + private DataTable percentileValues = new DataTable(10); + @Getter + @Setter + @Column(columnName = DATASET, storageOnly = true) + private DataTable dataset = new DataTable(30); + /** + * Rank + */ + @Getter + @Setter + @Column(columnName = RANKS, storageOnly = true) + private IntList ranks = new IntList(10); + + private boolean isCalculated = false; + + @Override + public void accept(final MeterEntity entity, final PercentileArgument value) { + if (dataset.size() > 0) { + if (!value.getBucketedValues().isCompatible(dataset)) { + throw new IllegalArgumentException( + "Incompatible BucketedValues [" + value + "] for current PercentileFunction[" + dataset + "]"); + } + } + + for (final int rank : value.getRanks()) { + if (rank <= 0) { + throw new IllegalArgumentException("Illegal rank value " + rank + ", must be positive"); + } + } + + if (ranks.size() > 0) { + if (ranks.size() != value.getRanks().length) { + throw new IllegalArgumentException( + "Incompatible ranks size = [" + value.getRanks().length + "] for current PercentileFunction[" + ranks + .size() + "]"); + } else { + for (final int rank : value.getRanks()) { + if (!ranks.include(rank)) { + throw new IllegalArgumentException( + "Rank " + rank + " doesn't exist in the previous ranks " + ranks); + } + } + } + } else { + for (final int rank : value.getRanks()) { + ranks.add(rank); + } + } + + this.entityId = entity.id(); + + final long[] values = value.getBucketedValues().getValues(); + for (int i = 0; i < values.length; i++) { + final long bucket = value.getBucketedValues().getBuckets()[i]; + String bucketName = bucket == Integer.MIN_VALUE ? Bucket.INFINITE_NEGATIVE : String.valueOf(bucket); + final long bucketValue = values[i]; + dataset.valueAccumulation(bucketName, bucketValue); + } + + this.isCalculated = false; + } + + @Override + public void combine(final Metrics metrics) { + PercentileFunction percentile = (PercentileFunction) metrics; + + if (!dataset.keysEqual(percentile.getDataset())) { + log.warn("Incompatible input [{}}] for current HistogramFunction[{}], entity {}", + percentile, this, entityId + ); + return; + } + if (ranks.size() > 0) { + IntList ranksOfThat = percentile.getRanks(); + if (this.ranks.size() != ranks.size()) { + log.warn("Incompatible ranks size = [{}}] for current PercentileFunction[{}]", + ranks.size(), this.ranks.size() + ); + return; + } else { + if (!this.ranks.equals(percentile.getRanks())) { + log.warn("Rank {} doesn't exist in the previous ranks {}", percentile.getRanks(), ranks); + return; + } + } + } + + this.dataset.append(percentile.dataset); + + this.isCalculated = false; + } + + @Override + public void calculate() { + if (!isCalculated) { + long total = dataset.sumOfValues(); + + int[] roofs = new int[ranks.size()]; + for (int i = 0; i < ranks.size(); i++) { + roofs[i] = Math.round(total * ranks.get(i) * 1.0f / 100); + } + + int count = 0; + final List<String> sortedKeys = dataset.sortedKeys(Comparator.comparingInt(Integer::parseInt)); + + int loopIndex = 0; + + for (String key : sortedKeys) { + final Long value = dataset.get(key); + + count += value; + for (int rankIdx = loopIndex; rankIdx < roofs.length; rankIdx++) { + int roof = roofs[rankIdx]; + + if (count >= roof) { + percentileValues.put(String.valueOf(ranks.get(rankIdx)), Long.parseLong(key)); + loopIndex++; + } else { + break; + } + } + } + } + } + + @Override + public Metrics toHour() { + PercentileFunction metrics = (PercentileFunction) createNew(); + metrics.setEntityId(getEntityId()); + metrics.setTimeBucket(toTimeBucketInHour()); + metrics.setDataset(getDataset()); + metrics.setRanks(getRanks()); + metrics.setPercentileValues(getPercentileValues()); + return metrics; + } + + @Override + public Metrics toDay() { + PercentileFunction metrics = (PercentileFunction) createNew(); + metrics.setEntityId(getEntityId()); + metrics.setTimeBucket(toTimeBucketInDay()); + metrics.setDataset(getDataset()); + metrics.setRanks(getRanks()); + metrics.setPercentileValues(getPercentileValues()); + return metrics; + } + + @Override + public int[] getValues() { + return percentileValues.sortedValues(Comparator.comparingInt(Integer::parseInt)) + .stream() + .flatMapToInt(l -> IntStream.of(l.intValue())) + .toArray(); + } + + @Override + public int remoteHashCode() { + return entityId.hashCode(); + } + + @Override + public void deserialize(final RemoteData remoteData) { + this.setTimeBucket(remoteData.getDataLongs(0)); + + this.setEntityId(remoteData.getDataStrings(0)); + + this.setDataset(new DataTable(remoteData.getDataObjectStrings(0))); + this.setRanks(new IntList(remoteData.getDataObjectStrings(1))); + this.setPercentileValues(new DataTable(remoteData.getDataObjectStrings(2))); + } + + @Override + public RemoteData.Builder serialize() { + RemoteData.Builder remoteBuilder = RemoteData.newBuilder(); + remoteBuilder.addDataLongs(getTimeBucket()); + + remoteBuilder.addDataStrings(entityId); + + remoteBuilder.addDataObjectStrings(dataset.toStorageData()); + remoteBuilder.addDataObjectStrings(ranks.toStorageData()); + remoteBuilder.addDataObjectStrings(percentileValues.toStorageData()); + + return remoteBuilder; + } + + @Override + public String id() { + return getTimeBucket() + Const.ID_CONNECTOR + entityId; + } + + @Override + public Class<? extends StorageBuilder> builder() { + return PercentileFunctionBuilder.class; + } + + @RequiredArgsConstructor + @Getter + public static class PercentileArgument { + private final BucketedValues bucketedValues; + private final int[] ranks; + } + + public static class PercentileFunctionBuilder implements StorageBuilder<PercentileFunction> { + + @Override + public PercentileFunction map2Data(final Map<String, Object> dbMap) { + PercentileFunction metrics = new PercentileFunction() { + @Override + public AcceptableValue<PercentileArgument> createNew() { + throw new UnexpectedException("createNew should not be called"); + } + }; + metrics.setDataset(new DataTable((String) dbMap.get(DATASET))); + metrics.setRanks(new IntList((String) dbMap.get(RANKS))); + metrics.setPercentileValues(new DataTable((String) dbMap.get(VALUE))); + metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue()); + metrics.setEntityId((String) dbMap.get(ENTITY_ID)); + return metrics; + } + + @Override + public Map<String, Object> data2Map(final PercentileFunction storageData) { + Map<String, Object> map = new HashMap<>(); + map.put(DATASET, storageData.getDataset()); + map.put(RANKS, storageData.getRanks()); + map.put(VALUE, storageData.getPercentileValues()); + map.put(TIME_BUCKET, storageData.getTimeBucket()); + map.put(ENTITY_ID, storageData.getEntityId()); + return map; + } + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/IntList.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/IntList.java new file mode 100644 index 0000000..b5bd9ef --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/IntList.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.metrics; + +import java.util.ArrayList; +import java.util.List; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject; + +/** + * IntList is a serializable array list carrying int values. + */ +@ToString +@EqualsAndHashCode +public class IntList implements StorageDataComplexObject<IntList> { + private List<Integer> data; + + public IntList(int initialSize) { + this.data = new ArrayList(initialSize); + } + + public IntList(String valueString) { + toObject(valueString); + } + + public int size() { + return data.size(); + } + + public boolean include(int value) { + return data.contains(value); + } + + @Override + public String toStorageData() { + StringBuilder builder = new StringBuilder(); + + this.data.forEach(element -> { + if (builder.length() != 0) { + // For the first element. + builder.append(Const.ARRAY_SPLIT); + } + builder.append(element); + }); + return builder.toString(); + } + + @Override + public void toObject(final String data) { + String[] elements = data.split(Const.ARRAY_PARSER_SPLIT); + this.data = new ArrayList<>(elements.length); + for (String element : elements) { + this.data.add(Integer.parseInt(element)); + } + } + + @Override + public void copyFrom(final IntList source) { + this.data.addAll(source.data); + } + + public void add(final int rank) { + this.data.add(rank); + } + + public int get(final int idx) { + return this.data.get(idx); + } +} diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunctionTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunctionTest.java new file mode 100644 index 0000000..6409d7b --- /dev/null +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunctionTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.meter.function; + +import java.util.Map; +import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity; +import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable; +import org.apache.skywalking.oap.server.core.analysis.metrics.IntList; +import org.apache.skywalking.oap.server.core.storage.StorageBuilder; +import org.junit.Assert; +import org.junit.Test; + +public class PercentileFunctionTest { + private static final int[] BUCKETS = new int[] { + 0, + 50, + 100, + 250 + }; + + private static final int[] BUCKETS_2ND = new int[] { + 0, + 51, + 100, + 250 + }; + + private static final int[] RANKS = new int[] { + 50, + 90 + }; + + @Test + public void testFunction() { + PercentileFunctionInst inst = new PercentileFunctionInst(); + inst.accept( + MeterEntity.newService("service-test"), + new PercentileFunction.PercentileArgument( + new BucketedValues( + BUCKETS, + new long[] { + 10, + 20, + 30, + 40 + } + ), + RANKS + ) + ); + + inst.accept( + MeterEntity.newService("service-test"), + new PercentileFunction.PercentileArgument( + new BucketedValues( + BUCKETS, + new long[] { + 10, + 20, + 30, + 40 + } + ), + RANKS + ) + ); + + inst.calculate(); + final int[] values = inst.getValues(); + /** + * Expected percentile dataset + * <pre> + * 0 , 20 + * 50 , 40 + * 100, 60 <- P50 + * 250, 80 <- P90 + * </pre> + */ + Assert.assertArrayEquals(new int[] { + 100, + 250 + }, values); + } + + @Test(expected = IllegalArgumentException.class) + public void testIncompatible() { + PercentileFunctionInst inst = new PercentileFunctionInst(); + inst.accept( + MeterEntity.newService("service-test"), + new PercentileFunction.PercentileArgument( + new BucketedValues( + BUCKETS, + new long[] { + 10, + 20, + 30, + 40 + } + ), + RANKS + ) + ); + + inst.accept( + MeterEntity.newService("service-test"), + new PercentileFunction.PercentileArgument( + new BucketedValues( + BUCKETS_2ND, + new long[] { + 10, + 20, + 30, + 40 + } + ), + RANKS + ) + ); + } + + @Test + public void testSerialization() { + PercentileFunctionInst inst = new PercentileFunctionInst(); + inst.accept( + MeterEntity.newService("service-test"), + new PercentileFunction.PercentileArgument( + new BucketedValues( + BUCKETS, + new long[] { + 10, + 20, + 30, + 40 + } + ), + RANKS + ) + ); + + PercentileFunctionInst inst2 = new PercentileFunctionInst(); + inst2.deserialize(inst.serialize().build()); + + Assert.assertEquals(inst, inst2); + // HistogramFunction equal doesn't include dataset. + Assert.assertEquals(inst.getDataset(), inst2.getDataset()); + Assert.assertEquals(inst.getRanks(), inst2.getRanks()); + Assert.assertEquals(0, inst2.getPercentileValues().size()); + } + + @Test + public void testBuilder() throws IllegalAccessException, InstantiationException { + PercentileFunctionInst inst = new PercentileFunctionInst(); + inst.accept( + MeterEntity.newService("service-test"), + new PercentileFunction.PercentileArgument( + new BucketedValues( + BUCKETS, + new long[] { + 10, + 20, + 30, + 40 + } + ), + RANKS + ) + ); + inst.calculate(); + + final StorageBuilder storageBuilder = inst.builder().newInstance(); + + // Simulate the storage layer do, convert the datatable to string. + final Map map = storageBuilder.data2Map(inst); + map.put(PercentileFunction.DATASET, ((DataTable) map.get(PercentileFunction.DATASET)).toStorageData()); + map.put(PercentileFunction.VALUE, ((DataTable) map.get(PercentileFunction.VALUE)).toStorageData()); + map.put(PercentileFunction.RANKS, ((IntList) map.get(PercentileFunction.RANKS)).toStorageData()); + + final PercentileFunction inst2 = (PercentileFunction) storageBuilder.map2Data(map); + Assert.assertEquals(inst, inst2); + // HistogramFunction equal doesn't include dataset. + Assert.assertEquals(inst.getDataset(), inst2.getDataset()); + Assert.assertEquals(inst.getPercentileValues(), inst2.getPercentileValues()); + Assert.assertEquals(inst.getRanks(), inst2.getRanks()); + } + + private static class PercentileFunctionInst extends PercentileFunction { + @Override + public AcceptableValue<PercentileArgument> createNew() { + return new PercentileFunctionInst(); + } + } +} diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java index 9b5efcd..0e2a00f 100644 --- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java +++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java @@ -27,6 +27,7 @@ import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem; import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType; import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue; import org.apache.skywalking.oap.server.core.analysis.meter.function.BucketedValues; +import org.apache.skywalking.oap.server.core.analysis.meter.function.PercentileFunction; import org.apache.skywalking.oap.server.fetcher.prometheus.module.PrometheusFetcherModule; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; @@ -64,6 +65,10 @@ public class PrometheusFetcherProvider extends ModuleProvider { final MeterSystem meterSystem = MeterSystem.meterSystem(getManager()); meterSystem.create("test_long_metrics", "avg", ScopeType.SERVICE, Long.class); meterSystem.create("test_histogram_metrics", "histogram", ScopeType.SERVICE, BucketedValues.class); + meterSystem.create( + "test_percentile_metrics", "percentile", ScopeType.SERVICE, + PercentileFunction.PercentileArgument.class + ); } } @@ -109,6 +114,38 @@ public class PrometheusFetcherProvider extends ModuleProvider { } )); service.doStreamingCalculation(histogramMetrics); + + // Percentile Example + final AcceptableValue<PercentileFunction.PercentileArgument> testPercentileMetrics = service.buildMetrics( + "test_percentile_metrics", PercentileFunction.PercentileArgument.class); + testPercentileMetrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis())); + testPercentileMetrics.accept( + MeterEntity.newService("service-test"), + new PercentileFunction.PercentileArgument( + new BucketedValues( + // Buckets + new int[] { + 0, + 51, + 100, + 250 + }, + // Values + new long[] { + 10, + 20, + 30, + 40 + } + ), + // Ranks + new int[] { + 50, + 90 + } + ) + ); + service.doStreamingCalculation(testPercentileMetrics); } }, 2, 2, TimeUnit.SECONDS); } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java index 7eefe65..5509602 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java @@ -22,6 +22,7 @@ import com.google.gson.JsonObject; import org.apache.skywalking.oap.server.core.analysis.NodeType; import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable; import org.apache.skywalking.oap.server.core.storage.model.DataTypeMapping; +import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject; public class ColumnTypeEsMapping implements DataTypeMapping { @@ -35,7 +36,7 @@ public class ColumnTypeEsMapping implements DataTypeMapping { return "double"; } else if (String.class.equals(type)) { return "keyword"; - } else if (DataTable.class.equals(type)) { + } else if (StorageDataComplexObject.class.isAssignableFrom(type)) { return "text"; } else if (byte[].class.equals(type)) { return "binary"; diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java index 4d10bf4..53e5266 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java @@ -23,12 +23,12 @@ import java.sql.Connection; import java.sql.SQLException; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.analysis.NodeType; -import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable; import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.model.ColumnName; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.ModelColumn; import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller; +import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject; import org.apache.skywalking.oap.server.library.client.Client; import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException; import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient; @@ -98,7 +98,7 @@ public class H2TableInstaller extends ModelInstaller { return "DOUBLE"; } else if (String.class.equals(type)) { return "VARCHAR(" + column.getLength() + ")"; - } else if (DataTable.class.equals(type)) { + } else if (StorageDataComplexObject.class.isAssignableFrom(type)) { return "VARCHAR(20000)"; } else if (byte[].class.equals(type)) { return "MEDIUMTEXT"; diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java index 839fefb..89dc19c 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java @@ -27,6 +27,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.model.ExtraQueryIndex; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.ModelColumn; +import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject; import org.apache.skywalking.oap.server.library.client.Client; import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException; import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient; @@ -105,7 +106,7 @@ public class MySQLTableInstaller extends H2TableInstaller { @Override protected String getColumnType(final ModelColumn column) { - if (DataTable.class.equals(column.getType())) { + if (StorageDataComplexObject.class.isAssignableFrom(column.getType())) { return "MEDIUMTEXT"; } return super.getColumnType(column);
