This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new dc0f653 Fix TTL issue. (#3292)
dc0f653 is described below
commit dc0f6530cb05b656be46f8146e0af9ae58b9bb3c
Author: 吴晟 Wu Sheng <[email protected]>
AuthorDate: Tue Aug 20 15:36:31 2019 +0800
Fix TTL issue. (#3292)
* Fix TTL issue.
* Fix issue.
* Rename `getSecondTimeBucket` to `getRecordTimeBucket`.
---
.../server/core/alarm/AlarmStandardPersistence.java | 2 +-
.../oap/server/core/analysis/TimeBucket.java | 8 +++++++-
.../server/core/storage/ttl/GeneralStorageTTL.java | 6 +++++-
.../{StorageTTL.java => RecordTTLCalculator.java} | 17 ++++++++++++-----
.../oap/server/core/storage/ttl/StorageTTL.java | 4 +++-
.../server/receiver/jaeger/JaegerGRPCHandler.java | 2 +-
.../listener/endpoint/MultiScopesSpanListener.java | 2 +-
.../parser/listener/segment/SegmentSpanListener.java | 2 +-
.../server/receiver/zipkin/trace/SpanForward.java | 2 +-
.../elasticsearch/base/HistoryDeleteEsDAO.java | 9 ++++++++-
.../elasticsearch/ttl/ElasticsearchStorageTTL.java | 6 +++++-
.../elasticsearch/ttl/EsRecordTTLCalculator.java} | 20 ++++++++++++++------
.../plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java | 12 ++++++++++--
13 files changed, 69 insertions(+), 23 deletions(-)
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmStandardPersistence.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmStandardPersistence.java
index 48dcc8a..6237bfd 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmStandardPersistence.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmStandardPersistence.java
@@ -45,7 +45,7 @@ public class AlarmStandardPersistence implements
AlarmCallback {
record.setName(message.getName());
record.setAlarmMessage(message.getAlarmMessage());
record.setStartTime(message.getStartTime());
- record.setTimeBucket(TimeBucket.getSecondTimeBucket(message.getStartTime()));
+ record.setTimeBucket(TimeBucket.getRecordTimeBucket(message.getStartTime()));
RecordStreamProcessor.getInstance().in(record);
});
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/TimeBucket.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/TimeBucket.java
index 18ff6f9..3df3470 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/TimeBucket.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/TimeBucket.java
@@ -25,7 +25,13 @@ import
org.apache.skywalking.oap.server.core.UnexpectedException;
*/
public class TimeBucket {
- public static long getSecondTimeBucket(long time) {
+ /**
+ * Record time bucket format in Second Unit.
+ *
+ * @param time Timestamp
+ * @return time in second format.
+ */
+ public static long getRecordTimeBucket(long time) {
return getTimeBucket(time, Downsampling.Second);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/GeneralStorageTTL.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/GeneralStorageTTL.java
index aefaa53..971bcb7 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/GeneralStorageTTL.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/GeneralStorageTTL.java
@@ -24,7 +24,7 @@ import
org.apache.skywalking.oap.server.core.analysis.Downsampling;
*/
public class GeneralStorageTTL implements StorageTTL {
- @Override public TTLCalculator calculator(Downsampling downsampling) {
+ @Override public TTLCalculator metricsCalculator(Downsampling downsampling) {
switch (downsampling) {
case Hour:
return new HourTTLCalculator();
@@ -38,4 +38,8 @@ public class GeneralStorageTTL implements StorageTTL {
return new MinuteTTLCalculator();
}
}
+
+ @Override public TTLCalculator recordCalculator() {
+ return new RecordTTLCalculator();
+ }
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/StorageTTL.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/RecordTTLCalculator.java
similarity index 67%
copy from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/StorageTTL.java
copy to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/RecordTTLCalculator.java
index 0f38659..c06f26f 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/StorageTTL.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/RecordTTLCalculator.java
@@ -13,15 +13,22 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
*/
package org.apache.skywalking.oap.server.core.storage.ttl;
-import org.apache.skywalking.oap.server.core.analysis.Downsampling;
+import org.apache.skywalking.oap.server.core.DataTTLConfig;
+import org.joda.time.DateTime;
/**
- * @author peng-yongsheng
+ * Calculate TTL for record.
+ *
+ * @author wusheng
*/
-public interface StorageTTL {
- TTLCalculator calculator(Downsampling downsampling);
-}
+public class RecordTTLCalculator implements TTLCalculator {
+
+ @Override public long timeBefore(DateTime currentTime, DataTTLConfig dataTTLConfig) {
+ return Long.valueOf(currentTime.plusMinutes(0 - dataTTLConfig.getRecordDataTTL()).toString("yyyyMMddHHmmss"));
+ }
+}
\ No newline at end of file
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/StorageTTL.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/StorageTTL.java
index 0f38659..7a0b4e9 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/StorageTTL.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/StorageTTL.java
@@ -23,5 +23,7 @@ import
org.apache.skywalking.oap.server.core.analysis.Downsampling;
* @author peng-yongsheng
*/
public interface StorageTTL {
- TTLCalculator calculator(Downsampling downsampling);
+ TTLCalculator metricsCalculator(Downsampling downsampling);
+
+ TTLCalculator recordCalculator();
}
diff --git
a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerGRPCHandler.java
b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerGRPCHandler.java
index cfcb26d..defcf4e 100644
---
a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerGRPCHandler.java
+++
b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerGRPCHandler.java
@@ -99,7 +99,7 @@ public class JaegerGRPCHandler extends
CollectorServiceGrpc.CollectorServiceImpl
long duration = span.getDuration().getNanos() / 1_000_000;
jaegerSpan.setStartTime(Instant.ofEpochSecond(span.getStartTime().getSeconds(),
span.getStartTime().getNanos()).toEpochMilli());
- long timeBucket = TimeBucket.getSecondTimeBucket(jaegerSpan.getStartTime());
+ long timeBucket = TimeBucket.getRecordTimeBucket(jaegerSpan.getStartTime());
jaegerSpan.setTimeBucket(timeBucket);
jaegerSpan.setEndTime(jaegerSpan.getStartTime() + duration);
jaegerSpan.setLatency((int)duration);
diff --git
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
index 20f3b9b..7459abd 100644
---
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
+++
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
@@ -157,7 +157,7 @@ public class MultiScopesSpanListener implements
EntrySpanListener, ExitSpanListe
statement.setId(segmentCoreInfo.getSegmentId() + "-" +
spanDecorator.getSpanId());
statement.setDatabaseServiceId(sourceBuilder.getDestServiceId());
statement.setLatency(sourceBuilder.getLatency());
- statement.setTimeBucket(TimeBucket.getSecondTimeBucket(segmentCoreInfo.getStartTime()));
+ statement.setTimeBucket(TimeBucket.getRecordTimeBucket(segmentCoreInfo.getStartTime()));
statement.setTraceId(traceId);
for (KeyStringValuePair tag : spanDecorator.getAllTags()) {
if (SpanTags.DB_STATEMENT.equals(tag.getKey())) {
diff --git
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
index 1b637ce..48c2618 100644
---
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
+++
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
@@ -68,7 +68,7 @@ public class SegmentSpanListener implements
FirstSpanListener, EntrySpanListener
return;
}
- long timeBucket = TimeBucket.getSecondTimeBucket(segmentCoreInfo.getStartTime());
+ long timeBucket = TimeBucket.getRecordTimeBucket(segmentCoreInfo.getStartTime());
segment.setSegmentId(segmentCoreInfo.getSegmentId());
segment.setServiceId(segmentCoreInfo.getServiceId());
diff --git
a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
index a05e005..84353e8 100644
---
a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
+++
b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java
@@ -93,7 +93,7 @@ public class SpanForward {
long startTime = span.timestampAsLong() / 1000;
zipkinSpan.setStartTime(startTime);
if (startTime != 0) {
- long timeBucket = TimeBucket.getSecondTimeBucket(zipkinSpan.getStartTime());
+ long timeBucket = TimeBucket.getRecordTimeBucket(zipkinSpan.getStartTime());
zipkinSpan.setTimeBucket(timeBucket);
}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
index 1299cc6..e780e1f 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
@@ -25,6 +25,7 @@ import
org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.ttl.StorageTTL;
+import org.apache.skywalking.oap.server.core.storage.ttl.TTLCalculator;
import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.joda.time.DateTime;
@@ -51,7 +52,13 @@ public class HistoryDeleteEsDAO extends EsDAO implements
IHistoryDeleteDAO {
ConfigService configService =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(ConfigService.class);
ElasticSearchClient client = getClient();
- long timeBefore = storageTTL.calculator(model.getDownsampling()).timeBefore(new DateTime(), configService.getDataTTLConfig());
+ TTLCalculator ttlCalculator;
+ if (model.isRecord()) {
+ ttlCalculator = storageTTL.recordCalculator();
+ } else {
+ ttlCalculator = storageTTL.metricsCalculator(model.getDownsampling());
+ }
+ long timeBefore = ttlCalculator.timeBefore(new DateTime(), configService.getDataTTLConfig());
if (model.isCapableOfTimeSeries()) {
List<String> indexes =
client.retrievalIndexByAliases(model.getName());
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/ElasticsearchStorageTTL.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/ElasticsearchStorageTTL.java
index 5779de4..46af52d 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/ElasticsearchStorageTTL.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/ElasticsearchStorageTTL.java
@@ -25,7 +25,7 @@ import org.apache.skywalking.oap.server.core.storage.ttl.*;
*/
public class ElasticsearchStorageTTL implements StorageTTL {
- @Override public TTLCalculator calculator(Downsampling downsampling) {
+ @Override public TTLCalculator metricsCalculator(Downsampling downsampling) {
switch (downsampling) {
case Month:
return new MonthTTLCalculator();
@@ -37,4 +37,8 @@ public class ElasticsearchStorageTTL implements StorageTTL {
return new DayTTLCalculator();
}
}
+
+ @Override public TTLCalculator recordCalculator() {
+ return new EsRecordTTLCalculator();
+ }
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/StorageTTL.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/EsRecordTTLCalculator.java
similarity index 59%
copy from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/StorageTTL.java
copy to
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/EsRecordTTLCalculator.java
index 0f38659..19ce59d 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/StorageTTL.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/ttl/EsRecordTTLCalculator.java
@@ -13,15 +13,23 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
*/
-package org.apache.skywalking.oap.server.core.storage.ttl;
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.ttl;
-import org.apache.skywalking.oap.server.core.analysis.Downsampling;
+import org.apache.skywalking.oap.server.core.DataTTLConfig;
+import org.apache.skywalking.oap.server.core.storage.ttl.TTLCalculator;
+import org.joda.time.DateTime;
/**
- * @author peng-yongsheng
+ * Calculate TTL for record.
+ *
+ * @author wusheng
*/
-public interface StorageTTL {
- TTLCalculator calculator(Downsampling downsampling);
-}
+public class EsRecordTTLCalculator implements TTLCalculator {
+
+ @Override public long timeBefore(DateTime currentTime, DataTTLConfig dataTTLConfig) {
+ return Long.valueOf(currentTime.plusDays(0 - dataTTLConfig.getRecordDataTTL()).toString("yyyyMMdd"));
+ }
+}
\ No newline at end of file
diff --git
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
index 69fcd40..f709566 100644
---
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
+++
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
@@ -19,12 +19,14 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.SQLException;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.ttl.StorageTTL;
+import org.apache.skywalking.oap.server.core.storage.ttl.TTLCalculator;
import
org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import
org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
@@ -53,7 +55,13 @@ public class H2HistoryDeleteDAO implements IHistoryDeleteDAO
{
SQLBuilder dataDeleteSQL = new SQLBuilder("delete from " +
model.getName() + " where ").append(timeBucketColumnName).append("<= ?");
try (Connection connection = client.getConnection()) {
- long timeBefore = storageTTL.calculator(model.getDownsampling()).timeBefore(new DateTime(), configService.getDataTTLConfig());
+ TTLCalculator ttlCalculator;
+ if (model.isRecord()) {
+ ttlCalculator = storageTTL.recordCalculator();
+ } else {
+ ttlCalculator = storageTTL.metricsCalculator(model.getDownsampling());
+ }
+ long timeBefore = ttlCalculator.timeBefore(new DateTime(), configService.getDataTTLConfig());
client.execute(connection, dataDeleteSQL.toString(), timeBefore);
} catch (JDBCClientException | SQLException e) {
throw new IOException(e.getMessage(), e);