This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 61aeb6f18f Improve exception logging when we fail to index / transform message (#12594)
61aeb6f18f is described below

commit 61aeb6f18fce8e4a87556ab92b8dd224a11d0301
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Fri Mar 8 04:39:09 2024 +0530

    Improve exception logging when we fail to index / transform message (#12594)
---
 .../realtime/RealtimeSegmentDataManager.java       | 30 ++++++++++------------
 1 file changed, 13 insertions(+), 17 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 24ca9dc6f8..2a5da62c2a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -572,6 +572,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
       // Decode message
       StreamDataDecoderResult decodedRow = 
_streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
       msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
+      StreamPartitionMsgOffset messageOffset = 
messagesAndOffsets.getNextStreamPartitionMsgOffsetAtIndex(index);
       if (decodedRow.getException() != null) {
         // TODO: based on a config, decide whether the record should be 
silently dropped or stop further consumption on
         // decode error
@@ -587,7 +588,8 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
           _numRowsErrored++;
           // when exception happens we prefer abandoning the whole batch and 
not partially indexing some rows
           reusedResult.getTransformedRows().clear();
-          String errorMessage = String.format("Caught exception while 
transforming the record: %s", decodedRow);
+          String errorMessage = String.format("Caught exception while 
transforming the record at offset: %s , row: %s",
+              messageOffset, decodedRow.getResult());
           _segmentLogger.error(errorMessage, e);
           _realtimeTableDataManager.addSegmentError(_segmentNameStr, new 
SegmentErrorInfo(now(), errorMessage, e));
         }
@@ -617,14 +619,14 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
             
_serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L);
           } catch (Exception e) {
             _numRowsErrored++;
-            String errorMessage = String.format("Caught exception while 
indexing the record: %s", transformedRow);
+            String errorMessage = String.format("Caught exception while 
indexing the record at offset: %s , row: %s",
+                messageOffset, transformedRow);
             _segmentLogger.error(errorMessage, e);
-            _realtimeTableDataManager.addSegmentError(_segmentNameStr,
-                new SegmentErrorInfo(now(), errorMessage, e));
+            _realtimeTableDataManager.addSegmentError(_segmentNameStr, new 
SegmentErrorInfo(now(), errorMessage, e));
           }
         }
       }
-      _currentOffset = 
messagesAndOffsets.getNextStreamPartitionMsgOffsetAtIndex(index);
+      _currentOffset = messageOffset;
       _numRowsIndexed = _realtimeSegment.getNumDocsIndexed();
       _numRowsConsumed++;
       streamMessageCount++;
@@ -799,8 +801,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
         _segmentLogger.error(errorMessage, e);
         postStopConsumedMsg(e.getClass().getName());
         _state = State.ERROR;
-        _realtimeTableDataManager
-            .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), 
errorMessage, e));
+        _realtimeTableDataManager.addSegmentError(_segmentNameStr, new 
SegmentErrorInfo(now(), errorMessage, e));
         _serverMetrics.setValueOfTableGauge(_clientId, 
ServerGauge.LLC_PARTITION_CONSUMING, 0);
         return;
       }
@@ -980,8 +981,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
         String errorMessage =
             String.format("Caught exception while moving index directory from: 
%s to: %s", tempIndexDir, indexDir);
         _segmentLogger.error(errorMessage, e);
-        _realtimeTableDataManager
-            .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), 
errorMessage, e));
+        _realtimeTableDataManager.addSegmentError(_segmentNameStr, new 
SegmentErrorInfo(now(), errorMessage, e));
         return null;
       } finally {
         FileUtils.deleteQuietly(tempSegmentFolder);
@@ -1001,8 +1001,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
           String errorMessage =
               String.format("Caught exception while taring index directory 
from: %s to: %s", indexDir, segmentTarFile);
           _segmentLogger.error(errorMessage, e);
-          _realtimeTableDataManager
-              .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), 
errorMessage, e));
+          _realtimeTableDataManager.addSegmentError(_segmentNameStr, new 
SegmentErrorInfo(now(), errorMessage, e));
           return null;
         }
 
@@ -1011,8 +1010,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
           String errorMessage = String.format("Failed to find file: %s under 
index directory: %s",
               V1Constants.MetadataKeys.METADATA_FILE_NAME, indexDir);
           _segmentLogger.error(errorMessage);
-          _realtimeTableDataManager
-              .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), 
errorMessage, null));
+          _realtimeTableDataManager.addSegmentError(_segmentNameStr, new 
SegmentErrorInfo(now(), errorMessage, null));
           return null;
         }
         File creationMetaFile = 
SegmentDirectoryPaths.findCreationMetaFile(indexDir);
@@ -1020,8 +1018,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
           String errorMessage = String.format("Failed to find file: %s under 
index directory: %s",
               V1Constants.SEGMENT_CREATION_META, indexDir);
           _segmentLogger.error(errorMessage);
-          _realtimeTableDataManager
-              .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), 
errorMessage, null));
+          _realtimeTableDataManager.addSegmentError(_segmentNameStr, new 
SegmentErrorInfo(now(), errorMessage, null));
           return null;
         }
         Map<String, File> metadataFiles = new HashMap<>();
@@ -1037,8 +1034,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     } catch (InterruptedException e) {
       String errorMessage = "Interrupted while waiting for semaphore";
       _segmentLogger.error(errorMessage, e);
-      _realtimeTableDataManager
-          .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), 
errorMessage, e));
+      _realtimeTableDataManager.addSegmentError(_segmentNameStr, new 
SegmentErrorInfo(now(), errorMessage, e));
       return null;
     } finally {
       if (_segBuildSemaphore != null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to