This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch percentile
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit cbcd8a8c23a3544aac3341affb641f2988951382
Author: Wu Sheng <[email protected]>
AuthorDate: Tue Apr 28 11:01:33 2020 +0800

    Support percentile function in the meter system.
---
 .../skywalking/oal/rt/parser/AnalysisResult.java   |   2 +-
 .../oal/rt/parser/PersistenceColumns.java          |  12 +-
 .../skywalking/oal/rt/parser/PersistenceField.java |   4 +-
 .../code-templates/metrics/deserialize.ftl         |   2 +-
 .../server/core/analysis/meter/MeterSystem.java    |  19 +-
 .../analysis/meter/function/HistogramFunction.java |  15 --
 .../meter/function/PercentileFunction.java         | 297 +++++++++++++++++++++
 .../oap/server/core/analysis/metrics/IntList.java  |  87 ++++++
 .../meter/function/PercentileFunctionTest.java     | 208 +++++++++++++++
 .../provider/PrometheusFetcherProvider.java        |  37 +++
 .../elasticsearch/base/ColumnTypeEsMapping.java    |   3 +-
 .../plugin/jdbc/h2/dao/H2TableInstaller.java       |   4 +-
 .../plugin/jdbc/mysql/MySQLTableInstaller.java     |   3 +-
 13 files changed, 658 insertions(+), 35 deletions(-)

diff --git 
a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/AnalysisResult.java
 
b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/AnalysisResult.java
index c0cbe70..31a975b 100644
--- 
a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/AnalysisResult.java
+++ 
b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/AnalysisResult.java
@@ -139,7 +139,7 @@ public class AnalysisResult {
             } else if (columnType.equals(long.class)) {
                 serializeFields.addLongField(column.getFieldName());
             } else if 
(StorageDataComplexObject.class.isAssignableFrom(columnType)) {
-                serializeFields.addObjectField(column.getFieldName());
+                serializeFields.addObjectField(column.getFieldName(), 
columnType.getName());
             } else {
                 throw new IllegalStateException(
                     "Unexpected field type [" + columnType.getSimpleName() + 
"] of persistence column [" + column
diff --git 
a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceColumns.java
 
b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceColumns.java
index 0fab3aa..195e3f8 100644
--- 
a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceColumns.java
+++ 
b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceColumns.java
@@ -29,23 +29,23 @@ public class PersistenceColumns {
     private List<PersistenceField> objectFields = new LinkedList<>();
 
     public void addStringField(String fieldName) {
-        stringFields.add(new PersistenceField(fieldName));
+        stringFields.add(new PersistenceField(fieldName, "String"));
     }
 
     public void addLongField(String fieldName) {
-        longFields.add(new PersistenceField(fieldName));
+        longFields.add(new PersistenceField(fieldName, "long"));
     }
 
     public void addDoubleField(String fieldName) {
-        doubleFields.add(new PersistenceField(fieldName));
+        doubleFields.add(new PersistenceField(fieldName, "double"));
     }
 
     public void addIntField(String fieldName) {
-        intFields.add(new PersistenceField(fieldName));
+        intFields.add(new PersistenceField(fieldName, "int"));
     }
 
-    public void addObjectField(String fieldName) {
-        objectFields.add(new PersistenceField(fieldName));
+    public void addObjectField(String fieldName, String fieldType) {
+        objectFields.add(new PersistenceField(fieldName, fieldType));
     }
 
     public List<PersistenceField> getStringFields() {
diff --git 
a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceField.java
 
b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceField.java
index 015cebb..622e7e8 100644
--- 
a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceField.java
+++ 
b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceField.java
@@ -29,10 +29,12 @@ public class PersistenceField {
     private String fieldName;
     private String setter;
     private String getter;
+    private String fieldType;
 
-    public PersistenceField(String fieldName) {
+    public PersistenceField(String fieldName, String fieldType) {
         this.fieldName = fieldName;
         this.setter = ClassMethodUtil.toSetMethod(fieldName);
         this.getter = ClassMethodUtil.toGetMethod(fieldName);
+        this.fieldType = fieldType;
     }
 }
diff --git 
a/oap-server/oal-rt/src/main/resources/code-templates/metrics/deserialize.ftl 
b/oap-server/oal-rt/src/main/resources/code-templates/metrics/deserialize.ftl
index 2484151..455634f 100644
--- 
a/oap-server/oal-rt/src/main/resources/code-templates/metrics/deserialize.ftl
+++ 
b/oap-server/oal-rt/src/main/resources/code-templates/metrics/deserialize.ftl
@@ -16,7 +16,7 @@ public void 
deserialize(org.apache.skywalking.oap.server.core.remote.grpc.proto.
 </#list>
 
 <#list serializeFields.objectFields as field>
-    ${field.setter}(new 
org.apache.skywalking.oap.server.core.analysis.metrics.DataTable(remoteData.getDataObjectStrings(${field?index})));
+    ${field.setter}(new 
${field.fieldType}(remoteData.getDataObjectStrings(${field?index})));
 </#list>
 
 }
\ No newline at end of file
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
index 2d5173b..6ba9a47 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
@@ -159,13 +159,18 @@ public class MeterSystem implements Service {
             boolean foundDataType = false;
             String acceptance = null;
             for (final Type genericInterface : 
meterFunction.getGenericInterfaces()) {
-                ParameterizedType parameterizedType = (ParameterizedType) 
genericInterface;
-                if 
(parameterizedType.getRawType().getTypeName().equals(AcceptableValue.class.getName()))
 {
-                    Type[] arguments = 
parameterizedType.getActualTypeArguments();
-                    if (arguments[0].equals(dataType)) {
-                        foundDataType = true;
-                    } else {
-                        acceptance = arguments[0].getTypeName();
+                if (genericInterface instanceof ParameterizedType) {
+                    ParameterizedType parameterizedType = (ParameterizedType) 
genericInterface;
+                    if 
(parameterizedType.getRawType().getTypeName().equals(AcceptableValue.class.getName()))
 {
+                        Type[] arguments = 
parameterizedType.getActualTypeArguments();
+                        if (arguments[0].equals(dataType)) {
+                            foundDataType = true;
+                        } else {
+                            acceptance = arguments[0].getTypeName();
+                        }
+                    }
+                    if (foundDataType) {
+                        break;
                     }
                 }
             }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java
index 16f000a..f1150a6 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java
@@ -27,7 +27,6 @@ import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
-import 
org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
 import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
 import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
@@ -58,13 +57,6 @@ public abstract class HistogramFunction extends Metrics 
implements AcceptableVal
     @Setter
     @Column(columnName = DATASET, dataType = Column.ValueDataType.HISTOGRAM, 
storageOnly = true, defaultValue = 0)
     private DataTable dataset = new DataTable(30);
-    /**
-     * Service ID is required for sort query.
-     */
-    @Setter
-    @Getter
-    @Column(columnName = InstanceTraffic.SERVICE_ID)
-    private String serviceId;
 
     @Override
     public void accept(final MeterEntity entity, final BucketedValues value) {
@@ -76,7 +68,6 @@ public abstract class HistogramFunction extends Metrics 
implements AcceptableVal
         }
 
         this.entityId = entity.id();
-        this.serviceId = entity.serviceId();
 
         final long[] values = value.getValues();
         for (int i = 0; i < values.length; i++) {
@@ -110,7 +101,6 @@ public abstract class HistogramFunction extends Metrics 
implements AcceptableVal
         HistogramFunction metrics = (HistogramFunction) createNew();
         metrics.setEntityId(getEntityId());
         metrics.setTimeBucket(toTimeBucketInHour());
-        metrics.setServiceId(getServiceId());
         metrics.setDataset(getDataset());
         return metrics;
     }
@@ -120,7 +110,6 @@ public abstract class HistogramFunction extends Metrics 
implements AcceptableVal
         HistogramFunction metrics = (HistogramFunction) createNew();
         metrics.setEntityId(getEntityId());
         metrics.setTimeBucket(toTimeBucketInDay());
-        metrics.setServiceId(getServiceId());
         metrics.setDataset(getDataset());
         return metrics;
     }
@@ -135,7 +124,6 @@ public abstract class HistogramFunction extends Metrics 
implements AcceptableVal
         this.setTimeBucket(remoteData.getDataLongs(0));
 
         this.setEntityId(remoteData.getDataStrings(0));
-        this.setServiceId(remoteData.getDataStrings(1));
 
         this.setDataset(new DataTable(remoteData.getDataObjectStrings(0)));
     }
@@ -146,7 +134,6 @@ public abstract class HistogramFunction extends Metrics 
implements AcceptableVal
         remoteBuilder.addDataLongs(getTimeBucket());
 
         remoteBuilder.addDataStrings(entityId);
-        remoteBuilder.addDataStrings(serviceId);
 
         remoteBuilder.addDataObjectStrings(dataset.toStorageData());
 
@@ -175,7 +162,6 @@ public abstract class HistogramFunction extends Metrics 
implements AcceptableVal
             };
             metrics.setDataset(new DataTable((String) dbMap.get(DATASET)));
             metrics.setTimeBucket(((Number) 
dbMap.get(TIME_BUCKET)).longValue());
-            metrics.setServiceId((String) 
dbMap.get(InstanceTraffic.SERVICE_ID));
             metrics.setEntityId((String) dbMap.get(ENTITY_ID));
             return metrics;
         }
@@ -185,7 +171,6 @@ public abstract class HistogramFunction extends Metrics 
implements AcceptableVal
             Map<String, Object> map = new HashMap<>();
             map.put(DATASET, storageData.getDataset());
             map.put(TIME_BUCKET, storageData.getTimeBucket());
-            map.put(InstanceTraffic.SERVICE_ID, storageData.getServiceId());
             map.put(ENTITY_ID, storageData.getEntityId());
             return map;
         }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java
new file mode 100644
index 0000000..01d767b
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import 
org.apache.skywalking.oap.server.core.analysis.metrics.MultiIntValuesHolder;
+import 
org.apache.skywalking.oap.server.core.analysis.metrics.PercentileMetrics;
+import org.apache.skywalking.oap.server.core.query.type.Bucket;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+
+/**
+ * PercentileFunction is the implementation of {@link PercentileMetrics} in 
the meter system. The major difference is
+ * the PercentileFunction accepts the {@link PercentileArgument} as input 
rather than every single request.
+ */
+@MeterFunction(functionName = "percentile")
+@Slf4j
+@EqualsAndHashCode(of = {
+    "entityId",
+    "timeBucket"
+})
+public abstract class PercentileFunction extends Metrics implements 
AcceptableValue<PercentileFunction.PercentileArgument>, MultiIntValuesHolder {
+    public static final String DATASET = "dataset";
+    public static final String RANKS = "ranks";
+    public static final String VALUE = "value";
+
+    @Setter
+    @Getter
+    @Column(columnName = ENTITY_ID)
+    private String entityId;
+    @Getter
+    @Setter
+    @Column(columnName = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, 
storageOnly = true)
+    private DataTable percentileValues = new DataTable(10);
+    @Getter
+    @Setter
+    @Column(columnName = DATASET, storageOnly = true)
+    private DataTable dataset = new DataTable(30);
+    /**
+     * Rank
+     */
+    @Getter
+    @Setter
+    @Column(columnName = RANKS, storageOnly = true)
+    private IntList ranks = new IntList(10);
+
+    private boolean isCalculated = false;
+
+    @Override
+    public void accept(final MeterEntity entity, final PercentileArgument 
value) {
+        if (dataset.size() > 0) {
+            if (!value.getBucketedValues().isCompatible(dataset)) {
+                throw new IllegalArgumentException(
+                    "Incompatible BucketedValues [" + value + "] for current 
PercentileFunction[" + dataset + "]");
+            }
+        }
+
+        for (final int rank : value.getRanks()) {
+            if (rank <= 0) {
+                throw new IllegalArgumentException("Illegal rank value " + 
rank + ", must be positive");
+            }
+        }
+
+        if (ranks.size() > 0) {
+            if (ranks.size() != value.getRanks().length) {
+                throw new IllegalArgumentException(
+                    "Incompatible ranks size = [" + value.getRanks().length + 
"] for current PercentileFunction[" + ranks
+                        .size() + "]");
+            } else {
+                for (final int rank : value.getRanks()) {
+                    if (!ranks.include(rank)) {
+                        throw new IllegalArgumentException(
+                            "Rank " + rank + " doesn't exist in the previous 
ranks " + ranks);
+                    }
+                }
+            }
+        } else {
+            for (final int rank : value.getRanks()) {
+                ranks.add(rank);
+            }
+        }
+
+        this.entityId = entity.id();
+
+        final long[] values = value.getBucketedValues().getValues();
+        for (int i = 0; i < values.length; i++) {
+            final long bucket = value.getBucketedValues().getBuckets()[i];
+            String bucketName = bucket == Integer.MIN_VALUE ? 
Bucket.INFINITE_NEGATIVE : String.valueOf(bucket);
+            final long bucketValue = values[i];
+            dataset.valueAccumulation(bucketName, bucketValue);
+        }
+
+        this.isCalculated = false;
+    }
+
+    @Override
+    public void combine(final Metrics metrics) {
+        PercentileFunction percentile = (PercentileFunction) metrics;
+
+        if (!dataset.keysEqual(percentile.getDataset())) {
+            log.warn("Incompatible input [{}}] for current 
HistogramFunction[{}], entity {}",
+                     percentile, this, entityId
+            );
+            return;
+        }
+        if (ranks.size() > 0) {
+            IntList ranksOfThat = percentile.getRanks();
+            if (this.ranks.size() != ranks.size()) {
+                log.warn("Incompatible ranks size = [{}}] for current 
PercentileFunction[{}]",
+                         ranks.size(), this.ranks.size()
+                );
+                return;
+            } else {
+                if (!this.ranks.equals(percentile.getRanks())) {
+                    log.warn("Rank {} doesn't exist in the previous ranks {}", 
percentile.getRanks(), ranks);
+                    return;
+                }
+            }
+        }
+
+        this.dataset.append(percentile.dataset);
+
+        this.isCalculated = false;
+    }
+
+    @Override
+    public void calculate() {
+        if (!isCalculated) {
+            long total = dataset.sumOfValues();
+
+            int[] roofs = new int[ranks.size()];
+            for (int i = 0; i < ranks.size(); i++) {
+                roofs[i] = Math.round(total * ranks.get(i) * 1.0f / 100);
+            }
+
+            int count = 0;
+            final List<String> sortedKeys = 
dataset.sortedKeys(Comparator.comparingInt(Integer::parseInt));
+
+            int loopIndex = 0;
+
+            for (String key : sortedKeys) {
+                final Long value = dataset.get(key);
+
+                count += value;
+                for (int rankIdx = loopIndex; rankIdx < roofs.length; 
rankIdx++) {
+                    int roof = roofs[rankIdx];
+
+                    if (count >= roof) {
+                        
percentileValues.put(String.valueOf(ranks.get(rankIdx)), Long.parseLong(key));
+                        loopIndex++;
+                    } else {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public Metrics toHour() {
+        PercentileFunction metrics = (PercentileFunction) createNew();
+        metrics.setEntityId(getEntityId());
+        metrics.setTimeBucket(toTimeBucketInHour());
+        metrics.setDataset(getDataset());
+        metrics.setRanks(getRanks());
+        metrics.setPercentileValues(getPercentileValues());
+        return metrics;
+    }
+
+    @Override
+    public Metrics toDay() {
+        PercentileFunction metrics = (PercentileFunction) createNew();
+        metrics.setEntityId(getEntityId());
+        metrics.setTimeBucket(toTimeBucketInDay());
+        metrics.setDataset(getDataset());
+        metrics.setRanks(getRanks());
+        metrics.setPercentileValues(getPercentileValues());
+        return metrics;
+    }
+
+    @Override
+    public int[] getValues() {
+        return 
percentileValues.sortedValues(Comparator.comparingInt(Integer::parseInt))
+                               .stream()
+                               .flatMapToInt(l -> IntStream.of(l.intValue()))
+                               .toArray();
+    }
+
+    @Override
+    public int remoteHashCode() {
+        return entityId.hashCode();
+    }
+
+    @Override
+    public void deserialize(final RemoteData remoteData) {
+        this.setTimeBucket(remoteData.getDataLongs(0));
+
+        this.setEntityId(remoteData.getDataStrings(0));
+
+        this.setDataset(new DataTable(remoteData.getDataObjectStrings(0)));
+        this.setRanks(new IntList(remoteData.getDataObjectStrings(1)));
+        this.setPercentileValues(new 
DataTable(remoteData.getDataObjectStrings(2)));
+    }
+
+    @Override
+    public RemoteData.Builder serialize() {
+        RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+        remoteBuilder.addDataLongs(getTimeBucket());
+
+        remoteBuilder.addDataStrings(entityId);
+
+        remoteBuilder.addDataObjectStrings(dataset.toStorageData());
+        remoteBuilder.addDataObjectStrings(ranks.toStorageData());
+        remoteBuilder.addDataObjectStrings(percentileValues.toStorageData());
+
+        return remoteBuilder;
+    }
+
+    @Override
+    public String id() {
+        return getTimeBucket() + Const.ID_CONNECTOR + entityId;
+    }
+
+    @Override
+    public Class<? extends StorageBuilder> builder() {
+        return PercentileFunctionBuilder.class;
+    }
+
+    @RequiredArgsConstructor
+    @Getter
+    public static class PercentileArgument {
+        private final BucketedValues bucketedValues;
+        private final int[] ranks;
+    }
+
+    public static class PercentileFunctionBuilder implements 
StorageBuilder<PercentileFunction> {
+
+        @Override
+        public PercentileFunction map2Data(final Map<String, Object> dbMap) {
+            PercentileFunction metrics = new PercentileFunction() {
+                @Override
+                public AcceptableValue<PercentileArgument> createNew() {
+                    throw new UnexpectedException("createNew should not be 
called");
+                }
+            };
+            metrics.setDataset(new DataTable((String) dbMap.get(DATASET)));
+            metrics.setRanks(new IntList((String) dbMap.get(RANKS)));
+            metrics.setPercentileValues(new DataTable((String) 
dbMap.get(VALUE)));
+            metrics.setTimeBucket(((Number) 
dbMap.get(TIME_BUCKET)).longValue());
+            metrics.setEntityId((String) dbMap.get(ENTITY_ID));
+            return metrics;
+        }
+
+        @Override
+        public Map<String, Object> data2Map(final PercentileFunction 
storageData) {
+            Map<String, Object> map = new HashMap<>();
+            map.put(DATASET, storageData.getDataset());
+            map.put(RANKS, storageData.getRanks());
+            map.put(VALUE, storageData.getPercentileValues());
+            map.put(TIME_BUCKET, storageData.getTimeBucket());
+            map.put(ENTITY_ID, storageData.getEntityId());
+            return map;
+        }
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/IntList.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/IntList.java
new file mode 100644
index 0000000..b5bd9ef
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/IntList.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.core.Const;
+import 
org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
+
+/**
+ * IntList is a serializable array list carrying int values.
+ */
+@ToString
+@EqualsAndHashCode
+public class IntList implements StorageDataComplexObject<IntList> {
+    private List<Integer> data;
+
+    public IntList(int initialSize) {
+        this.data = new ArrayList(initialSize);
+    }
+
+    public IntList(String valueString) {
+        toObject(valueString);
+    }
+
+    public int size() {
+        return data.size();
+    }
+
+    public boolean include(int value) {
+        return data.contains(value);
+    }
+
+    @Override
+    public String toStorageData() {
+        StringBuilder builder = new StringBuilder();
+
+        this.data.forEach(element -> {
+            if (builder.length() != 0) {
+                // For the first element.
+                builder.append(Const.ARRAY_SPLIT);
+            }
+            builder.append(element);
+        });
+        return builder.toString();
+    }
+
+    @Override
+    public void toObject(final String data) {
+        String[] elements = data.split(Const.ARRAY_PARSER_SPLIT);
+        this.data = new ArrayList<>(elements.length);
+        for (String element : elements) {
+            this.data.add(Integer.parseInt(element));
+        }
+    }
+
+    @Override
+    public void copyFrom(final IntList source) {
+        this.data.addAll(source.data);
+    }
+
+    public void add(final int rank) {
+        this.data.add(rank);
+    }
+
+    public int get(final int idx) {
+        return this.data.get(idx);
+    }
+}
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunctionTest.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunctionTest.java
new file mode 100644
index 0000000..6409d7b
--- /dev/null
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunctionTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function;
+
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PercentileFunctionTest {
+    private static final int[] BUCKETS = new int[] {
+        0,
+        50,
+        100,
+        250
+    };
+
+    private static final int[] BUCKETS_2ND = new int[] {
+        0,
+        51,
+        100,
+        250
+    };
+
+    private static final int[] RANKS = new int[] {
+        50,
+        90
+    };
+
+    @Test
+    public void testFunction() {
+        PercentileFunctionInst inst = new PercentileFunctionInst();
+        inst.accept(
+            MeterEntity.newService("service-test"),
+            new PercentileFunction.PercentileArgument(
+                new BucketedValues(
+                    BUCKETS,
+                    new long[] {
+                        10,
+                        20,
+                        30,
+                        40
+                    }
+                ),
+                RANKS
+            )
+        );
+
+        inst.accept(
+            MeterEntity.newService("service-test"),
+            new PercentileFunction.PercentileArgument(
+                new BucketedValues(
+                    BUCKETS,
+                    new long[] {
+                        10,
+                        20,
+                        30,
+                        40
+                    }
+                ),
+                RANKS
+            )
+        );
+
+        inst.calculate();
+        final int[] values = inst.getValues();
+        /**
+         * Expected percentile dataset
+         * <pre>
+         *     0  , 20
+         *     50 , 40
+         *     100, 60 <- P50
+         *     250, 80 <- P90
+         * </pre>
+         */
+        Assert.assertArrayEquals(new int[] {
+            100,
+            250
+        }, values);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testIncompatible() {
+        PercentileFunctionInst inst = new PercentileFunctionInst();
+        inst.accept(
+            MeterEntity.newService("service-test"),
+            new PercentileFunction.PercentileArgument(
+                new BucketedValues(
+                    BUCKETS,
+                    new long[] {
+                        10,
+                        20,
+                        30,
+                        40
+                    }
+                ),
+                RANKS
+            )
+        );
+
+        inst.accept(
+            MeterEntity.newService("service-test"),
+            new PercentileFunction.PercentileArgument(
+                new BucketedValues(
+                    BUCKETS_2ND,
+                    new long[] {
+                        10,
+                        20,
+                        30,
+                        40
+                    }
+                ),
+                RANKS
+            )
+        );
+    }
+
+    @Test
+    public void testSerialization() {
+        PercentileFunctionInst inst = new PercentileFunctionInst();
+        inst.accept(
+            MeterEntity.newService("service-test"),
+            new PercentileFunction.PercentileArgument(
+                new BucketedValues(
+                    BUCKETS,
+                    new long[] {
+                        10,
+                        20,
+                        30,
+                        40
+                    }
+                ),
+                RANKS
+            )
+        );
+
+        PercentileFunctionInst inst2 = new PercentileFunctionInst();
+        inst2.deserialize(inst.serialize().build());
+
+        Assert.assertEquals(inst, inst2);
+        // HistogramFunction equal doesn't include dataset.
+        Assert.assertEquals(inst.getDataset(), inst2.getDataset());
+        Assert.assertEquals(inst.getRanks(), inst2.getRanks());
+        Assert.assertEquals(0, inst2.getPercentileValues().size());
+    }
+
+    @Test
+    public void testBuilder() throws IllegalAccessException, 
InstantiationException {
+        PercentileFunctionInst inst = new PercentileFunctionInst();
+        inst.accept(
+            MeterEntity.newService("service-test"),
+            new PercentileFunction.PercentileArgument(
+                new BucketedValues(
+                    BUCKETS,
+                    new long[] {
+                        10,
+                        20,
+                        30,
+                        40
+                    }
+                ),
+                RANKS
+            )
+        );
+        inst.calculate();
+
+        final StorageBuilder storageBuilder = inst.builder().newInstance();
+
+        // Simulate the storage layer do, convert the datatable to string.
+        final Map map = storageBuilder.data2Map(inst);
+        map.put(PercentileFunction.DATASET, ((DataTable) 
map.get(PercentileFunction.DATASET)).toStorageData());
+        map.put(PercentileFunction.VALUE, ((DataTable) 
map.get(PercentileFunction.VALUE)).toStorageData());
+        map.put(PercentileFunction.RANKS, ((IntList) 
map.get(PercentileFunction.RANKS)).toStorageData());
+
+        final PercentileFunction inst2 = (PercentileFunction) 
storageBuilder.map2Data(map);
+        Assert.assertEquals(inst, inst2);
+        // HistogramFunction equal doesn't include dataset.
+        Assert.assertEquals(inst.getDataset(), inst2.getDataset());
+        Assert.assertEquals(inst.getPercentileValues(), 
inst2.getPercentileValues());
+        Assert.assertEquals(inst.getRanks(), inst2.getRanks());
+    }
+
+    private static class PercentileFunctionInst extends PercentileFunction {
+        @Override
+        public AcceptableValue<PercentileArgument> createNew() {
+            return new PercentileFunctionInst();
+        }
+    }
+}
diff --git 
a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
 
b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
index 9b5efcd..0e2a00f 100644
--- 
a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
+++ 
b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
@@ -27,6 +27,7 @@ import 
org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
 import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
 import 
org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
 import 
org.apache.skywalking.oap.server.core.analysis.meter.function.BucketedValues;
+import 
org.apache.skywalking.oap.server.core.analysis.meter.function.PercentileFunction;
 import 
org.apache.skywalking.oap.server.fetcher.prometheus.module.PrometheusFetcherModule;
 import org.apache.skywalking.oap.server.library.module.ModuleConfig;
 import org.apache.skywalking.oap.server.library.module.ModuleDefine;
@@ -64,6 +65,10 @@ public class PrometheusFetcherProvider extends 
ModuleProvider {
             final MeterSystem meterSystem = 
MeterSystem.meterSystem(getManager());
             meterSystem.create("test_long_metrics", "avg", ScopeType.SERVICE, 
Long.class);
             meterSystem.create("test_histogram_metrics", "histogram", 
ScopeType.SERVICE, BucketedValues.class);
+            meterSystem.create(
+                "test_percentile_metrics", "percentile", ScopeType.SERVICE,
+                PercentileFunction.PercentileArgument.class
+            );
         }
     }
 
@@ -109,6 +114,38 @@ public class PrometheusFetcherProvider extends 
ModuleProvider {
                         }
                     ));
                     service.doStreamingCalculation(histogramMetrics);
+
+                    // Percentile Example
+                    final 
AcceptableValue<PercentileFunction.PercentileArgument> testPercentileMetrics = 
service.buildMetrics(
+                        "test_percentile_metrics", 
PercentileFunction.PercentileArgument.class);
+                    
testPercentileMetrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+                    testPercentileMetrics.accept(
+                        MeterEntity.newService("service-test"),
+                        new PercentileFunction.PercentileArgument(
+                            new BucketedValues(
+                                // Buckets
+                                new int[] {
+                                    0,
+                                    51,
+                                    100,
+                                    250
+                                },
+                                // Values
+                                new long[] {
+                                    10,
+                                    20,
+                                    30,
+                                    40
+                                }
+                            ),
+                            // Ranks
+                            new int[] {
+                                50,
+                                90
+                            }
+                        )
+                    );
+                    service.doStreamingCalculation(testPercentileMetrics);
                 }
             }, 2, 2, TimeUnit.SECONDS);
         }
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
index 7eefe65..5509602 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
@@ -22,6 +22,7 @@ import com.google.gson.JsonObject;
 import org.apache.skywalking.oap.server.core.analysis.NodeType;
 import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
 import org.apache.skywalking.oap.server.core.storage.model.DataTypeMapping;
+import 
org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
 
 public class ColumnTypeEsMapping implements DataTypeMapping {
 
@@ -35,7 +36,7 @@ public class ColumnTypeEsMapping implements DataTypeMapping {
             return "double";
         } else if (String.class.equals(type)) {
             return "keyword";
-        } else if (DataTable.class.equals(type)) {
+        } else if (StorageDataComplexObject.class.isAssignableFrom(type)) {
             return "text";
         } else if (byte[].class.equals(type)) {
             return "binary";
diff --git 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java
 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java
index 4d10bf4..53e5266 100644
--- 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java
+++ 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java
@@ -23,12 +23,12 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.analysis.NodeType;
-import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.model.ColumnName;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
 import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
+import 
org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
 import org.apache.skywalking.oap.server.library.client.Client;
 import 
org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import 
org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
@@ -98,7 +98,7 @@ public class H2TableInstaller extends ModelInstaller {
             return "DOUBLE";
         } else if (String.class.equals(type)) {
             return "VARCHAR(" + column.getLength() + ")";
-        } else if (DataTable.class.equals(type)) {
+        } else if (StorageDataComplexObject.class.isAssignableFrom(type)) {
             return "VARCHAR(20000)";
         } else if (byte[].class.equals(type)) {
             return "MEDIUMTEXT";
diff --git 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
index 839fefb..89dc19c 100644
--- 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
+++ 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
@@ -27,6 +27,7 @@ import 
org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.model.ExtraQueryIndex;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
+import 
org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
 import org.apache.skywalking.oap.server.library.client.Client;
 import 
org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import 
org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
@@ -105,7 +106,7 @@ public class MySQLTableInstaller extends H2TableInstaller {
 
     @Override
     protected String getColumnType(final ModelColumn column) {
-        if (DataTable.class.equals(column.getType())) {
+        if (StorageDataComplexObject.class.isAssignableFrom(column.getType())) 
{
             return "MEDIUMTEXT";
         }
         return super.getColumnType(column);

Reply via email to