This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch comments in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 7e844fd91e3ff8e6fde73339c38d7281de9ab557 Author: Wu Sheng <wu.sh...@foxmail.com> AuthorDate: Thu Feb 13 09:40:44 2020 +0800 Fill some comments for core. --- .../src/main/resources/official_analysis.oal | 2 +- .../oap/server/core/analysis/data/Window.java | 8 +++ .../oap/server/core/analysis/metrics/Metrics.java | 46 ++++++++++++++- .../oap/server/core/analysis/record/Record.java | 8 +++ .../analysis/worker/MetricsPersistentWorker.java | 12 ++-- .../core/analysis/worker/PersistenceWorker.java | 32 ++++++++++ .../oap/server/core/register/RegisterSource.java | 3 + .../oap/server/core/source/DefaultScopeDefine.java | 69 ++++++++++++++++++---- .../oap/server/core/source/ScopeDeclaration.java | 13 +++- .../oap/server/core/storage/StorageData.java | 6 ++ 10 files changed, 180 insertions(+), 19 deletions(-) diff --git a/oap-server/server-bootstrap/src/main/resources/official_analysis.oal b/oap-server/server-bootstrap/src/main/resources/official_analysis.oal index ad66bac..afee68b 100755 --- a/oap-server/server-bootstrap/src/main/resources/official_analysis.oal +++ b/oap-server/server-bootstrap/src/main/resources/official_analysis.oal @@ -95,7 +95,7 @@ envoy_heap_memory_max_used = from(EnvoyInstanceMetric.value).filter(metricName = envoy_total_connections_used = from(EnvoyInstanceMetric.value).filter(metricName == "server.total_connections").maxDouble(); envoy_parent_connections_used = from(EnvoyInstanceMetric.value).filter(metricName == "server.parent_connections").maxDouble(); -// Disable unnecessary hard core sources +// Disable unnecessary hard core stream, targeting @Stream#name ///////// // disable(segment); // disable(endpoint_relation_server_side); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java index 4236a1f..6aa3f6a 100644 --- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java @@ -20,6 +20,14 @@ package org.apache.skywalking.oap.server.core.analysis.data; import java.util.concurrent.atomic.AtomicInteger; +/** + * Data cache window. Window holds two data collections (A and B). They are switchable based on an outside command. + * At any time, one collection is accepting the input data, and the other is immutable. + * + * This window makes sure there is no concurrent read-write situation. + * + * @param <DATA> + */ public abstract class Window<DATA> { private AtomicInteger windowSwitch = new AtomicInteger(0); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java index 065a98c..12dbdb3 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java @@ -26,31 +26,71 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; +/** + * Metrics represents the statistical data, which is analyzed by OAL script or hard code. It has the lifecycle controlled by + * TTL(time to live). + */ public abstract class Metrics extends StreamData implements StorageData { public static final String TIME_BUCKET = "time_bucket"; public static final String ENTITY_ID = "entity_id"; + /** + * Time attribute + */ @Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket; + + /** + * Time in the cache, only works when MetricsPersistentWorker#enableDatabaseSession == true. 
+ */ @Getter - @Setter private long survivalTime = 0L; - public abstract String id(); - + /** + * Merge the given metrics instance, these two must be the same metrics type. + * + * @param metrics to be merged + */ public abstract void combine(Metrics metrics); + /** + * Calculate the metrics final value when required. + */ public abstract void calculate(); + /** + * Downsampling the metrics to hour precision. + * + * @return the metrics in hour precision in the clone mode. + */ public abstract Metrics toHour(); + /** + * Downsampling the metrics to day precision. + * + * @return the metrics in day precision in the clone mode. + */ public abstract Metrics toDay(); + /** + * Downsampling the metrics to month precision. + * + * @return the metrics in month precision in the clone mode. + */ public abstract Metrics toMonth(); + /** + * Extend the {@link #survivalTime} + * + * @param value to extend + */ + public void extendSurvivalTime(long value) { + survivalTime += value; + } + public long toTimeBucketInHour() { if (isMinuteBucket()) { return timeBucket / 100; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/Record.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/Record.java index b75e2ac..4cf97d2 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/Record.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/Record.java @@ -20,13 +20,21 @@ package org.apache.skywalking.oap.server.core.analysis.record; import lombok.Getter; import lombok.Setter; +import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.storage.StorageData; import org.apache.skywalking.oap.server.core.storage.annotation.Column; +/** + * Record storage represents the entity have fully and manually entity definition by hard codes. 
Most of them are + * original log data or task records. This data needs to be persisted without further analysis. + */ public abstract class Record implements StorageData { public static final String TIME_BUCKET = "time_bucket"; + /** + * Time attribute, all storage data is time sensitive, the same as {@link Metrics} + */ @Getter @Setter @Column(columnName = TIME_BUCKET) diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index 259695e..263cac4 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -57,8 +57,8 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat private final boolean enableDatabaseSession; MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO, - AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker, - MetricsTransWorker transWorker, boolean enableDatabaseSession) { + AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker, + MetricsTransWorker transWorker, boolean enableDatabaseSession) { super(moduleDefineHolder); this.model = model; this.databaseSession = new HashMap<>(100); @@ -152,7 +152,10 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat } if (prepareRequests.size() > 0) { - logger.debug("prepare batch requests for model {}, took time: {}", model.getName(), System.currentTimeMillis() - start); + logger.debug( + "prepare batch requests for model {}, took time: {}", model.getName(), + System.currentTimeMillis() - start + ); } } @@ -207,7 +210,8 @@ public class 
MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat Iterator<Metrics> iterator = databaseSession.values().iterator(); while (iterator.hasNext()) { Metrics metrics = iterator.next(); - metrics.setSurvivalTime(tookTime + metrics.getSurvivalTime()); + metrics.extendSurvivalTime(tookTime); + // 70,000ms means more than one minute. if (metrics.getSurvivalTime() > 70000) { iterator.remove(); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java index c46a857..156618d 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java @@ -29,6 +29,13 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * PersistenceWorker take the responsibility to pushing data to the final storage. The target storage is based on the + * activate storage implementation. This worker controls the persistence flow. + * + * @param <INPUT> The type of worker input. All inputs will be merged and saved. + * @param <CACHE> Cache type to hold all input. + */ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends Window<INPUT>> extends AbstractWorker<INPUT> { private static final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class); @@ -37,16 +44,34 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends super(moduleDefineHolder); } + /** + * Accept the input, and push the data into the cache. + */ void onWork(INPUT input) { cacheData(input); } + /** + * Cache data based on different strategies. See the implementations for more details. 
+ */ public abstract void cacheData(INPUT input); public abstract CACHE getCache(); + /** + * The persistence process is driven by the {@link org.apache.skywalking.oap.server.core.storage.PersistenceTimer}. + * This is a notification method for the worker when each round finishes. + * + * @param tookTime The time cost of this round. + */ public abstract void endOfRound(long tookTime); + /** + * For every cache implementation (see {@link Window}), there are two datasets; switch them when a persistence round + * is beginning, in order to make cached data immutable. + * + * @return true if switched successfully. + */ public boolean flushAndSwitch() { boolean isSwitch; try { @@ -59,6 +84,13 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends return isSwitch; } + /** + * Prepare the batch persistence, transfer all prepared data to the executable data format based on the storage + * implementations. + * + * @param lastCollection the source of transformation, in the in-memory object format. + * @param prepareRequests data in the formats for the final persistence operations. 
+ */ public abstract void prepareBatch(Collection<INPUT> lastCollection, List<PrepareRequest> prepareRequests); public final void buildBatchRequests(List<PrepareRequest> prepareRequests) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java index 7b31d11..ea3947e 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java @@ -24,6 +24,9 @@ import org.apache.skywalking.oap.server.core.remote.data.StreamData; import org.apache.skywalking.oap.server.core.storage.StorageData; import org.apache.skywalking.oap.server.core.storage.annotation.Column; +/** + * RegisterSource represents the metadata entity. + */ public abstract class RegisterSource extends StreamData implements StorageData { public static final String SEQUENCE = "sequence"; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java index 214f660..cab4cc6 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java @@ -78,6 +78,9 @@ public class DefaultScopeDefine { private static final Map<Integer, Boolean> SERVICE_INSTANCE_CATALOG = new HashMap<>(); private static final Map<Integer, Boolean> ENDPOINT_CATALOG = new HashMap<>(); + /** + * Annotation scan listener + */ public static class Listener implements AnnotationListener { @Override public Class<? 
extends Annotation> annotation() { @@ -93,16 +96,24 @@ public class DefaultScopeDefine { } } - public static final void addNewScope(ScopeDeclaration declaration, Class originalClass) { + /** + * Add a new scope based on the scan result + * + * @param declaration includes the definition. + * @param originalClass represents the class having the {@link ScopeDeclaration} annotation + */ + private static final void addNewScope(ScopeDeclaration declaration, Class originalClass) { int id = declaration.id(); if (ID_2_NAME.containsKey(id)) { - throw new UnexpectedException("ScopeDeclaration id=" + id + " at " + originalClass.getName() + " has conflict with another named " + ID_2_NAME - .get(id)); + throw new UnexpectedException( + "ScopeDeclaration id=" + id + " at " + originalClass.getName() + " has conflict with another named " + ID_2_NAME + .get(id)); } String name = declaration.name(); if (NAME_2_ID.containsKey(name)) { - throw new UnexpectedException("ScopeDeclaration fieldName=" + name + " at " + originalClass.getName() + " has conflict with another id= " + NAME_2_ID - .get(name)); + throw new UnexpectedException( + "ScopeDeclaration fieldName=" + name + " at " + originalClass.getName() + " has conflict with another id= " + NAME_2_ID + .get(name)); } ID_2_NAME.put(id, name); NAME_2_ID.put(name, id); @@ -112,16 +123,21 @@ public class DefaultScopeDefine { ScopeDefaultColumn.VirtualColumnDefinition virtualColumn = (ScopeDefaultColumn.VirtualColumnDefinition) originalClass .getAnnotation(ScopeDefaultColumn.VirtualColumnDefinition.class); if (virtualColumn != null) { - scopeDefaultColumns.add(new ScopeDefaultColumn(virtualColumn.fieldName(), virtualColumn.columnName(), virtualColumn - .type(), virtualColumn.isID())); + scopeDefaultColumns.add( + new ScopeDefaultColumn(virtualColumn.fieldName(), virtualColumn.columnName(), virtualColumn + .type(), virtualColumn.isID())); } Field[] scopeClassField = originalClass.getDeclaredFields(); if (scopeClassField != null) { for 
(Field field : scopeClassField) { - ScopeDefaultColumn.DefinedByField definedByField = field.getAnnotation(ScopeDefaultColumn.DefinedByField.class); + ScopeDefaultColumn.DefinedByField definedByField = field.getAnnotation( + ScopeDefaultColumn.DefinedByField.class); if (definedByField != null) { - scopeDefaultColumns.add(new ScopeDefaultColumn(field.getName(), definedByField.columnName(), field.getType(), definedByField - .isID())); + scopeDefaultColumns.add( + new ScopeDefaultColumn(field.getName(), definedByField.columnName(), field.getType(), + definedByField + .isID() + )); } } } @@ -142,6 +158,12 @@ public class DefaultScopeDefine { } } + /** + * Fetch the name of the given id + * + * @param id represents an existing scope id. + * @return scope name. + */ public static String nameOf(int id) { String name = ID_2_NAME.get(id); if (name == null) { @@ -150,6 +172,12 @@ public class DefaultScopeDefine { return name; } + /** + * Fetch the id of the given name + * + * @param name represents an existing scope name + * @return scope id + */ public static int valueOf(String name) { Integer id = NAME_2_ID.get(name); if (id == null) { @@ -158,20 +186,41 @@ public class DefaultScopeDefine { return id; } + /** + * Reset all existing scope definitions. For test only. + */ public static void reset() { NAME_2_ID.clear(); ID_2_NAME.clear(); SCOPE_COLUMNS.clear(); } + /** + * Check whether the current scope belongs to the service catalog + * + * @param scopeId represents an existing scope id. + * @return true if the current scope sets {@link ScopeDeclaration#catalog()} == {@link #SERVICE_CATALOG_NAME} + */ public static boolean inServiceCatalog(int scopeId) { return SERVICE_CATALOG.containsKey(scopeId); } + /** + * Check whether the current scope belongs to the service instance catalog + * + * @param scopeId represents an existing scope id. 
+ * @return true if the current scope sets {@link ScopeDeclaration#catalog()} == {@link #SERVICE_INSTANCE_CATALOG_NAME} + */ public static boolean inServiceInstanceCatalog(int scopeId) { return SERVICE_INSTANCE_CATALOG.containsKey(scopeId); } + /** + * Check whether the current scope belongs to the endpoint catalog + * + * @param scopeId represents an existing scope id. + * @return true if the current scope sets {@link ScopeDeclaration#catalog()} == {@link #ENDPOINT_CATALOG_NAME} + */ public static boolean inEndpointCatalog(int scopeId) { return ENDPOINT_CATALOG.containsKey(scopeId); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDeclaration.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDeclaration.java index c0ba4fe..177b1de 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDeclaration.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDeclaration.java @@ -22,9 +22,20 @@ import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord; /** - * DefaultScopeDefine id declaration. + * ScopeDeclaration includes + * + * 1. Source entity used in OAL script, such as Service as a Scope could be used like this in the OAL script. + * + * service_resp_time = from(Service.latency).longAvg(); + * + * 2. Manual source such as {@link Segment} + * + * 3. Non-stream entity like {@link ProfileTaskRecord}. + * + * NOTICE: in OAL script, `disable` is for stream, rather than source; it doesn't require this annotation. 
*/ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageData.java index ad5d746..f822483 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageData.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageData.java @@ -18,6 +18,12 @@ package org.apache.skywalking.oap.server.core.storage; +/** + * Any persistent entity should be an implementation of this interface. + */ public interface StorageData { + /** + * @return the unique id used in any storage option. + */ String id(); }