This is an automated email from the ASF dual-hosted git repository.
sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d7411c5d38 Fix ramping delay caused by long lasting sequence of unfiltered messa… (#10418)
d7411c5d38 is described below
commit d7411c5d381e0c44df09092af023f5cd113e6584
Author: Juan Gomez <[email protected]>
AuthorDate: Fri Mar 31 22:31:28 2023 -0700
Fix ramping delay caused by long lasting sequence of unfiltered messa… (#10418)
---
.../manager/realtime/IngestionDelayTracker.java | 12 ++++-
.../realtime/LLRealtimeSegmentDataManager.java | 51 +++++++++++++++-------
.../plugin/stream/kafka20/KafkaMessageBatch.java | 16 ++++++-
.../kafka20/KafkaPartitionLevelConsumer.java | 5 ++-
.../org/apache/pinot/spi/stream/MessageBatch.java | 14 ++++++
5 files changed, 78 insertions(+), 20 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index 6e11297fd0..57ca21def1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -214,12 +214,20 @@ public class IngestionDelayTracker {
// Do not update the ingestion delay metrics during server startup period
return;
}
+ if ((ingestionTimeMs < 0) && (firstStreamIngestionTimeMs < 0)) {
+ // If the stream does not return valid ingestion timestamps, don't publish a metric
+ return;
+ }
IngestionTimestamps previousMeasure = _partitionToIngestionTimestampsMap.put(partitionGroupId,
new IngestionTimestamps(ingestionTimeMs, firstStreamIngestionTimeMs));
if (previousMeasure == null) {
// First time we start tracking a partition we should start tracking it via metric
- _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId,
- ServerGauge.REALTIME_INGESTION_DELAY_MS, () -> getPartitionIngestionDelayMs(partitionGroupId));
+ // Only publish the metric if supported by the underlying stream. If not supported, the stream
+ // returns Long.MIN_VALUE
+ if (ingestionTimeMs >= 0) {
+ _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId, ServerGauge.REALTIME_INGESTION_DELAY_MS,
+ () -> getPartitionIngestionDelayMs(partitionGroupId));
+ }
if (firstStreamIngestionTimeMs >= 0) {
// Only publish this metric when creation time is supported by the underlying stream
// When this timestamp is not supported it always returns the value Long.MIN_VALUE
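
The gist of this hunk: a stream that cannot supply ingestion timestamps signals it with Long.MIN_VALUE, and the tracker now refuses to register a delay gauge for such partitions. Below is a minimal, self-contained sketch of that sentinel-gating pattern; the class, field, and method names are illustrative assumptions rather than Pinot APIs, and only the Long.MIN_VALUE convention comes from the patch.

    import java.util.function.LongSupplier;

    public class DelayGaugeGatingSketch {
      // Streams that cannot provide an ingestion timestamp report Long.MIN_VALUE.
      static final long TIMESTAMP_NOT_SUPPORTED = Long.MIN_VALUE;

      private LongSupplier _delayGauge; // stands in for the registered server gauge

      void onFirstMeasurement(long ingestionTimeMs, LongSupplier delaySupplier) {
        if (ingestionTimeMs < 0) {
          // Negative (e.g. Long.MIN_VALUE) means unsupported: never register the gauge.
          return;
        }
        _delayGauge = delaySupplier;
      }

      public static void main(String[] args) {
        DelayGaugeGatingSketch sketch = new DelayGaugeGatingSketch();
        sketch.onFirstMeasurement(TIMESTAMP_NOT_SUPPORTED, () -> 0L); // ignored
        System.out.println(sketch._delayGauge == null); // prints: true
      }
    }
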
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 6236babece..4f3428a5fe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -527,9 +527,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
int indexedMessageCount = 0;
int streamMessageCount = 0;
boolean canTakeMore = true;
+ boolean hasTransformedRows = false;
TransformPipeline.Result reusedResult = new TransformPipeline.Result();
boolean prematureExit = false;
+ RowMetadata msgMetadata = null;
+
for (int index = 0; index < messageCount; index++) {
prematureExit = _shouldStop || endCriteriaReached();
if (prematureExit) {
@@ -562,7 +565,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Decode message
StreamDataDecoderResult decodedRow = _streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
- RowMetadata msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
+ msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
if (decodedRow.getException() != null) {
// TODO: based on a config, decide whether the record should be silently dropped or stop further consumption on
// decode error
@@ -591,7 +594,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_serverMetrics.addMeteredTableValue(_clientId, ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED,
reusedResult.getIncompleteRowCount(), realtimeIncompleteRowsConsumedMeter);
}
- for (GenericRow transformedRow : reusedResult.getTransformedRows()) {
+ List<GenericRow> transformedRows = reusedResult.getTransformedRows();
+ if (transformedRows.size() > 0) {
+ hasTransformedRows = true;
+ }
+ for (GenericRow transformedRow : transformedRows) {
try {
canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
indexedMessageCount++;
@@ -614,18 +621,31 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_numRowsConsumed++;
streamMessageCount++;
}
- updateIngestionDelay(indexedMessageCount);
+
+ if (indexedMessageCount > 0) {
+ // Record ingestion delay for this partition using metadata for the last message we processed
+ updateIngestionDelay(_lastRowMetadata);
+ } else if (!hasTransformedRows && (msgMetadata != null)) {
+ // If all messages were filtered by transformation, we still attempt to update ingestion delay using
+ // the metadata for the last message we processed if any.
+ updateIngestionDelay(msgMetadata);
+ }
+
updateCurrentDocumentCountMetrics();
if (messagesAndOffsets.getUnfilteredMessageCount() > 0) {
_hasMessagesFetched = true;
+ if (messageCount == 0) {
+ // If we received events from the stream but all were filtered, we attempt to estimate the ingestion
+ // delay from the metadata of the last filtered message received.
+ updateIngestionDelay(messagesAndOffsets.getLastMessageMetadata());
+ }
if (streamMessageCount > 0 && _segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("Indexed {} messages ({} messages read from
stream) current offset {}",
indexedMessageCount, streamMessageCount, _currentOffset);
}
} else if (!prematureExit) {
// Record Pinot ingestion delay as zero since we are up-to-date and there are no new events
- long currentTimeMs = System.currentTimeMillis();
- _realtimeTableDataManager.updateIngestionDelay(currentTimeMs,
currentTimeMs, _partitionGroupId);
+ setIngestionDelayToZero();
if (_segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("empty batch received - sleeping for {}ms",
idlePipeSleepTimeMillis);
}
@@ -1566,18 +1586,19 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_partitionMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId);
}
+ private void updateIngestionDelay(RowMetadata metadata) {
+ if (metadata != null) {
+ _realtimeTableDataManager.updateIngestionDelay(metadata.getRecordIngestionTimeMs(),
+ metadata.getFirstStreamRecordIngestionTimeMs(), _partitionGroupId);
+ }
+ }
+
/*
- * Updates the ingestion delay if messages were processed, using the timestamp of the last consumed event.
- *
- * @param indexedMessagesCount
+ * Sets ingestion delay to zero in situations where we are caught up processing events.
*/
- private void updateIngestionDelay(int indexedMessageCount) {
- if ((indexedMessageCount > 0) && (_lastRowMetadata != null)) {
- // Record Ingestion delay for this partition
- _realtimeTableDataManager.updateIngestionDelay(_lastRowMetadata.getRecordIngestionTimeMs(),
- _lastRowMetadata.getFirstStreamRecordIngestionTimeMs(), _partitionGroupId);
- }
+ private void setIngestionDelayToZero() {
+ long currentTimeMs = System.currentTimeMillis();
+ _realtimeTableDataManager.updateIngestionDelay(currentTimeMs, currentTimeMs, _partitionGroupId);
}
// This should be done during commit? We may not always commit when we build a segment....
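
Taken together, the hunks above leave the consumption loop with three distinct delay-update paths plus a caught-up case. The following condensed sketch shows just that branch structure; RowMetadata is reduced to a timestamp holder and every name below is a simplified assumption, with only the branching mirroring the patch.

    final class DelayUpdatePathsSketch {
      record Meta(long recordIngestionTimeMs) {}

      private long _reportedDelayMs = -1;

      void afterConsumeLoop(int indexedMessageCount, boolean hasTransformedRows, Meta lastIndexedMeta,
          Meta lastDecodedMeta, int unfilteredMessageCount, int decodedMessageCount, Meta lastBatchMeta) {
        if (indexedMessageCount > 0) {
          update(lastIndexedMeta);      // path 1: rows were indexed; use the last indexed message
        } else if (!hasTransformedRows && lastDecodedMeta != null) {
          update(lastDecodedMeta);      // path 2: every decoded row was transformed away
        }
        if (unfilteredMessageCount > 0) {
          if (decodedMessageCount == 0) {
            update(lastBatchMeta);      // path 3: the whole batch was filtered before decoding
          }
        } else {
          _reportedDelayMs = 0;         // caught up: an empty fetch means zero delay
        }
      }

      private void update(Meta meta) {
        if (meta != null) {
          _reportedDelayMs = System.currentTimeMillis() - meta.recordIngestionTimeMs();
        }
      }
    }
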
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
index 1852c8bdc8..dbc3e8d2a6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
@@ -24,6 +24,7 @@ import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.RowMetadata;
import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -31,16 +32,29 @@ public class KafkaMessageBatch implements MessageBatch<StreamMessage<byte[]>> {
private final List<StreamMessage<byte[]>> _messageList;
private final int _unfilteredMessageCount;
private final long _lastOffset;
+ private final StreamMessageMetadata _lastMessageMetadata;
/**
* @param unfilteredMessageCount how many messages were received from the topic before being filtered
* @param lastOffset the offset of the last message in the batch
* @param batch the messages, which may be smaller than {@see unfilteredMessageCount}
+ * @param lastMessageMetadata metadata for the last filtered message in the batch, useful for estimating ingestion delay
+ * when a batch has all messages filtered.
*/
- public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<StreamMessage<byte[]>> batch) {
+ public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<StreamMessage<byte[]>> batch,
+ StreamMessageMetadata lastMessageMetadata) {
_messageList = batch;
_lastOffset = lastOffset;
_unfilteredMessageCount = unfilteredMessageCount;
+ _lastMessageMetadata = lastMessageMetadata;
+ }
+
+ /**
+ * Returns the metadata for the last filtered message if any, null otherwise.
+ */
+ @Override
+ public StreamMessageMetadata getLastMessageMetadata() {
+ return _lastMessageMetadata;
}
@Override
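
The new constructor argument exists so that a batch whose messages were all filtered still carries something for the delay tracker to read. Here is a stand-in sketch of that invariant, with simplified types that are not the Pinot classes:

    import java.util.Collections;
    import java.util.List;

    final class FilteredBatchSketch {
      record Metadata(long ingestionTimeMs) {}

      private final List<byte[]> _messages;
      private final int _unfilteredMessageCount;
      private final Metadata _lastMessageMetadata;

      FilteredBatchSketch(int unfilteredMessageCount, List<byte[]> messages, Metadata lastMessageMetadata) {
        _unfilteredMessageCount = unfilteredMessageCount;
        _messages = messages;
        _lastMessageMetadata = lastMessageMetadata;
      }

      public static void main(String[] args) {
        // 25 records were consumed from the topic, and every one was filtered out:
        FilteredBatchSketch batch =
            new FilteredBatchSketch(25, Collections.emptyList(), new Metadata(System.currentTimeMillis()));
        // The batch is empty, yet ingestion delay can still be estimated from it.
        System.out.println(batch._messages.isEmpty() && batch._lastMessageMetadata != null); // prints: true
      }
    }
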
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index 46504bbc0d..ff90f4b1a3 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -70,15 +70,16 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
List<StreamMessage<byte[]>> filtered = new ArrayList<>(messageAndOffsets.size());
long lastOffset = startOffset;
+ StreamMessageMetadata rowMetadata = null;
for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
long offset = messageAndOffset.offset();
_lastFetchedOffset = offset;
if (offset >= startOffset && (endOffset > offset || endOffset < 0)) {
Bytes message = messageAndOffset.value();
+ rowMetadata = (StreamMessageMetadata) _kafkaMetadataExtractor.extract(messageAndOffset);
if (message != null) {
String key = messageAndOffset.key();
byte[] keyBytes = key != null ? key.getBytes(StandardCharsets.UTF_8) : null;
- StreamMessageMetadata rowMetadata = (StreamMessageMetadata) _kafkaMetadataExtractor.extract(messageAndOffset);
filtered.add(new KafkaStreamMessage(keyBytes, message.get(), rowMetadata));
} else if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Tombstone message at offset: {}", offset);
@@ -89,6 +90,6 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
endOffset);
}
}
- return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered);
+ return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered, rowMetadata);
}
}
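
The consumer change is subtle: metadata extraction is hoisted above the tombstone (null payload) check, so rowMetadata is captured for every in-range record even when nothing survives filtering. A compact sketch of that pattern, using a simplified record type rather than Kafka's ConsumerRecord:

    import java.util.ArrayList;
    import java.util.List;

    final class HoistedMetadataSketch {
      record Rec(long offset, byte[] value, long timestampMs) {}

      static Long lastSeenTimestamp(List<Rec> records, long startOffset, long endOffset) {
        Long lastMeta = null;
        List<byte[]> kept = new ArrayList<>();
        for (Rec r : records) {
          if (r.offset() >= startOffset && (endOffset > r.offset() || endOffset < 0)) {
            lastMeta = r.timestampMs(); // capture metadata before the tombstone check
            if (r.value() != null) {
              kept.add(r.value());      // tombstones are skipped but still tracked above
            }
          }
        }
        return lastMeta; // non-null even if every in-range record was a tombstone
      }

      public static void main(String[] args) {
        List<Rec> tombstonesOnly = List.of(new Rec(10, null, 1234L), new Rec(11, null, 5678L));
        System.out.println(lastSeenTimestamp(tombstonesOnly, 10, -1)); // prints: 5678
      }
    }
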
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index d22a46af67..7ae4226e47 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.spi.stream;
+import javax.annotation.Nullable;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
@@ -116,4 +117,17 @@ public interface MessageBatch<T> {
default boolean isEndOfPartitionGroup() {
return false;
}
+
+ /**
+ * This is useful while determining ingestion delay for a message batch. Retaining metadata for the last filtered
+ * message in a batch can enable us to estimate the ingestion delay for the batch.
+ * Note that a batch can be fully filtered, and we can still retain the metadata for the last filtered message to
+ * facilitate computing ingestion delay in the face of a fully filtered batch.
+ *
+ * @return null by default.
+ */
+ @Nullable
+ default StreamMessageMetadata getLastMessageMetadata() {
+ return null;
+ }
}
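
Finally, shipping getLastMessageMetadata() as a @Nullable default is what keeps every existing MessageBatch implementation outside this patch compiling. Here is a sketch of that compatibility property; the interface is simplified, LegacyBatch is hypothetical, and javax.annotation.Nullable assumes the same JSR-305 style annotations jar the interface above uses.

    import javax.annotation.Nullable;

    interface BatchApiSketch {
      @Nullable
      default Object getLastMessageMetadata() {
        return null; // streams without per-message metadata need no code changes
      }
    }

    // An implementation written before this patch compiles unchanged.
    final class LegacyBatch implements BatchApiSketch {
      public static void main(String[] args) {
        // Callers must handle null: only streams like Kafka override the default.
        System.out.println(new LegacyBatch().getLastMessageMetadata()); // prints: null
      }
    }
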
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]