This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cd787f9f15 Gracefully handle bad segment time data during ingestion if continueOnError is true (#16462)
0cd787f9f15 is described below

commit 0cd787f9f15b7510c62792467f63699620370d66
Author: Yash Mayya <[email protected]>
AuthorDate: Fri Aug 22 10:41:13 2025 +0530

    Gracefully handle bad segment time data during ingestion if continueOnError is true (#16462)
---
 .../creator/impl/SegmentColumnarIndexCreator.java  | 127 +++++++++++++--------
 .../local/segment/index/ColumnMetadataTest.java    |  33 ++++++
 2 files changed, 110 insertions(+), 50 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 412390d2067..faa777128e8 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -498,62 +498,89 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     if (timeColumnName != null) {
       ColumnIndexCreationInfo timeColumnIndexCreationInfo = 
_indexCreationInfoMap.get(timeColumnName);
       if (timeColumnIndexCreationInfo != null) {
-        long startTime;
-        long endTime;
-        TimeUnit timeUnit;
-
-        // Use start/end time in config if defined
-        if (_config.getStartTime() != null) {
-          startTime = Long.parseLong(_config.getStartTime());
-          endTime = Long.parseLong(_config.getEndTime());
-          timeUnit = Preconditions.checkNotNull(_config.getSegmentTimeUnit());
-        } else {
-          if (_totalDocs > 0) {
-            String startTimeStr = 
timeColumnIndexCreationInfo.getMin().toString();
-            String endTimeStr = 
timeColumnIndexCreationInfo.getMax().toString();
-
-            if (_config.getTimeColumnType() == 
SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
-              // For TimeColumnType.SIMPLE_DATE_FORMAT, convert time value 
into millis since epoch
-              // Use DateTimeFormatter from DateTimeFormatSpec to handle 
default time zone consistently.
-              DateTimeFormatSpec formatSpec = _config.getDateTimeFormatSpec();
-              Preconditions.checkNotNull(formatSpec, "DateTimeFormatSpec must 
exist for SimpleDate");
-              DateTimeFormatter dateTimeFormatter = 
formatSpec.getDateTimeFormatter();
-              startTime = dateTimeFormatter.parseMillis(startTimeStr);
-              endTime = dateTimeFormatter.parseMillis(endTimeStr);
-              timeUnit = TimeUnit.MILLISECONDS;
-            } else {
-              // by default, time column type is TimeColumnType.EPOCH
-              startTime = Long.parseLong(startTimeStr);
-              endTime = Long.parseLong(endTimeStr);
-              timeUnit = 
Preconditions.checkNotNull(_config.getSegmentTimeUnit());
-            }
+        try {
+          long startTime;
+          long endTime;
+          TimeUnit timeUnit;
+
+          // Use start/end time in config if defined
+          if (_config.getStartTime() != null) {
+            startTime = Long.parseLong(_config.getStartTime());
+            endTime = Long.parseLong(_config.getEndTime());
+            timeUnit = 
Preconditions.checkNotNull(_config.getSegmentTimeUnit());
           } else {
-            // No records in segment. Use current time as start/end
-            long now = System.currentTimeMillis();
-            if (_config.getTimeColumnType() == 
SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
-              startTime = now;
-              endTime = now;
-              timeUnit = TimeUnit.MILLISECONDS;
+            if (_totalDocs > 0) {
+              String startTimeStr = 
timeColumnIndexCreationInfo.getMin().toString();
+              String endTimeStr = 
timeColumnIndexCreationInfo.getMax().toString();
+
+              if (_config.getTimeColumnType() == 
SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
+                // For TimeColumnType.SIMPLE_DATE_FORMAT, convert time value 
into millis since epoch
+                // Use DateTimeFormatter from DateTimeFormatSpec to handle 
default time zone consistently.
+                DateTimeFormatSpec formatSpec = 
_config.getDateTimeFormatSpec();
+                Preconditions.checkNotNull(formatSpec, "DateTimeFormatSpec 
must exist for SimpleDate");
+                DateTimeFormatter dateTimeFormatter = 
formatSpec.getDateTimeFormatter();
+                startTime = dateTimeFormatter.parseMillis(startTimeStr);
+                endTime = dateTimeFormatter.parseMillis(endTimeStr);
+                timeUnit = TimeUnit.MILLISECONDS;
+              } else {
+                // by default, time column type is TimeColumnType.EPOCH
+                startTime = Long.parseLong(startTimeStr);
+                endTime = Long.parseLong(endTimeStr);
+                timeUnit = 
Preconditions.checkNotNull(_config.getSegmentTimeUnit());
+              }
             } else {
-              timeUnit = 
Preconditions.checkNotNull(_config.getSegmentTimeUnit());
-              startTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
-              endTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
+              // No records in segment. Use current time as start/end
+              long now = System.currentTimeMillis();
+              if (_config.getTimeColumnType() == 
SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
+                startTime = now;
+                endTime = now;
+                timeUnit = TimeUnit.MILLISECONDS;
+              } else {
+                timeUnit = 
Preconditions.checkNotNull(_config.getSegmentTimeUnit());
+                startTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
+                endTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
+              }
             }
           }
-        }
 
-        if (!_config.isSkipTimeValueCheck()) {
-          Interval timeInterval =
-              new Interval(timeUnit.toMillis(startTime), 
timeUnit.toMillis(endTime), DateTimeZone.UTC);
-          Preconditions.checkState(TimeUtils.isValidTimeInterval(timeInterval),
-              "Invalid segment start/end time: %s (in millis: %s/%s) for time 
column: %s, must be between: %s",
-              timeInterval, timeInterval.getStartMillis(), 
timeInterval.getEndMillis(), timeColumnName,
-              TimeUtils.VALID_TIME_INTERVAL);
-        }
+          if (!_config.isSkipTimeValueCheck()) {
+            Interval timeInterval =
+                new Interval(timeUnit.toMillis(startTime), 
timeUnit.toMillis(endTime), DateTimeZone.UTC);
+            
Preconditions.checkState(TimeUtils.isValidTimeInterval(timeInterval),
+                "Invalid segment start/end time: %s (in millis: %s/%s) for 
time column: %s, must be between: %s",
+                timeInterval, timeInterval.getStartMillis(), 
timeInterval.getEndMillis(), timeColumnName,
+                TimeUtils.VALID_TIME_INTERVAL);
+          }
 
-        properties.setProperty(SEGMENT_START_TIME, startTime);
-        properties.setProperty(SEGMENT_END_TIME, endTime);
-        properties.setProperty(TIME_UNIT, timeUnit);
+          properties.setProperty(SEGMENT_START_TIME, startTime);
+          properties.setProperty(SEGMENT_END_TIME, endTime);
+          properties.setProperty(TIME_UNIT, timeUnit);
+        } catch (Exception e) {
+          if (!_config.isContinueOnError()) {
+            throw e;
+          }
+          TimeUnit timeUnit;
+          long now = System.currentTimeMillis();
+          long convertedStartTime;
+          long convertedEndTime;
+          if (_config.getTimeColumnType() == 
SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
+            convertedEndTime = now;
+            convertedStartTime = TimeUtils.getValidMinTimeMillis();
+            timeUnit = TimeUnit.MILLISECONDS;
+          } else {
+            timeUnit = 
Preconditions.checkNotNull(_config.getSegmentTimeUnit());
+            convertedEndTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
+            convertedStartTime = 
timeUnit.convert(TimeUtils.getValidMinTimeMillis(), TimeUnit.MILLISECONDS);
+          }
+          LOGGER.warn(
+              "Caught exception while writing time metadata for segment: {}, 
time column: {}, total docs: {}. "
+                  + "Continuing using current time ({}) as the end time, and 
min valid time ({}) as the start time "
+                  + "for the segment (time unit: {}).",
+              _segmentName, timeColumnName, _totalDocs, convertedEndTime, 
convertedStartTime, timeUnit, e);
+          properties.setProperty(SEGMENT_START_TIME, convertedStartTime);
+          properties.setProperty(SEGMENT_END_TIME, convertedEndTime);
+          properties.setProperty(TIME_UNIT, timeUnit);
+        }
       }
     }
 
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
index 14e53e3cc5c..2b8a59b5a7e 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
@@ -51,6 +51,7 @@ import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.env.CommonsConfigurationUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -270,4 +271,36 @@ public class ColumnMetadataTest {
           "Index size should be a non-negative integer value between 0 and 
281474976710655");
     }
   }
+
+  @Test
+  public void testBadTimeColumnWithoutContinueOnError()
+      throws Exception {
+    SegmentGeneratorConfig config = createSegmentConfigWithCreator();
+    // column4 is not a time column and should cause an exception to be thrown 
when the segment is sealed and time
+    // metadata is being parsed and written
+    config.setTimeColumnName("column4");
+    SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
+    driver.init(config);
+    Assert.assertThrows(NumberFormatException.class, driver::build);
+  }
+
+  @Test
+  public void testBadTimeColumnWithContinueOnError()
+      throws Exception {
+    SegmentGeneratorConfig config = createSegmentConfigWithCreator();
+    // column4 is not a time column, so parsing its min/max as segment time fails when the segment is sealed. With
+    // continueOnError enabled, that failure is caught and fallback start/end times are written instead.
+    config.setTimeColumnName("column4");
+    config.setContinueOnError(true);
+    SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
+    driver.init(config);
+    driver.build();
+    SegmentMetadata segmentMetadata = new 
SegmentMetadataImpl(INDEX_DIR.listFiles()[0]);
+    // The time unit being used is hours since epoch.
+    long hoursSinceEpoch = System.currentTimeMillis() / 
TimeUnit.HOURS.toMillis(1);
+    // Use tolerance of 1 hour to eliminate any flakiness in the test due to 
time boundaries.
+    Assert.assertTrue(hoursSinceEpoch - segmentMetadata.getEndTime() <= 1);
+    Assert.assertEquals(segmentMetadata.getStartTime(),
+        TimeUnit.MILLISECONDS.toHours(TimeUtils.getValidMinTimeMillis()));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to