This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch fix_time_conversion in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit e5389c15ef27d66cd058e8b65e0a0931a204cc0b Author: Xiaotian (Jackie) Jiang <[email protected]> AuthorDate: Tue Nov 13 17:55:51 2018 -0800 Fix the bug where realtime time conversion is skipped when the incoming and outgoing time column names are the same 1. Always perform time transformation for REALTIME segment consumption 2. Never perform time transformation for OFFLINE segment generation - The reason to disable time transformation for OFFLINE is that, with the same incoming and outgoing time column name, there is no way to determine whether the time conversion has already happened: an OFFLINE segment might be pre-aggregated and its time might have already been converted. If we need time transformation for OFFLINE segment generation in the future, we can add an extra flag for that. --- .../realtime/HLRealtimeSegmentDataManager.java | 4 +-- .../realtime/LLRealtimeSegmentDataManager.java | 2 +- .../recordtransformer/CompoundTransformer.java | 24 +++++++++++-- .../data/recordtransformer/TimeTransformer.java | 6 +--- .../RecordReaderSegmentCreationDataSource.java | 3 +- .../impl/SegmentIndexCreationDriverImpl.java | 2 +- .../recordtransformer/RecordTransformerTest.java | 42 ++++++++++++++++++++-- 7 files changed, 67 insertions(+), 16 deletions(-) diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/com/linkedin/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java index 25f3858..439a451 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java @@ -102,8 +102,8 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { super(); _segmentVersion = indexLoadingConfig.getSegmentVersion(); this.schema = schema; - _recordTransformer = CompoundTransformer.getDefaultTransformer(schema); - 
this.serverMetrics =serverMetrics; + _recordTransformer = CompoundTransformer.getRealtimeTransformer(schema); + this.serverMetrics = serverMetrics; this.segmentName = realtimeSegmentZKMetadata.getSegmentName(); this.tableName = tableConfig.getTableName(); this.timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/com/linkedin/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index ec19064..202595f 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -1061,7 +1061,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _clientId = _streamPartitionId + "-" + NetUtil.getHostnameOrAddress(); // Create record transformer - _recordTransformer = CompoundTransformer.getDefaultTransformer(schema); + _recordTransformer = CompoundTransformer.getRealtimeTransformer(schema); makeStreamConsumer("Starting"); makeStreamMetadataProvider("Starting"); diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/CompoundTransformer.java b/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/CompoundTransformer.java index 1008a45..5e9b83e 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/CompoundTransformer.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/CompoundTransformer.java @@ -30,12 +30,30 @@ public class CompoundTransformer implements RecordTransformer { private final List<RecordTransformer> _transformers; /** - * Returns a record transformer that performs time transform, expressions transform and data type transform. 
+ * Returns a record transformer for OFFLINE segment generation that performs expressions transform and data type + * transform. + * <p>NOTE: NO TIME TRANSFORMATION + * <p>NOTE: DO NOT CHANGE THE ORDER OF THE RECORD TRANSFORMERS + * <ul> + * <li> + * We put {@link SanitationTransformer} after {@link DataTypeTransformer} so that before sanitation, all values + * follow the data types defined in the {@link Schema}. + * </li> + * </ul> + */ + public static CompoundTransformer getOfflineTransformer(Schema schema) { + return new CompoundTransformer(Arrays.asList(new ExpressionTransformer(schema), new DataTypeTransformer(schema), + new SanitationTransformer(schema))); + } + + /** + * Returns a record transformer for REALTIME segment consumption that performs time transform, expressions transform + * and data type transform. * <p>NOTE: DO NOT CHANGE THE ORDER OF THE RECORD TRANSFORMERS * <ul> * <li> * We put {@link ExpressionTransformer} after {@link TimeTransformer} so that expression can work on outgoing time - * column + * column. 
* </li> * <li> * We put {@link SanitationTransformer} after {@link DataTypeTransformer} so that before sanitation, all values @@ -43,7 +61,7 @@ public class CompoundTransformer implements RecordTransformer { * </li> * </ul> */ - public static CompoundTransformer getDefaultTransformer(Schema schema) { + public static CompoundTransformer getRealtimeTransformer(Schema schema) { return new CompoundTransformer( Arrays.asList(new TimeTransformer(schema), new ExpressionTransformer(schema), new DataTypeTransformer(schema), new SanitationTransformer(schema))); diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformer.java b/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformer.java index 2ee8e00..8d403d3 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformer.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformer.java @@ -55,11 +55,7 @@ public class TimeTransformer implements RecordTransformer { if (_timeConverter == null) { return record; } - // Skip transformation if outgoing value already exist - // NOTE: outgoing value might already exist for OFFLINE data - if (record.getValue(_outgoingTimeColumn) == null) { - record.putField(_outgoingTimeColumn, _timeConverter.convert(record.getValue(_incomingTimeColumn))); - } + record.putField(_outgoingTimeColumn, _timeConverter.convert(record.getValue(_incomingTimeColumn))); return record; } } diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java index c5778b1..6d0f976 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java +++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java @@ -42,8 +42,7 @@ public class RecordReaderSegmentCreationDataSource implements SegmentCreationDat @Override public SegmentPreIndexStatsCollector gatherStats(StatsCollectorConfig statsCollectorConfig) { try { - RecordTransformer recordTransformer = - CompoundTransformer.getDefaultTransformer(statsCollectorConfig.getSchema()); + RecordTransformer recordTransformer = CompoundTransformer.getOfflineTransformer(statsCollectorConfig.getSchema()); SegmentPreIndexStatsCollector collector = new SegmentPreIndexStatsCollectorImpl(statsCollectorConfig); collector.init(); diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java index 62f96dd..e104a1c 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java @@ -107,7 +107,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive } public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource) { - init(config, dataSource, CompoundTransformer.getDefaultTransformer(dataSource.getRecordReader().getSchema())); + init(config, dataSource, CompoundTransformer.getOfflineTransformer(dataSource.getRecordReader().getSchema())); } public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource, diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/RecordTransformerTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/RecordTransformerTest.java index fb5f0fc..0590aae 100644 --- 
a/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/RecordTransformerTest.java +++ b/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/RecordTransformerTest.java @@ -86,6 +86,20 @@ public class RecordTransformerTest { } @Test + public void testTimeTransformerWithSameIncomingOutgoingColumnName() { + Schema schema = new Schema.SchemaBuilder().addTime("time", 6, TimeUnit.HOURS, FieldSpec.DataType.INT, "time", 1, + TimeUnit.MILLISECONDS, FieldSpec.DataType.LONG).build(); + RecordTransformer transformer = new TimeTransformer(schema); + GenericRow record = new GenericRow(); + record.putField("time", 123); + + // With the same incoming and outgoing column name, transform multiple times will return different results + record = transformer.transform(record); + assertNotNull(record); + assertEquals(record.getValue("time"), 123 * 6 * 3600 * 1000L); + } + + @Test public void testDataTypeTransformer() { RecordTransformer transformer = new DataTypeTransformer(SCHEMA); GenericRow record = getRecord(); @@ -124,8 +138,32 @@ public class RecordTransformerTest { } @Test - public void testDefaultTransformer() { - RecordTransformer transformer = CompoundTransformer.getDefaultTransformer(SCHEMA); + public void testOfflineTransformer() { + RecordTransformer transformer = CompoundTransformer.getOfflineTransformer(SCHEMA); + GenericRow record = getRecord(); + for (int i = 0; i < NUM_ROUNDS; i++) { + record = transformer.transform(record); + assertNotNull(record); + assertEquals(record.getValue("svInt"), 123); + assertEquals(record.getValue("svLong"), 123L); + assertEquals(record.getValue("svFloat"), 123f); + assertEquals(record.getValue("svDouble"), 123d); + assertEquals(record.getValue("svBytes"), new byte[]{123, 123}); + assertEquals(record.getValue("mvInt"), new Object[]{123}); + assertEquals(record.getValue("mvLong"), new Object[]{123L}); + assertEquals(record.getValue("mvFloat"), new Object[]{123f}); + 
assertEquals(record.getValue("mvDouble"), new Object[]{123d}); + assertEquals(record.getValue("svStringWithNullCharacters"), "1"); + assertEquals(record.getValue("svStringWithLengthLimit"), "12"); + assertEquals(record.getValue("incoming"), "123"); + // No time conversion for OFFLINE transformer + assertEquals(record.getValue("outgoing"), Long.MIN_VALUE); + } + } + + @Test + public void testRealtimeTransformer() { + RecordTransformer transformer = CompoundTransformer.getRealtimeTransformer(SCHEMA); GenericRow record = getRecord(); for (int i = 0; i < NUM_ROUNDS; i++) { record = transformer.transform(record); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
