This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8b318c3fa74 Add wal compression metric items (#13105)
8b318c3fa74 is described below
commit 8b318c3fa745ff4f0a8b6d21477af3e27b049b13
Author: shuwenwei <[email protected]>
AuthorDate: Wed Aug 7 18:43:56 2024 +0800
Add wal compression metric items (#13105)
* add wal compression metrics
* fix code referred to in review comments
---
.../iotdb/db/service/metrics/WritingMetrics.java | 88 +++++++++++++++++++++-
.../iotdb/db/storageengine/StorageEngine.java | 4 +
.../storageengine/dataregion/wal/io/LogWriter.java | 7 ++
.../dataregion/wal/io/WALInputStream.java | 36 +++++++--
4 files changed, 125 insertions(+), 10 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
index 183d9a9f8c2..e7194126c1b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
@@ -174,10 +174,24 @@ public class WritingMetrics implements IMetricSet {
// region wal overview metrics
public static final String WAL_NODES_NUM = "wal_nodes_num";
public static final String USED_RATIO = "used_ratio";
+ public static final String SERIALIZED_WAL_BUFFER_SIZE_BYTE =
"serialized_wal_buffer_size";
+ public static final String WROTE_WAL_BUFFER_SIZE_BYTE =
"wrote_wal_buffer_size";
+ public static final String WAL_COMPRESS_COST_NS = "wal_compress_cost";
+ public static final String WAL_UNCOMPRESS_COST_NS = "wal_uncompress_cost";
+ public static final String READ_WAL_BUFFER_SIZE_BYTE =
"read_wal_buffer_size";
+ public static final String READ_WAL_BUFFER_COST_NS = "read_wal_buffer_cost";
+ public static final String WRITE_WAL_BUFFER_COST_NS =
"write_wal_buffer_cost";
public static final String ENTRIES_COUNT = "entries_count";
private Histogram usedRatioHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
private Histogram entriesCountHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+ private Histogram serializedWALBufferSizeHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+ private Histogram wroteWALBufferSizeHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+ private Histogram walCompressCostHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+ private Histogram walUncompressCostHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+ private Histogram readWALBufferSizeHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+ private Histogram readWALBufferCostHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+ private Histogram writeWALBufferCostHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
private void bindWALMetrics(AbstractMetricService metricService) {
metricService.createAutoGauge(
@@ -196,6 +210,49 @@ public class WritingMetrics implements IMetricSet {
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
ENTRIES_COUNT);
+
+ serializedWALBufferSizeHistogram =
+ metricService.getOrCreateHistogram(
+ Metric.WAL_BUFFER.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ SERIALIZED_WAL_BUFFER_SIZE_BYTE);
+ wroteWALBufferSizeHistogram =
+ metricService.getOrCreateHistogram(
+ Metric.WAL_BUFFER.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ WROTE_WAL_BUFFER_SIZE_BYTE);
+ walCompressCostHistogram =
+ metricService.getOrCreateHistogram(
+ Metric.WAL_BUFFER.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ WAL_COMPRESS_COST_NS);
+ walUncompressCostHistogram =
+ metricService.getOrCreateHistogram(
+ Metric.WAL_BUFFER.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ WAL_UNCOMPRESS_COST_NS);
+ readWALBufferSizeHistogram =
+ metricService.getOrCreateHistogram(
+ Metric.WAL_BUFFER.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ READ_WAL_BUFFER_SIZE_BYTE);
+ readWALBufferCostHistogram =
+ metricService.getOrCreateHistogram(
+ Metric.WAL_BUFFER.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ READ_WAL_BUFFER_COST_NS);
+ writeWALBufferCostHistogram =
+ metricService.getOrCreateHistogram(
+ Metric.WAL_BUFFER.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ WRITE_WAL_BUFFER_COST_NS);
}
private void unbindWALMetrics(AbstractMetricService metricService) {
@@ -203,7 +260,16 @@ public class WritingMetrics implements IMetricSet {
MetricType.AUTO_GAUGE, Metric.WAL_NODE_NUM.toString(),
Tag.NAME.toString(), WAL_NODES_NUM);
usedRatioHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
entriesCountHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
- Arrays.asList(USED_RATIO, ENTRIES_COUNT)
+ Arrays.asList(
+ USED_RATIO,
+ ENTRIES_COUNT,
+ SERIALIZED_WAL_BUFFER_SIZE_BYTE,
+ WROTE_WAL_BUFFER_SIZE_BYTE,
+ WAL_COMPRESS_COST_NS,
+ WAL_UNCOMPRESS_COST_NS,
+ READ_WAL_BUFFER_SIZE_BYTE,
+ READ_WAL_BUFFER_COST_NS,
+ WRITE_WAL_BUFFER_COST_NS)
.forEach(
name ->
metricService.remove(
@@ -215,7 +281,6 @@ public class WritingMetrics implements IMetricSet {
// region wal cost metrics
public static final String MAKE_CHECKPOINT = "make_checkpoint";
public static final String SERIALIZE_WAL_ENTRY = "serialize_wal_entry";
- public static final String SERIALIZE_ONE_WAL_INFO_ENTRY =
"serialize_one_wal_info_entry";
public static final String SERIALIZE_WAL_ENTRY_TOTAL =
"serialize_wal_entry_total";
public static final String SYNC_WAL_BUFFER = "sync_wal_buffer";
public static final String SYNC = "sync";
@@ -807,6 +872,25 @@ public class WritingMetrics implements IMetricSet {
serializeWalEntryTotalTimer.updateNanos(costTimeInNanos);
}
+ /**
+ * Records one WAL buffer compression cost sample into the compress-cost histogram.
+ *
+ * @param costTimeInNanos elapsed time of the compression step, in nanoseconds
+ */
+ public void recordCompressWALBufferCost(long costTimeInNanos) {
+ walCompressCostHistogram.update(costTimeInNanos);
+ }
+
+ /**
+ * Records one WAL buffer write: one sample each into the serialized-size, wrote-size and
+ * write-cost histograms.
+ *
+ * @param serializedSize size of the buffer as serialized (the caller in this commit passes the
+ *     uncompressed size)
+ * @param wroteSize size of the buffer actually handed to the log channel
+ * @param wroteTimeCostInNanos elapsed time of the write, in nanoseconds
+ */
+ public void recordWroteWALBuffer(int serializedSize, int wroteSize, long
wroteTimeCostInNanos) {
+ serializedWALBufferSizeHistogram.update(serializedSize);
+ wroteWALBufferSizeHistogram.update(wroteSize);
+ writeWALBufferCostHistogram.update(wroteTimeCostInNanos);
+ }
+
+ /**
+ * Records one WAL buffer decompression cost sample into the uncompress-cost histogram.
+ *
+ * @param costTimeInNanos elapsed time of the decompression step, in nanoseconds
+ */
+ public void recordWALUncompressCost(long costTimeInNanos) {
+ walUncompressCostHistogram.update(costTimeInNanos);
+ }
+
+ /**
+ * Records one WAL read: the size goes to the read-size histogram and the elapsed time to the
+ * read-cost histogram.
+ *
+ * @param size amount of data read, as reported by the caller
+ * @param costTimeInNanos elapsed time of the read, in nanoseconds
+ */
+ public void recordWALRead(long size, long costTimeInNanos) {
+ readWALBufferSizeHistogram.update(size);
+ readWALBufferCostHistogram.update(costTimeInNanos);
+ }
+
public void recordSyncWALBufferCost(long costTimeInNanos, boolean forceFlag)
{
if (forceFlag) {
// fsync mode
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index c742914f989..334f5f4d357 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -187,6 +187,7 @@ public class StorageEngine implements IService {
}
public void asyncRecover() throws StartupException {
+ long startRecoverTime = System.currentTimeMillis();
setAllSgReady(false);
cachedThreadPool =
IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.STORAGE_ENGINE_CACHED_POOL.getName());
@@ -210,6 +211,9 @@ public class StorageEngine implements IService {
checkResults(futures, "StorageEngine failed to recover.");
recoverRepairData();
setAllSgReady(true);
+ LOGGER.info(
+ "Storage Engine recover cost: {}s.",
+ (System.currentTimeMillis() - startRecoverTime) / 1000);
},
ThreadName.STORAGE_ENGINE_RECOVER_TRIGGER.getName());
recoverEndTrigger.start();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
index 278fe93cae0..cdec209e507 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.io;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.metrics.WritingMetrics;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;
@@ -75,6 +76,7 @@ public abstract class LogWriter implements ILogWriter {
@Override
public double write(ByteBuffer buffer) throws IOException {
+ long startTime = System.nanoTime();
// To support hot loading, we can't define it as a variable,
// because we need to dynamically check whether wal compression is enabled
// each time the buffer is serialized
@@ -112,7 +114,9 @@ public abstract class LogWriter implements ILogWriter {
headerBuffer.putInt(bufferSize);
if (compressed) {
headerBuffer.putInt(uncompressedSize);
+
WritingMetrics.getInstance().recordCompressWALBufferCost(System.nanoTime() -
startTime);
}
+ startTime = System.nanoTime();
try {
headerBuffer.flip();
logChannel.write(headerBuffer);
@@ -120,6 +124,9 @@ public abstract class LogWriter implements ILogWriter {
} catch (ClosedChannelException e) {
logger.warn("Cannot write to {}", logFile, e);
}
+ WritingMetrics.getInstance()
+ .recordWroteWALBuffer(uncompressedSize, bufferSize, System.nanoTime()
- startTime);
+
return ((double) bufferSize / uncompressedSize);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index eff873510fd..19e1564f946 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.io;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.service.metrics.WritingMetrics;
import org.apache.iotdb.db.utils.MmapUtil;
import org.apache.tsfile.compress.IUnCompressor;
@@ -188,6 +189,8 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
if (channel.position() >= endOffset) {
throw new IOException("Reach the end offset of wal file");
}
+ long startTime = System.nanoTime();
+ long startPosition = channel.position();
if (version == WALFileVersion.V2) {
loadNextSegmentV2();
} else if (version == WALFileVersion.V1) {
@@ -195,6 +198,8 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
} else {
tryLoadSegment();
}
+ WritingMetrics.getInstance()
+ .recordWALRead(channel.position() - startPosition, System.nanoTime() -
startTime);
}
private void loadNextSegmentV1() throws IOException {
@@ -207,7 +212,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
dataBuffer = ByteBuffer.allocate(128 * 1024);
}
dataBuffer.clear();
- channel.read(dataBuffer);
+ readWALBufferFromChannel(dataBuffer);
dataBuffer.flip();
}
@@ -232,12 +237,12 @@ public class WALInputStream extends InputStream
implements AutoCloseable {
compressedBuffer.clear();
// limit the buffer to prevent it from reading too much byte than
expected
compressedBuffer.limit(segmentInfo.dataInDiskSize);
- if (channel.read(compressedBuffer) != segmentInfo.dataInDiskSize) {
+ if (readWALBufferFromChannel(compressedBuffer) !=
segmentInfo.dataInDiskSize) {
throw new IOException("Unexpected end of file");
}
compressedBuffer.flip();
IUnCompressor unCompressor =
IUnCompressor.getUnCompressor(segmentInfo.compressionType);
- unCompressor.uncompress(compressedBuffer, dataBuffer);
+ uncompressWALBuffer(compressedBuffer, dataBuffer, unCompressor);
} else {
// An uncompressed segment
if (Objects.isNull(dataBuffer)
@@ -250,7 +255,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
// limit the buffer to prevent it from reading too much byte than
expected
dataBuffer.limit(segmentInfo.dataInDiskSize);
- if (channel.read(dataBuffer) != segmentInfo.dataInDiskSize) {
+ if (readWALBufferFromChannel(dataBuffer) != segmentInfo.dataInDiskSize) {
throw new IOException("Unexpected end of file");
}
}
@@ -296,15 +301,15 @@ public class WALInputStream extends InputStream
implements AutoCloseable {
if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) {
compressedBuffer =
ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
- channel.read(compressedBuffer);
+ readWALBufferFromChannel(compressedBuffer);
compressedBuffer.flip();
IUnCompressor unCompressor =
IUnCompressor.getUnCompressor(segmentInfo.compressionType);
dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize);
- unCompressor.uncompress(compressedBuffer, dataBuffer);
+ uncompressWALBuffer(compressedBuffer, dataBuffer, unCompressor);
MmapUtil.clean(compressedBuffer);
} else {
dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
- channel.read(dataBuffer);
+ readWALBufferFromChannel(dataBuffer);
dataBuffer.flip();
}
@@ -351,7 +356,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
info.dataInDiskSize = segmentHeaderWithoutCompressedSizeBuffer.getInt();
if (info.compressionType != CompressionType.UNCOMPRESSED) {
compressedSizeBuffer.clear();
- channel.read(compressedSizeBuffer);
+ readWALBufferFromChannel(compressedSizeBuffer);
compressedSizeBuffer.flip();
info.uncompressedSize = compressedSizeBuffer.getInt();
} else {
@@ -360,6 +365,21 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
return info;
}
+ /**
+ * Reads from {@code channel} into {@code buffer} and reports the read size and elapsed time to
+ * {@link WritingMetrics#recordWALRead(long, long)}.
+ *
+ * @param buffer destination buffer for the channel read
+ * @return the raw result of {@code channel.read(buffer)}, unchanged; NIO channels report
+ *     end-of-stream as -1
+ * @throws IOException if the channel read fails; no metric is recorded in that case
+ */
+ private int readWALBufferFromChannel(ByteBuffer buffer) throws IOException {
+ long startTime = System.nanoTime();
+ int size = channel.read(buffer);
+ long costTimeInNanos = System.nanoTime() - startTime;
+ // An end-of-stream read returns -1; clamp it so the size histogram never gets a negative
+ // sample. The raw value is still returned so callers can detect EOF or a short read.
+ WritingMetrics.getInstance().recordWALRead(Math.max(size, 0), costTimeInNanos);
+ return size;
+ }
+
+ /**
+ * Decompresses {@code compressed} into {@code uncompressed} with the given uncompressor and
+ * reports the elapsed time to {@link WritingMetrics#recordWALUncompressCost(long)}.
+ *
+ * @param compressed source buffer holding the compressed segment
+ * @param uncompressed destination buffer for the decompressed data
+ * @param unCompressor uncompressor used for the segment
+ * @throws IOException if decompression fails; no cost sample is recorded in that case
+ */
+ private void uncompressWALBuffer(
+ ByteBuffer compressed, ByteBuffer uncompressed, IUnCompressor
unCompressor)
+ throws IOException {
+ long startTime = System.nanoTime();
+ unCompressor.uncompress(compressed, uncompressed);
+ WritingMetrics.getInstance().recordWALUncompressCost(System.nanoTime() -
startTime);
+ }
+
private static class SegmentInfo {
public CompressionType compressionType;
public int dataInDiskSize;