This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0cd787f9f15 Gracefully handle bad segment time data during ingestion
if continueOnError is true (#16462)
0cd787f9f15 is described below
commit 0cd787f9f15b7510c62792467f63699620370d66
Author: Yash Mayya <[email protected]>
AuthorDate: Fri Aug 22 10:41:13 2025 +0530
Gracefully handle bad segment time data during ingestion if continueOnError
is true (#16462)
---
.../creator/impl/SegmentColumnarIndexCreator.java | 127 +++++++++++++--------
.../local/segment/index/ColumnMetadataTest.java | 33 ++++++
2 files changed, 110 insertions(+), 50 deletions(-)
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 412390d2067..faa777128e8 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -498,62 +498,89 @@ public class SegmentColumnarIndexCreator implements
SegmentCreator {
if (timeColumnName != null) {
ColumnIndexCreationInfo timeColumnIndexCreationInfo =
_indexCreationInfoMap.get(timeColumnName);
if (timeColumnIndexCreationInfo != null) {
- long startTime;
- long endTime;
- TimeUnit timeUnit;
-
- // Use start/end time in config if defined
- if (_config.getStartTime() != null) {
- startTime = Long.parseLong(_config.getStartTime());
- endTime = Long.parseLong(_config.getEndTime());
- timeUnit = Preconditions.checkNotNull(_config.getSegmentTimeUnit());
- } else {
- if (_totalDocs > 0) {
- String startTimeStr =
timeColumnIndexCreationInfo.getMin().toString();
- String endTimeStr =
timeColumnIndexCreationInfo.getMax().toString();
-
- if (_config.getTimeColumnType() ==
SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
- // For TimeColumnType.SIMPLE_DATE_FORMAT, convert time value
into millis since epoch
- // Use DateTimeFormatter from DateTimeFormatSpec to handle
default time zone consistently.
- DateTimeFormatSpec formatSpec = _config.getDateTimeFormatSpec();
- Preconditions.checkNotNull(formatSpec, "DateTimeFormatSpec must
exist for SimpleDate");
- DateTimeFormatter dateTimeFormatter =
formatSpec.getDateTimeFormatter();
- startTime = dateTimeFormatter.parseMillis(startTimeStr);
- endTime = dateTimeFormatter.parseMillis(endTimeStr);
- timeUnit = TimeUnit.MILLISECONDS;
- } else {
- // by default, time column type is TimeColumnType.EPOCH
- startTime = Long.parseLong(startTimeStr);
- endTime = Long.parseLong(endTimeStr);
- timeUnit =
Preconditions.checkNotNull(_config.getSegmentTimeUnit());
- }
+ try {
+ long startTime;
+ long endTime;
+ TimeUnit timeUnit;
+
+ // Use start/end time in config if defined
+ if (_config.getStartTime() != null) {
+ startTime = Long.parseLong(_config.getStartTime());
+ endTime = Long.parseLong(_config.getEndTime());
+ timeUnit =
Preconditions.checkNotNull(_config.getSegmentTimeUnit());
} else {
- // No records in segment. Use current time as start/end
- long now = System.currentTimeMillis();
- if (_config.getTimeColumnType() ==
SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
- startTime = now;
- endTime = now;
- timeUnit = TimeUnit.MILLISECONDS;
+ if (_totalDocs > 0) {
+ String startTimeStr =
timeColumnIndexCreationInfo.getMin().toString();
+ String endTimeStr =
timeColumnIndexCreationInfo.getMax().toString();
+
+ if (_config.getTimeColumnType() ==
SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
+ // For TimeColumnType.SIMPLE_DATE_FORMAT, convert time value
into millis since epoch
+ // Use DateTimeFormatter from DateTimeFormatSpec to handle
default time zone consistently.
+ DateTimeFormatSpec formatSpec =
_config.getDateTimeFormatSpec();
+ Preconditions.checkNotNull(formatSpec, "DateTimeFormatSpec
must exist for SimpleDate");
+ DateTimeFormatter dateTimeFormatter =
formatSpec.getDateTimeFormatter();
+ startTime = dateTimeFormatter.parseMillis(startTimeStr);
+ endTime = dateTimeFormatter.parseMillis(endTimeStr);
+ timeUnit = TimeUnit.MILLISECONDS;
+ } else {
+ // by default, time column type is TimeColumnType.EPOCH
+ startTime = Long.parseLong(startTimeStr);
+ endTime = Long.parseLong(endTimeStr);
+ timeUnit =
Preconditions.checkNotNull(_config.getSegmentTimeUnit());
+ }
} else {
- timeUnit =
Preconditions.checkNotNull(_config.getSegmentTimeUnit());
- startTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
- endTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
+ // No records in segment. Use current time as start/end
+ long now = System.currentTimeMillis();
+ if (_config.getTimeColumnType() ==
SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
+ startTime = now;
+ endTime = now;
+ timeUnit = TimeUnit.MILLISECONDS;
+ } else {
+ timeUnit =
Preconditions.checkNotNull(_config.getSegmentTimeUnit());
+ startTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
+ endTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
+ }
}
}
- }
- if (!_config.isSkipTimeValueCheck()) {
- Interval timeInterval =
- new Interval(timeUnit.toMillis(startTime),
timeUnit.toMillis(endTime), DateTimeZone.UTC);
- Preconditions.checkState(TimeUtils.isValidTimeInterval(timeInterval),
- "Invalid segment start/end time: %s (in millis: %s/%s) for time
column: %s, must be between: %s",
- timeInterval, timeInterval.getStartMillis(),
timeInterval.getEndMillis(), timeColumnName,
- TimeUtils.VALID_TIME_INTERVAL);
- }
+ if (!_config.isSkipTimeValueCheck()) {
+ Interval timeInterval =
+ new Interval(timeUnit.toMillis(startTime),
timeUnit.toMillis(endTime), DateTimeZone.UTC);
+
Preconditions.checkState(TimeUtils.isValidTimeInterval(timeInterval),
+ "Invalid segment start/end time: %s (in millis: %s/%s) for
time column: %s, must be between: %s",
+ timeInterval, timeInterval.getStartMillis(),
timeInterval.getEndMillis(), timeColumnName,
+ TimeUtils.VALID_TIME_INTERVAL);
+ }
- properties.setProperty(SEGMENT_START_TIME, startTime);
- properties.setProperty(SEGMENT_END_TIME, endTime);
- properties.setProperty(TIME_UNIT, timeUnit);
+ properties.setProperty(SEGMENT_START_TIME, startTime);
+ properties.setProperty(SEGMENT_END_TIME, endTime);
+ properties.setProperty(TIME_UNIT, timeUnit);
+ } catch (Exception e) {
+ if (!_config.isContinueOnError()) {
+ throw e;
+ }
+ TimeUnit timeUnit;
+ long now = System.currentTimeMillis();
+ long convertedStartTime;
+ long convertedEndTime;
+ if (_config.getTimeColumnType() ==
SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
+ convertedEndTime = now;
+ convertedStartTime = TimeUtils.getValidMinTimeMillis();
+ timeUnit = TimeUnit.MILLISECONDS;
+ } else {
+ timeUnit =
Preconditions.checkNotNull(_config.getSegmentTimeUnit());
+ convertedEndTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
+ convertedStartTime =
timeUnit.convert(TimeUtils.getValidMinTimeMillis(), TimeUnit.MILLISECONDS);
+ }
+ LOGGER.warn(
+ "Caught exception while writing time metadata for segment: {},
time column: {}, total docs: {}. "
+ + "Continuing using current time ({}) as the end time, and
min valid time ({}) as the start time "
+ + "for the segment (time unit: {}).",
+ _segmentName, timeColumnName, _totalDocs, convertedEndTime,
convertedStartTime, timeUnit, e);
+ properties.setProperty(SEGMENT_START_TIME, convertedStartTime);
+ properties.setProperty(SEGMENT_END_TIME, convertedEndTime);
+ properties.setProperty(TIME_UNIT, timeUnit);
+ }
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
index 14e53e3cc5c..2b8a59b5a7e 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/ColumnMetadataTest.java
@@ -51,6 +51,7 @@ import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.env.CommonsConfigurationUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -270,4 +271,36 @@ public class ColumnMetadataTest {
"Index size should be a non-negative integer value between 0 and
281474976710655");
}
}
+
+ @Test
+ public void testBadTimeColumnWithoutContinueOnError()
+ throws Exception {
+ SegmentGeneratorConfig config = createSegmentConfigWithCreator();
+ // column4 is not a time column and should cause an exception to be thrown
when the segment is sealed and time
+ // metadata is being parsed and written
+ config.setTimeColumnName("column4");
+ SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
+ driver.init(config);
+ Assert.assertThrows(NumberFormatException.class, driver::build);
+ }
+
+ @Test
+ public void testBadTimeColumnWithContinueOnError()
+ throws Exception {
+ SegmentGeneratorConfig config = createSegmentConfigWithCreator();
+ // column4 is not a time column, so parsing the time metadata fails when the
segment is sealed. With
+ // continueOnError set, the build should still succeed and fall back to default start/end times
+ config.setTimeColumnName("column4");
+ config.setContinueOnError(true);
+ SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
+ driver.init(config);
+ driver.build();
+ SegmentMetadata segmentMetadata = new
SegmentMetadataImpl(INDEX_DIR.listFiles()[0]);
+ // The time unit being used is hours since epoch.
+ long hoursSinceEpoch = System.currentTimeMillis() /
TimeUnit.HOURS.toMillis(1);
+ // Use tolerance of 1 hour to eliminate any flakiness in the test due to
time boundaries.
+ Assert.assertTrue(hoursSinceEpoch - segmentMetadata.getEndTime() <= 1);
+ Assert.assertEquals(segmentMetadata.getStartTime(),
+ TimeUnit.MILLISECONDS.toHours(TimeUtils.getValidMinTimeMillis()));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]