This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 61aeb6f18f Improve exception logging when we fail to index / transform
message (#12594)
61aeb6f18f is described below
commit 61aeb6f18fce8e4a87556ab92b8dd224a11d0301
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Fri Mar 8 04:39:09 2024 +0530
Improve exception logging when we fail to index / transform message (#12594)
---
.../realtime/RealtimeSegmentDataManager.java | 30 ++++++++++------------
1 file changed, 13 insertions(+), 17 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 24ca9dc6f8..2a5da62c2a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -572,6 +572,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
// Decode message
StreamDataDecoderResult decodedRow =
_streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
+ StreamPartitionMsgOffset messageOffset =
messagesAndOffsets.getNextStreamPartitionMsgOffsetAtIndex(index);
if (decodedRow.getException() != null) {
// TODO: based on a config, decide whether the record should be
silently dropped or stop further consumption on
// decode error
@@ -587,7 +588,8 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
_numRowsErrored++;
// when exception happens we prefer abandoning the whole batch and
not partially indexing some rows
reusedResult.getTransformedRows().clear();
- String errorMessage = String.format("Caught exception while
transforming the record: %s", decodedRow);
+ String errorMessage = String.format("Caught exception while
transforming the record at offset: %s , row: %s",
+ messageOffset, decodedRow.getResult());
_segmentLogger.error(errorMessage, e);
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new
SegmentErrorInfo(now(), errorMessage, e));
}
@@ -617,14 +619,14 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
_serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L);
} catch (Exception e) {
_numRowsErrored++;
- String errorMessage = String.format("Caught exception while
indexing the record: %s", transformedRow);
+ String errorMessage = String.format("Caught exception while
indexing the record at offset: %s , row: %s",
+ messageOffset, transformedRow);
_segmentLogger.error(errorMessage, e);
- _realtimeTableDataManager.addSegmentError(_segmentNameStr,
- new SegmentErrorInfo(now(), errorMessage, e));
+ _realtimeTableDataManager.addSegmentError(_segmentNameStr, new
SegmentErrorInfo(now(), errorMessage, e));
}
}
}
- _currentOffset =
messagesAndOffsets.getNextStreamPartitionMsgOffsetAtIndex(index);
+ _currentOffset = messageOffset;
_numRowsIndexed = _realtimeSegment.getNumDocsIndexed();
_numRowsConsumed++;
streamMessageCount++;
@@ -799,8 +801,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
_segmentLogger.error(errorMessage, e);
postStopConsumedMsg(e.getClass().getName());
_state = State.ERROR;
- _realtimeTableDataManager
- .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(),
errorMessage, e));
+ _realtimeTableDataManager.addSegmentError(_segmentNameStr, new
SegmentErrorInfo(now(), errorMessage, e));
_serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LLC_PARTITION_CONSUMING, 0);
return;
}
@@ -980,8 +981,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
String errorMessage =
String.format("Caught exception while moving index directory from:
%s to: %s", tempIndexDir, indexDir);
_segmentLogger.error(errorMessage, e);
- _realtimeTableDataManager
- .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(),
errorMessage, e));
+ _realtimeTableDataManager.addSegmentError(_segmentNameStr, new
SegmentErrorInfo(now(), errorMessage, e));
return null;
} finally {
FileUtils.deleteQuietly(tempSegmentFolder);
@@ -1001,8 +1001,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
String errorMessage =
String.format("Caught exception while taring index directory
from: %s to: %s", indexDir, segmentTarFile);
_segmentLogger.error(errorMessage, e);
- _realtimeTableDataManager
- .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(),
errorMessage, e));
+ _realtimeTableDataManager.addSegmentError(_segmentNameStr, new
SegmentErrorInfo(now(), errorMessage, e));
return null;
}
@@ -1011,8 +1010,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
String errorMessage = String.format("Failed to find file: %s under
index directory: %s",
V1Constants.MetadataKeys.METADATA_FILE_NAME, indexDir);
_segmentLogger.error(errorMessage);
- _realtimeTableDataManager
- .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(),
errorMessage, null));
+ _realtimeTableDataManager.addSegmentError(_segmentNameStr, new
SegmentErrorInfo(now(), errorMessage, null));
return null;
}
File creationMetaFile =
SegmentDirectoryPaths.findCreationMetaFile(indexDir);
@@ -1020,8 +1018,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
String errorMessage = String.format("Failed to find file: %s under
index directory: %s",
V1Constants.SEGMENT_CREATION_META, indexDir);
_segmentLogger.error(errorMessage);
- _realtimeTableDataManager
- .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(),
errorMessage, null));
+ _realtimeTableDataManager.addSegmentError(_segmentNameStr, new
SegmentErrorInfo(now(), errorMessage, null));
return null;
}
Map<String, File> metadataFiles = new HashMap<>();
@@ -1037,8 +1034,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
} catch (InterruptedException e) {
String errorMessage = "Interrupted while waiting for semaphore";
_segmentLogger.error(errorMessage, e);
- _realtimeTableDataManager
- .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(),
errorMessage, e));
+ _realtimeTableDataManager.addSegmentError(_segmentNameStr, new
SegmentErrorInfo(now(), errorMessage, e));
return null;
} finally {
if (_segBuildSemaphore != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org