This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 09fcdd59cc Add metric for out-of-order partial-upsert events (#8925)
09fcdd59cc is described below
commit 09fcdd59cc07e2faa154858075473cdec3518b60
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Sun Jun 19 12:10:39 2022 -0700
Add metric for out-of-order partial-upsert events (#8925)
---
.../apache/pinot/common/metrics/ServerMeter.java | 1 +
.../upsert/PartitionUpsertMetadataManager.java | 22 ++++++++++++++++++----
2 files changed, 19 insertions(+), 4 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 19ae43f382..0cbd12c537 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -41,6 +41,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
REALTIME_PARTITION_MISMATCH("mismatch", false),
REALTIME_DEDUP_DROPPED("rows", false),
+ PARTIAL_UPSERT_OUT_OF_ORDER("rows", false),
ROWS_WITH_ERRORS("rows", false),
LLC_CONTROLLER_RESPONSE_NOT_SENT("messages", true),
LLC_CONTROLLER_RESPONSE_COMMIT("messages", true),
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 041a86443a..8214162fe9 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -22,9 +22,11 @@ import com.google.common.annotations.VisibleForTesting;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.segment.local.utils.HashUtils;
@@ -66,6 +68,8 @@ import org.slf4j.LoggerFactory;
public class PartitionUpsertMetadataManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
+ private static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS =
TimeUnit.MINUTES.toNanos(1);
+
private final String _tableNameWithType;
private final int _partitionId;
private final ServerMetrics _serverMetrics;
@@ -79,6 +83,9 @@ public class PartitionUpsertMetadataManager {
// Reused for reading previous record during partial upsert
private final GenericRow _reuse = new GenericRow();
+ private long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
+ private int _numOutOfOrderEvents = 0;
+
public PartitionUpsertMetadataManager(String tableNameWithType, int
partitionId, ServerMetrics serverMetrics,
@Nullable PartialUpsertHandler partialUpsertHandler, HashFunction
hashFunction) {
_tableNameWithType = tableNameWithType;
@@ -214,10 +221,17 @@ public class PartitionUpsertMetadataManager {
currentRecordLocation.getSegment().getRecord(currentRecordLocation.getDocId(),
_reuse);
return _partialUpsertHandler.merge(previousRecord, record);
} else {
- LOGGER.warn(
- "Got late event for partial-upsert: {} (current comparison value:
{}, record comparison value: {}), "
- + "skipping updating the record", record,
currentRecordLocation.getComparisonValue(),
- recordInfo.getComparisonValue());
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER, 1L);
+ _numOutOfOrderEvents++;
+ long currentTimeNs = System.nanoTime();
+ if (currentTimeNs - _lastOutOfOrderEventReportTimeNs >
OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS) {
+ LOGGER.warn("Skipped {} out-of-order events for partial-upsert
table: {} "
+ + "(the last event has current comparison value: {}, record
comparison value: {})",
+ _numOutOfOrderEvents,
+ _tableNameWithType, currentRecordLocation.getComparisonValue(),
recordInfo.getComparisonValue());
+ _lastOutOfOrderEventReportTimeNs = currentTimeNs;
+ _numOutOfOrderEvents = 0;
+ }
return record;
}
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]