This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch top-sql
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/top-sql by this push: new 4f2a82a Finish the top n database statement persistent. 4f2a82a is described below commit 4f2a82acfafe85908c3144c80d7736c973e435cf Author: Wu Sheng <wu.sh...@foxmail.com> AuthorDate: Sun Feb 10 22:27:39 2019 +0800 Finish the top n database statement persistent. --- .../core/analysis/data/LimitedSizeDataCache.java | 12 ++++++----- .../analysis/data/LimitedSizeDataCollection.java | 1 + .../oap/server/core/analysis/data/Window.java | 10 +++++++++ .../database/DatabaseStatementDispatcher.java | 1 + .../manual/database/TopNDatabaseStatement.java | 5 +++-- .../server/core/analysis/worker/TopNWorker.java | 24 ++++++++++++++++++---- .../server/core/source/DatabaseSlowStatement.java | 1 + .../listener/endpoint/MultiScopesSpanListener.java | 5 ++++- 8 files changed, 47 insertions(+), 12 deletions(-) diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java index d64d3e7..33cfe1d 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java @@ -22,11 +22,13 @@ import org.apache.skywalking.oap.server.core.storage.*; public class LimitedSizeDataCache<STORAGE_DATA extends ComparableStorageData> extends Window<STORAGE_DATA> implements DataCache { - private SWCollection<STORAGE_DATA> lockedMergeDataCollection; + private SWCollection<STORAGE_DATA> limitedSizeDataCollection; private final int limitSize; public LimitedSizeDataCache(int limitSize) { + super(false); this.limitSize = limitSize; + init(); } @Override public SWCollection<STORAGE_DATA> collectionInstance() { @@ -34,16 +36,16 @@ public class LimitedSizeDataCache<STORAGE_DATA extends 
ComparableStorageData> ex } public void add(STORAGE_DATA data) { - lockedMergeDataCollection.put(data); + limitedSizeDataCollection.put(data); } @Override public void writing() { - lockedMergeDataCollection = getCurrentAndWriting(); + limitedSizeDataCollection = getCurrentAndWriting(); } @Override public void finishWriting() { - lockedMergeDataCollection.finishWriting(); - lockedMergeDataCollection = null; + limitedSizeDataCollection.finishWriting(); + limitedSizeDataCollection = null; } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java index b330a4e..70b7966 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java @@ -104,6 +104,7 @@ public class LimitedSizeDataCollection<STORAGE_DATA extends ComparableStorageDat // Add the value as biggest in top N list storageDataList.addLast(value); + storageDataList.removeFirst(); } @Override public Collection<STORAGE_DATA> collection() { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java index 6f9486d..ff2ca6d 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java @@ -33,6 +33,16 @@ public abstract class Window<DATA> { private SWCollection<DATA> windowDataB; Window() { + this(true); + } + + Window(boolean autoInit) { + if (autoInit) { + init(); + } + } + + protected void init() { this.windowDataA = 
collectionInstance(); this.windowDataB = collectionInstance(); this.pointer = windowDataA; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java index a94d370..628751a 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java @@ -28,6 +28,7 @@ import org.apache.skywalking.oap.server.core.source.DatabaseSlowStatement; public class DatabaseStatementDispatcher implements SourceDispatcher<DatabaseSlowStatement> { @Override public void dispatch(DatabaseSlowStatement source) { TopNDatabaseStatement statement = new TopNDatabaseStatement(); + statement.setId(source.getId()); statement.setDatabaseServiceId(source.getDatabaseServiceId()); statement.setLatency(source.getLatency()); statement.setStatement(source.getStatement()); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java index 3639ae7..c40a6bb 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java @@ -35,13 +35,14 @@ import org.apache.skywalking.oap.server.core.storage.annotation.*; @TopNType @StorageEntity(name = TopNDatabaseStatement.INDEX_NAME, builder = TopNDatabaseStatement.Builder.class, source = Scope.DatabaseSlowStatement) public class 
TopNDatabaseStatement extends TopN { - public static final String INDEX_NAME = "TOP_N_DATABASE_STATEMENT"; + public static final String INDEX_NAME = "top_n_database_statement"; public static final String DATABASE_SERVICE_ID = "db_service_id"; + @Setter private String id; @Getter @Setter @Column(columnName = DATABASE_SERVICE_ID) private int databaseServiceId; @Override public String id() { - throw new UnexpectedException("id() should not be called."); + return id; } @Override public boolean equals(Object o) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java index 6ccccd8..5d6304e 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java @@ -56,7 +56,12 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top } @Override void onWork(TopN data) { - limitedSizeDataCache.add(data); + limitedSizeDataCache.writing(); + try { + limitedSizeDataCache.add(data); + } finally { + limitedSizeDataCache.finishWriting(); + } } /** @@ -72,13 +77,24 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top return limitedSizeDataCache; } - @Override public List<Object> prepareBatch(LimitedSizeDataCache<TopN> cache) { + /** + * The top N worker persistent cycle is much less than the others, override `flushAndSwitch` to extend the execute + * time windows. + * + * Switch and persistent attempt happens based on reportCycle. 
+ * + * @return + */ + @Override public boolean flushAndSwitch() { long now = System.currentTimeMillis(); if (now - lastReportTimestamp <= reportCycle) { - return new ArrayList<>(0); + return false; } lastReportTimestamp = now; + return super.flushAndSwitch(); + } + @Override public List<Object> prepareBatch(LimitedSizeDataCache<TopN> cache) { List<Object> batchCollection = new LinkedList<>(); cache.getLast().collection().forEach(record -> { try { @@ -91,7 +107,7 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top } @Override public void in(TopN n) { - limitedSizeDataCache.add(n); + dataCarrier.produce(n); } private class TopNConsumer implements IConsumer<TopN> { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java index c3a7a68..254a5e7 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java @@ -27,6 +27,7 @@ import org.apache.skywalking.oap.server.core.source.annotation.SourceType; */ @SourceType public class DatabaseSlowStatement extends Source { + @Getter @Setter private String id; @Getter @Setter private int databaseServiceId; @Getter @Setter private String statement; @Getter @Setter private long latency; diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java index ebb3468..4b970fe 100644 --- 
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java @@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.*; import org.apache.skywalking.oap.server.core.cache.*; import org.apache.skywalking.oap.server.core.source.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.util.TimeBucketUtils; import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SpanTags; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*; @@ -147,11 +148,13 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe setPublicAttrs(sourceBuilder, spanDecorator); exitSourceBuilders.add(sourceBuilder); - if (spanDecorator.getSpanLayer().equals(RequestType.DATABASE) + if (sourceBuilder.getType().equals(RequestType.DATABASE) && sourceBuilder.getLatency() > config.getSlowDBAccessThreshold()) { DatabaseSlowStatement statement = new DatabaseSlowStatement(); + statement.setId(segmentCoreInfo.getSegmentId() + "-" + spanDecorator.getSpanId()); statement.setDatabaseServiceId(sourceBuilder.getDestServiceId()); statement.setLatency(sourceBuilder.getLatency()); + statement.setTimeBucket(TimeBucketUtils.INSTANCE.getSecondTimeBucket(segmentCoreInfo.getStartTime())); statement.setTraceId(segmentCoreInfo.getSegmentId()); for (KeyStringValuePair tag : spanDecorator.getAllTags()) { if (SpanTags.DB_STATEMENT.equals(tag.getKey())) {