This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 78f38db365 Enhance commit threshold to accept size threshold without setting rows to 0 (#12684) 78f38db365 is described below commit 78f38db3653e4ac3ec96e52de7f7dafa96d708d0 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Mon Mar 25 16:27:10 2024 -0700 Enhance commit threshold to accept size threshold without setting rows to 0 (#12684) --- .../segment/FlushThresholdUpdateManager.java | 11 ++- .../segment/SegmentFlushThresholdComputer.java | 9 ++- .../segment/FlushThresholdUpdaterTest.java | 54 ++++++++------ .../realtime/RealtimeSegmentDataManager.java | 10 +-- .../core/realtime/stream/StreamConfigTest.java | 84 ++++++++++++++-------- .../segment/local/utils/TableConfigUtils.java | 25 ++++--- .../segment/local/utils/TableConfigUtilsTest.java | 55 +++++--------- .../org/apache/pinot/spi/stream/StreamConfig.java | 27 ++----- .../apache/pinot/spi/config/ConfigUtilsTest.java | 63 ++++++++-------- .../conf/sample_realtime_table_config.json | 2 +- .../meetupRsvp_realtime_table_config.json | 4 +- 11 files changed, 173 insertions(+), 171 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java index 318367e4d3..2d9dca1f3e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java @@ -38,14 +38,21 @@ public class FlushThresholdUpdateManager { */ public FlushThresholdUpdater getFlushThresholdUpdater(StreamConfig streamConfig) { String realtimeTableName = streamConfig.getTableNameWithType(); - int flushThresholdRows = streamConfig.getFlushThresholdRows(); + int flushThresholdRows = streamConfig.getFlushThresholdRows(); if (flushThresholdRows > 0) { _flushThresholdUpdaterMap.remove(realtimeTableName); return new DefaultFlushThresholdUpdater(flushThresholdRows); - } else { + } + + // Legacy behavior: when flush threshold rows is explicitly set to 0, use segment size based flush threshold + long flushThresholdSegmentSizeBytes = streamConfig.getFlushThresholdSegmentSizeBytes(); + if (flushThresholdRows == 0 || flushThresholdSegmentSizeBytes > 0) { return _flushThresholdUpdaterMap.computeIfAbsent(realtimeTableName, k -> new SegmentSizeBasedFlushThresholdUpdater()); + } else { + _flushThresholdUpdaterMap.remove(realtimeTableName); + return new DefaultFlushThresholdUpdater(StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java index 2c826dd60e..23b818d554 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java @@ -58,9 +58,12 @@ class SegmentFlushThresholdComputer { public int computeThreshold(StreamConfig streamConfig, CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable SegmentZKMetadata committingSegmentZKMetadata, String newSegmentName) { - final long desiredSegmentSizeBytes = streamConfig.getFlushThresholdSegmentSizeBytes(); - final long optimalSegmentSizeBytesMin = desiredSegmentSizeBytes / 2; - final double optimalSegmentSizeBytesMax = desiredSegmentSizeBytes * 1.5; + long desiredSegmentSizeBytes = streamConfig.getFlushThresholdSegmentSizeBytes(); + if (desiredSegmentSizeBytes <= 0) { + desiredSegmentSizeBytes = StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES; + } + long optimalSegmentSizeBytesMin = desiredSegmentSizeBytes / 2; + double optimalSegmentSizeBytesMax = desiredSegmentSizeBytes * 1.5; if (committingSegmentZKMetadata == null) { // first segment of the partition, hence committing segment is null if (_latestSegmentRowsToSizeRatio > 0) { // new partition group added case diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java index 1184f113d2..e7bfd92ec4 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java @@ -53,49 +53,57 @@ public class FlushThresholdUpdaterTest { public void testFlushThresholdUpdateManager() { FlushThresholdUpdateManager flushThresholdUpdateManager = new FlushThresholdUpdateManager(); - // Flush threshold rows larger than 0 - DefaultFlushThresholdUpdater should be returned - FlushThresholdUpdater defaultFlushThresholdUpdater = flushThresholdUpdateManager - .getFlushThresholdUpdater(mockStreamConfig(StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS)); - assertTrue(defaultFlushThresholdUpdater instanceof DefaultFlushThresholdUpdater); - assertEquals(((DefaultFlushThresholdUpdater) defaultFlushThresholdUpdater).getTableFlushSize(), + // Neither flush threshold rows nor segment size is set - DefaultFlushThresholdUpdater should be returned + FlushThresholdUpdater flushThresholdUpdater = + flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(-1, -1)); + assertTrue(flushThresholdUpdater instanceof DefaultFlushThresholdUpdater); + assertEquals(((DefaultFlushThresholdUpdater) flushThresholdUpdater).getTableFlushSize(), StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS); + // Flush threshold rows larger than 0 - DefaultFlushThresholdUpdater should be returned + flushThresholdUpdater = flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(1234, -1)); + assertTrue(flushThresholdUpdater instanceof DefaultFlushThresholdUpdater); + assertEquals(((DefaultFlushThresholdUpdater) flushThresholdUpdater).getTableFlushSize(), 1234); + // Flush threshold rows set to 0 - SegmentSizeBasedFlushThresholdUpdater should be returned - StreamConfig autotuneStreamConfig = mockDefaultAutotuneStreamConfig(); - FlushThresholdUpdater autotuneFlushThresholdUpdater = - flushThresholdUpdateManager.getFlushThresholdUpdater(autotuneStreamConfig); - assertTrue(autotuneFlushThresholdUpdater instanceof SegmentSizeBasedFlushThresholdUpdater); + FlushThresholdUpdater segmentBasedflushThresholdUpdater = + flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1)); + assertTrue(segmentBasedflushThresholdUpdater instanceof SegmentSizeBasedFlushThresholdUpdater); - // Call again with flush threshold rows set to 0 - same Object should be returned - assertSame(flushThresholdUpdateManager.getFlushThresholdUpdater(mockAutotuneStreamConfig(10000L, 10000L, 10000)), - autotuneFlushThresholdUpdater); + // Flush threshold segment size larger than 0 - SegmentSizeBasedFlushThresholdUpdater should be returned + flushThresholdUpdater = flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(-1, 1234)); + assertSame(flushThresholdUpdater, segmentBasedflushThresholdUpdater); - // Call again with flush threshold rows set larger than 0 - DefaultFlushThresholdUpdater should be returned - defaultFlushThresholdUpdater = flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(10000)); - assertTrue(defaultFlushThresholdUpdater instanceof DefaultFlushThresholdUpdater); - assertEquals(((DefaultFlushThresholdUpdater) defaultFlushThresholdUpdater).getTableFlushSize(), 10000); + // Flush threshold rows set larger than 0 - DefaultFlushThresholdUpdater should be returned + flushThresholdUpdater = flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(12345, -1)); + assertTrue(flushThresholdUpdater instanceof DefaultFlushThresholdUpdater); + assertEquals(((DefaultFlushThresholdUpdater) flushThresholdUpdater).getTableFlushSize(), 12345); // Call again with flush threshold rows set to 0 - a different Object should be returned - assertNotSame(flushThresholdUpdateManager.getFlushThresholdUpdater(autotuneStreamConfig), - autotuneFlushThresholdUpdater); + flushThresholdUpdater = flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1)); + assertTrue(flushThresholdUpdater instanceof SegmentSizeBasedFlushThresholdUpdater); + assertNotSame(flushThresholdUpdater, segmentBasedflushThresholdUpdater); + segmentBasedflushThresholdUpdater = flushThresholdUpdater; // Clear the updater flushThresholdUpdateManager.clearFlushThresholdUpdater(REALTIME_TABLE_NAME); // Call again with flush threshold rows set to 0 - a different Object should be returned - assertNotSame(flushThresholdUpdateManager.getFlushThresholdUpdater(autotuneStreamConfig), - autotuneFlushThresholdUpdater); + flushThresholdUpdater = flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1)); + assertTrue(flushThresholdUpdater instanceof SegmentSizeBasedFlushThresholdUpdater); + assertNotSame(flushThresholdUpdater, segmentBasedflushThresholdUpdater); } - private StreamConfig mockStreamConfig(int flushThresholdRows) { + private StreamConfig mockStreamConfig(int flushThresholdRows, long flushThresholdSegmentSize) { StreamConfig streamConfig = mock(StreamConfig.class); when(streamConfig.getTableNameWithType()).thenReturn(REALTIME_TABLE_NAME); when(streamConfig.getFlushThresholdRows()).thenReturn(flushThresholdRows); + when(streamConfig.getFlushThresholdSegmentSizeBytes()).thenReturn(flushThresholdSegmentSize); return streamConfig; } - private StreamConfig mockAutotuneStreamConfig(long flushSegmentDesiredSizeBytes, - long flushThresholdTimeMillis, int flushAutotuneInitialRows) { + private StreamConfig mockAutotuneStreamConfig(long flushSegmentDesiredSizeBytes, long flushThresholdTimeMillis, + int flushAutotuneInitialRows) { StreamConfig streamConfig = mock(StreamConfig.class); when(streamConfig.getTableNameWithType()).thenReturn(REALTIME_TABLE_NAME); when(streamConfig.getFlushThresholdRows()).thenReturn(0); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 9f9a457683..e226a13502 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -1455,10 +1455,12 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { } // Read the max number of rows - int segmentMaxRowCount = _streamConfig.getFlushThresholdRows(); - int flushThresholdSize = segmentZKMetadata.getSizeThresholdToFlushSegment(); - if (flushThresholdSize > 0) { - segmentMaxRowCount = flushThresholdSize; + int segmentMaxRowCount = segmentZKMetadata.getSizeThresholdToFlushSegment(); + if (segmentMaxRowCount <= 0) { + segmentMaxRowCount = _streamConfig.getFlushThresholdRows(); + } + if (segmentMaxRowCount <= 0) { + segmentMaxRowCount = StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS; } _segmentMaxRowCount = segmentMaxRowCount; diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java index 11c7ee2010..ce3c6e3e74 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java @@ -144,10 +144,9 @@ public class StreamConfigTest { assertEquals(streamConfig.getOffsetCriteria(), new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest()); assertEquals(streamConfig.getConnectionTimeoutMillis(), StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS); assertEquals(streamConfig.getFetchTimeoutMillis(), StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS); - assertEquals(streamConfig.getFlushThresholdRows(), StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS); assertEquals(streamConfig.getFlushThresholdTimeMillis(), StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS); - assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), - StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES); + assertEquals(streamConfig.getFlushThresholdRows(), -1); + assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1); String offsetCriteria = "smallest"; String decoderProp1Key = "prop1"; @@ -167,8 +166,8 @@ public class StreamConfigTest { streamConfigMap.put( StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS), fetchTimeout); - streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, flushThresholdRows); streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME, flushThresholdTime); + streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, flushThresholdRows); streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE, flushSegmentSize); streamConfig = new StreamConfig(tableName, streamConfigMap); @@ -181,9 +180,9 @@ public class StreamConfigTest { assertTrue(streamConfig.getOffsetCriteria().isSmallest()); assertEquals(streamConfig.getConnectionTimeoutMillis(), Long.parseLong(connectionTimeout)); assertEquals(streamConfig.getFetchTimeoutMillis(), Integer.parseInt(fetchTimeout)); - assertEquals(streamConfig.getFlushThresholdRows(), Integer.parseInt(flushThresholdRows)); assertEquals(streamConfig.getFlushThresholdTimeMillis(), (long) TimeUtils.convertPeriodToMillis(flushThresholdTime)); + assertEquals(streamConfig.getFlushThresholdRows(), Integer.parseInt(flushThresholdRows)); assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), DataSizeUtils.toBytes(flushSegmentSize)); // Backward compatibility check for flushThresholdTime @@ -191,10 +190,6 @@ public class StreamConfigTest { streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME, flushThresholdTime); streamConfig = new StreamConfig(tableName, streamConfigMap); assertEquals(streamConfig.getFlushThresholdTimeMillis(), Long.parseLong(flushThresholdTime)); - flushThresholdTime = "invalid input"; - streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME, flushThresholdTime); - streamConfig = new StreamConfig(tableName, streamConfigMap); - assertEquals(streamConfig.getFlushThresholdTimeMillis(), StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS); // Backward compatibility check for flush threshold rows streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS); @@ -247,38 +242,61 @@ public class StreamConfigTest { streamConfig = new StreamConfig(tableName, streamConfigMap); assertEquals(streamConfig.getConnectionTimeoutMillis(), StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS); - // Invalid flush threshold rows - deprecated property + // Invalid flush threshold time streamConfigMap.remove(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONNECTION_TIMEOUT_MILLIS)); + streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME, "time"); + try { + new StreamConfig(tableName, streamConfigMap); + fail(); + } catch (IllegalArgumentException e) { + // Expected + assertTrue(e.getMessage().contains(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME)); + } + + // Invalid flush threshold rows - deprecated property + streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME); streamConfigMap.put(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS, "rows"); - streamConfig = new StreamConfig(tableName, streamConfigMap); - assertEquals(streamConfig.getFlushThresholdRows(), StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS); + try { + new StreamConfig(tableName, streamConfigMap); + fail(); + } catch (IllegalArgumentException e) { + // Expected + assertTrue(e.getMessage().contains(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS)); + } // Invalid flush threshold rows - new property streamConfigMap.remove(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS); streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, "rows"); - streamConfig = new StreamConfig(tableName, streamConfigMap); - assertEquals(streamConfig.getFlushThresholdRows(), StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS); - - // Invalid flush threshold time - streamConfigMap.remove(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS); - streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME, "time"); - streamConfig = new StreamConfig(tableName, streamConfigMap); - assertEquals(streamConfig.getFlushThresholdTimeMillis(), StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS); + try { + new StreamConfig(tableName, streamConfigMap); + fail(); + } catch (IllegalArgumentException e) { + // Expected + assertTrue(e.getMessage().contains(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS)); + } // Invalid flush segment size - deprecated property - streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME); + streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS); streamConfigMap.put(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE, "size"); - streamConfig = new StreamConfig(tableName, streamConfigMap); - assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), - StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES); + try { + new StreamConfig(tableName, streamConfigMap); + fail(); + } catch (IllegalArgumentException e) { + // Expected + assertTrue(e.getMessage().contains(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE)); + } // Invalid flush segment size - new property streamConfigMap.remove(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE); streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE, "size"); - streamConfig = new StreamConfig(tableName, streamConfigMap); - assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), - StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES); + try { + new StreamConfig(tableName, streamConfigMap); + fail(); + } catch (IllegalArgumentException e) { + // Expected + assertTrue(e.getMessage().contains(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE)); + } } /** @@ -309,16 +327,18 @@ public class StreamConfigTest { // Use default values if nothing provided streamConfig = new StreamConfig(tableName, streamConfigMap); - assertEquals(streamConfig.getFlushThresholdRows(), StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS); assertEquals(streamConfig.getFlushThresholdTimeMillis(), StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS); + assertEquals(streamConfig.getFlushThresholdRows(), -1); + assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1); // Use regular values if provided streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, flushThresholdRows); streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME, flushThresholdTime); streamConfig = new StreamConfig(tableName, streamConfigMap); - assertEquals(streamConfig.getFlushThresholdRows(), Integer.parseInt(flushThresholdRows)); assertEquals(streamConfig.getFlushThresholdTimeMillis(), (long) TimeUtils.convertPeriodToMillis(flushThresholdTime)); + assertEquals(streamConfig.getFlushThresholdRows(), Integer.parseInt(flushThresholdRows)); + assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1); // Use regular values if both regular and llc config exists streamConfigMap.put( @@ -327,17 +347,19 @@ public class StreamConfigTest { streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME + StreamConfigProperties.LLC_SUFFIX, flushThresholdTimeLLC); streamConfig = new StreamConfig(tableName, streamConfigMap); - assertEquals(streamConfig.getFlushThresholdRows(), Integer.parseInt(flushThresholdRows)); assertEquals(streamConfig.getFlushThresholdTimeMillis(), (long) TimeUtils.convertPeriodToMillis(flushThresholdTime)); + assertEquals(streamConfig.getFlushThresholdRows(), Integer.parseInt(flushThresholdRows)); + assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1); // Use llc values if only llc config exists streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS); streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME); streamConfig = new StreamConfig(tableName, streamConfigMap); - assertEquals(streamConfig.getFlushThresholdRows(), Integer.parseInt(flushThresholdRowsLLC)); assertEquals(streamConfig.getFlushThresholdTimeMillis(), (long) TimeUtils.convertPeriodToMillis(flushThresholdTimeLLC)); + assertEquals(streamConfig.getFlushThresholdRows(), Integer.parseInt(flushThresholdRowsLLC)); + assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1); } @Test diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 6c83ae9cb8..a713c81492 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -155,26 +155,19 @@ public final class TableConfigUtils { if (!skipTypes.contains(ValidationType.ALL)) { validateTableSchemaConfig(tableConfig); validateValidationConfig(tableConfig, schema); - - StreamConfig streamConfig = null; validateIngestionConfig(tableConfig, schema, disableGroovy); + // Only allow realtime tables with non-null stream.type and LLC consumer.type if (tableConfig.getTableType() == TableType.REALTIME) { Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig); + StreamConfig streamConfig; try { // Validate that StreamConfig can be created streamConfig = new StreamConfig(tableConfig.getTableName(), streamConfigMap); } catch (Exception e) { throw new IllegalStateException("Could not create StreamConfig using the streamConfig map", e); } - validateDecoder(streamConfig); - // if segmentSizeBytes is specified, rows must be zero. - if (streamConfigMap.containsKey(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE) - || streamConfigMap.containsKey(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE)) { - Preconditions.checkState(streamConfig.getFlushThresholdRows() == 0, - String.format("Invalid config: %s=%d, it must be set to 0 for size based segment threshold to work.", - StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, streamConfig.getFlushThresholdRows())); - } + validateStreamConfig(streamConfig); } validateTierConfigList(tableConfig.getTierConfigsList()); validateIndexingConfig(tableConfig.getIndexingConfig(), schema); @@ -590,7 +583,17 @@ public final class TableConfigUtils { } @VisibleForTesting - static void validateDecoder(StreamConfig streamConfig) { + static void validateStreamConfig(StreamConfig streamConfig) { + // Validate flush threshold + Preconditions.checkState(streamConfig.getFlushThresholdTimeMillis() > 0, "Invalid flush threshold time: %s", + streamConfig.getFlushThresholdTimeMillis()); + int flushThresholdRows = streamConfig.getFlushThresholdRows(); + long flushThresholdSegmentSizeBytes = streamConfig.getFlushThresholdSegmentSizeBytes(); + Preconditions.checkState(!(flushThresholdRows > 0 && flushThresholdSegmentSizeBytes > 0), + "Flush threshold rows: %s and flush threshold segment size: %s cannot be both set", flushThresholdRows, + flushThresholdSegmentSizeBytes); + + // Validate decoder if (streamConfig.getDecoderClass().equals("org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder")) { String descriptorFilePath = "descriptorFile"; String protoClassName = "protoClassName"; diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index bcf8aa622e..8f695f2b76 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -689,28 +689,31 @@ public class TableConfigUtilsTest { // validate the proto decoder streamConfigs = getKafkaStreamConfigs(); //test config should be valid - TableConfigUtils.validateDecoder(new StreamConfig("test", streamConfigs)); + TableConfigUtils.validateStreamConfig(new StreamConfig("test", streamConfigs)); streamConfigs.remove("stream.kafka.decoder.prop.descriptorFile"); try { - TableConfigUtils.validateDecoder(new StreamConfig("test", streamConfigs)); + TableConfigUtils.validateStreamConfig(new StreamConfig("test", streamConfigs)); + Assert.fail("Should fail without descriptor file"); } catch (IllegalStateException e) { // expected } streamConfigs = getKafkaStreamConfigs(); streamConfigs.remove("stream.kafka.decoder.prop.protoClassName"); try { - TableConfigUtils.validateDecoder(new StreamConfig("test", streamConfigs)); + TableConfigUtils.validateStreamConfig(new StreamConfig("test", streamConfigs)); + Assert.fail("Should fail without descriptor proto class name"); } catch (IllegalStateException e) { // expected } //validate the protobuf pulsar config streamConfigs = getPulsarStreamConfigs(); //test config should be valid - TableConfigUtils.validateDecoder(new StreamConfig("test", streamConfigs)); + TableConfigUtils.validateStreamConfig(new StreamConfig("test", streamConfigs)); //remove the descriptor file, should fail streamConfigs.remove("stream.pulsar.decoder.prop.descriptorFile"); try { - TableConfigUtils.validateDecoder(new StreamConfig("test", streamConfigs)); + TableConfigUtils.validateStreamConfig(new StreamConfig("test", streamConfigs)); + Assert.fail("Should fail without descriptor file"); } catch (IllegalStateException e) { // expected } @@ -718,55 +721,31 @@ public class TableConfigUtilsTest { //remove the proto class name, should fail streamConfigs.remove("stream.pulsar.decoder.prop.protoClassName"); try { - TableConfigUtils.validateDecoder(new StreamConfig("test", streamConfigs)); + TableConfigUtils.validateStreamConfig(new StreamConfig("test", streamConfigs)); + Assert.fail("Should fail without descriptor proto class name"); } catch (IllegalStateException e) { // expected } - Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) - .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build(); - - // When size based threshold is specified, default rows does not work, it has to be explicitly set to 0. + // When size based threshold is specified, default rows should not be set streamConfigs = getKafkaStreamConfigs(); streamConfigs.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE); streamConfigs.remove(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE); streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE, "100m"); streamConfigs.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS); - ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs))); - tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN) - .setIngestionConfig(ingestionConfig).build(); - - try { - TableConfigUtils.validate(tableConfig, schema); - Assert.fail(); - } catch (IllegalStateException e) { - Assert.assertTrue(e.getMessage().contains("must be set to 0")); - } + TableConfigUtils.validateStreamConfig(new StreamConfig("test", streamConfigs)); - // When size based threshold is specified, rows has to be set to 0. streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, "1000"); - ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs))); - tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn") - .setIngestionConfig(ingestionConfig).build(); - try { - TableConfigUtils.validate(tableConfig, schema); - Assert.fail(); + TableConfigUtils.validateStreamConfig(new StreamConfig("test", streamConfigs)); + Assert.fail("Should fail when both rows and size based threshold are specified"); } catch (IllegalStateException e) { - Assert.assertTrue(e.getMessage().contains("must be set to 0")); + // expected } - // When size based threshold is specified, rows has to be set to 0. + // Legacy behavior: allow size based threshold to be explicitly set to 0 streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, "0"); - ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs))); - tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn") - .setIngestionConfig(ingestionConfig).build(); - - try { - TableConfigUtils.validate(tableConfig, schema); - } catch (IllegalStateException e) { - Assert.fail(e.getMessage()); - } + TableConfigUtils.validateStreamConfig(new StreamConfig("test", streamConfigs)); } @Test diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java index ea24f5d01b..32bea7aa83 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java @@ -222,7 +222,6 @@ public class StreamConfig { } private long extractFlushThresholdSegmentSize(Map<String, String> streamConfigMap) { - long segmentSizeBytes = -1; String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE; String flushThresholdSegmentSizeStr = streamConfigMap.get(key); if (flushThresholdSegmentSizeStr == null) { @@ -230,19 +229,14 @@ public class StreamConfig { key = StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE; flushThresholdSegmentSizeStr = streamConfigMap.get(key); } - if (flushThresholdSegmentSizeStr != null) { try { - segmentSizeBytes = DataSizeUtils.toBytes(flushThresholdSegmentSizeStr); + return DataSizeUtils.toBytes(flushThresholdSegmentSizeStr); } catch (Exception e) { - LOGGER.warn("Invalid config {}: {}, defaulting to: {}", key, flushThresholdSegmentSizeStr, - DataSizeUtils.fromBytes(DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES)); + throw new IllegalArgumentException("Invalid config " + key + ": " + flushThresholdSegmentSizeStr); } - } - if (segmentSizeBytes > 0) { - return segmentSizeBytes; } else { - return DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES; + return -1; } } @@ -261,17 +255,12 @@ public class StreamConfig { } if (flushThresholdRowsStr != null) { try { - int flushThresholdRows = Integer.parseInt(flushThresholdRowsStr); - // Flush threshold rows 0 means using segment size based flush threshold - Preconditions.checkState(flushThresholdRows >= 0); - return flushThresholdRows; + return Integer.parseInt(flushThresholdRowsStr); } catch (Exception e) { - LOGGER.warn("Invalid config {}: {}, defaulting to: {}", key, flushThresholdRowsStr, - DEFAULT_FLUSH_THRESHOLD_ROWS); - return DEFAULT_FLUSH_THRESHOLD_ROWS; + throw new IllegalArgumentException("Invalid config " + key + ": " + flushThresholdRowsStr); } } else { - return DEFAULT_FLUSH_THRESHOLD_ROWS; + return -1; } } @@ -291,9 +280,7 @@ public class StreamConfig { // For backward-compatibility, parse it as milliseconds value return Long.parseLong(flushThresholdTimeStr); } catch (NumberFormatException nfe) { - LOGGER.warn("Invalid config {}: {}, defaulting to: {}", key, flushThresholdTimeStr, - DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS); - return DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS; + throw new IllegalArgumentException("Invalid config " + key + ": " + flushThresholdTimeStr); } } } else { diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java index 1b1d414249..eeee659453 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java @@ -28,10 +28,10 @@ import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConfigProperties; -import org.testng.Assert; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -68,19 +68,17 @@ public class ConfigUtilsTest { Map<String, String> streamConfigMap = new HashMap<>(); streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType); - streamConfigMap - .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), - topic); - streamConfigMap.put(StreamConfigProperties - .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), - consumerFactoryClass); - streamConfigMap - .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS), - decoderClass); - streamConfigMap - .put(StreamConfigProperties.constructStreamProperty(streamType, "aws.accessKey"), "${AWS_ACCESS_KEY}"); - streamConfigMap - .put(StreamConfigProperties.constructStreamProperty(streamType, "aws.secretKey"), "${AWS_SECRET_KEY}"); + streamConfigMap.put( + StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic); + streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType, + StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClass); + streamConfigMap.put( + StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS), + decoderClass); + streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType, "aws.accessKey"), + "${AWS_ACCESS_KEY}"); + streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType, "aws.secretKey"), + "${AWS_SECRET_KEY}"); indexingConfig.setStreamConfigs(streamConfigMap); Map<String, String> environment = @@ -99,24 +97,19 @@ public class ConfigUtilsTest { // Mandatory values + defaults StreamConfig streamConfig = new StreamConfig(tableName, indexingConfig.getStreamConfigs()); - Assert.assertEquals(streamConfig.getType(), streamType); - Assert.assertEquals(streamConfig.getTopicName(), topic); - Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), defaultConsumerFactoryClass); - Assert.assertEquals(streamConfig.getDecoderClass(), defaultDecoderClass); - Assert.assertEquals(streamConfig.getStreamConfigsMap().get("stream.fakeStream.aws.accessKey"), - "default_aws_access_key"); - Assert.assertEquals(streamConfig.getStreamConfigsMap().get("stream.fakeStream.aws.secretKey"), - "default_aws_secret_key"); - Assert.assertEquals(streamConfig.getDecoderProperties().size(), 0); - Assert - .assertEquals(streamConfig.getOffsetCriteria(), new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest()); - Assert - .assertEquals(streamConfig.getConnectionTimeoutMillis(), StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS); - Assert.assertEquals(streamConfig.getFetchTimeoutMillis(), StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS); - Assert.assertEquals(streamConfig.getFlushThresholdRows(), StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS); - Assert.assertEquals(streamConfig.getFlushThresholdTimeMillis(), StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS); - Assert.assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), - StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES); + assertEquals(streamConfig.getType(), streamType); + assertEquals(streamConfig.getTopicName(), topic); + assertEquals(streamConfig.getConsumerFactoryClassName(), defaultConsumerFactoryClass); + assertEquals(streamConfig.getDecoderClass(), defaultDecoderClass); + assertEquals(streamConfig.getStreamConfigsMap().get("stream.fakeStream.aws.accessKey"), "default_aws_access_key"); + assertEquals(streamConfig.getStreamConfigsMap().get("stream.fakeStream.aws.secretKey"), "default_aws_secret_key"); + assertEquals(streamConfig.getDecoderProperties().size(), 0); + assertEquals(streamConfig.getOffsetCriteria(), new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest()); + assertEquals(streamConfig.getConnectionTimeoutMillis(), StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS); + assertEquals(streamConfig.getFetchTimeoutMillis(), StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS); + assertEquals(streamConfig.getFlushThresholdTimeMillis(), StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS); + assertEquals(streamConfig.getFlushThresholdRows(), -1); + assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1); } @Test @@ -132,8 +125,8 @@ public class ConfigUtilsTest { PinotConfiguration config = new PinotConfiguration(nestedMap); String configString = config.toString(); - Assert.assertTrue(configString.contains("credentials")); - Assert.assertFalse(configString.contains("verysecret")); - Assert.assertFalse(configString.contains("secrettoken")); + assertTrue(configString.contains("credentials")); + assertFalse(configString.contains("verysecret")); + assertFalse(configString.contains("secrettoken")); } } diff --git a/pinot-tools/src/main/resources/conf/sample_realtime_table_config.json b/pinot-tools/src/main/resources/conf/sample_realtime_table_config.json index 51d1b25eac..518da612ac 100644 --- a/pinot-tools/src/main/resources/conf/sample_realtime_table_config.json +++ b/pinot-tools/src/main/resources/conf/sample_realtime_table_config.json @@ -32,7 +32,7 @@ "stream.kafka.consumer.prop.auto.offset.reset": "largest", "stream.kafka.broker.list": "localhost:19092", "realtime.segment.flush.threshold.time": "12h", - "realtime.segment.flush.threshold.size": "100M" + "realtime.segment.flush.threshold.size": "100000" } }, "metadata": { diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json index 8cdbcf7bf3..f911ed6558 100644 --- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json +++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json @@ -20,9 +20,7 @@ "stream.kafka.consumer.prop.auto.offset.reset": "largest", "stream.kafka.zk.broker.url": "localhost:2191/kafka", "stream.kafka.broker.list": "localhost:19092", - "stream.kafka.metadata.populate": "true", - "realtime.segment.flush.threshold.time": "12h", - "realtime.segment.flush.threshold.size": "10K" + "stream.kafka.metadata.populate": "true" } ] } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org