This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 78f38db365 Enhance commit threshold to accept size threshold without 
setting rows to 0 (#12684)
78f38db365 is described below

commit 78f38db3653e4ac3ec96e52de7f7dafa96d708d0
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Mon Mar 25 16:27:10 2024 -0700

    Enhance commit threshold to accept size threshold without setting rows to 0 
(#12684)
---
 .../segment/FlushThresholdUpdateManager.java       | 11 ++-
 .../segment/SegmentFlushThresholdComputer.java     |  9 ++-
 .../segment/FlushThresholdUpdaterTest.java         | 54 ++++++++------
 .../realtime/RealtimeSegmentDataManager.java       | 10 +--
 .../core/realtime/stream/StreamConfigTest.java     | 84 ++++++++++++++--------
 .../segment/local/utils/TableConfigUtils.java      | 25 ++++---
 .../segment/local/utils/TableConfigUtilsTest.java  | 55 +++++---------
 .../org/apache/pinot/spi/stream/StreamConfig.java  | 27 ++-----
 .../apache/pinot/spi/config/ConfigUtilsTest.java   | 63 ++++++++--------
 .../conf/sample_realtime_table_config.json         |  2 +-
 .../meetupRsvp_realtime_table_config.json          |  4 +-
 11 files changed, 173 insertions(+), 171 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
index 318367e4d3..2d9dca1f3e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
@@ -38,14 +38,21 @@ public class FlushThresholdUpdateManager {
    */
   public FlushThresholdUpdater getFlushThresholdUpdater(StreamConfig 
streamConfig) {
     String realtimeTableName = streamConfig.getTableNameWithType();
-    int flushThresholdRows = streamConfig.getFlushThresholdRows();
 
+    int flushThresholdRows = streamConfig.getFlushThresholdRows();
     if (flushThresholdRows > 0) {
       _flushThresholdUpdaterMap.remove(realtimeTableName);
       return new DefaultFlushThresholdUpdater(flushThresholdRows);
-    } else {
+    }
+
+    // Legacy behavior: when flush threshold rows is explicitly set to 0, use 
segment size based flush threshold
+    long flushThresholdSegmentSizeBytes = 
streamConfig.getFlushThresholdSegmentSizeBytes();
+    if (flushThresholdRows == 0 || flushThresholdSegmentSizeBytes > 0) {
       return _flushThresholdUpdaterMap.computeIfAbsent(realtimeTableName,
           k -> new SegmentSizeBasedFlushThresholdUpdater());
+    } else {
+      _flushThresholdUpdaterMap.remove(realtimeTableName);
+      return new 
DefaultFlushThresholdUpdater(StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
     }
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
index 2c826dd60e..23b818d554 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
@@ -58,9 +58,12 @@ class SegmentFlushThresholdComputer {
 
   public int computeThreshold(StreamConfig streamConfig, 
CommittingSegmentDescriptor committingSegmentDescriptor,
       @Nullable SegmentZKMetadata committingSegmentZKMetadata, String 
newSegmentName) {
-    final long desiredSegmentSizeBytes = 
streamConfig.getFlushThresholdSegmentSizeBytes();
-    final long optimalSegmentSizeBytesMin = desiredSegmentSizeBytes / 2;
-    final double optimalSegmentSizeBytesMax = desiredSegmentSizeBytes * 1.5;
+    long desiredSegmentSizeBytes = 
streamConfig.getFlushThresholdSegmentSizeBytes();
+    if (desiredSegmentSizeBytes <= 0) {
+      desiredSegmentSizeBytes = 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES;
+    }
+    long optimalSegmentSizeBytesMin = desiredSegmentSizeBytes / 2;
+    double optimalSegmentSizeBytesMax = desiredSegmentSizeBytes * 1.5;
 
     if (committingSegmentZKMetadata == null) { // first segment of the 
partition, hence committing segment is null
       if (_latestSegmentRowsToSizeRatio > 0) { // new partition group added 
case
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
index 1184f113d2..e7bfd92ec4 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
@@ -53,49 +53,57 @@ public class FlushThresholdUpdaterTest {
   public void testFlushThresholdUpdateManager() {
     FlushThresholdUpdateManager flushThresholdUpdateManager = new 
FlushThresholdUpdateManager();
 
-    // Flush threshold rows larger than 0 - DefaultFlushThresholdUpdater 
should be returned
-    FlushThresholdUpdater defaultFlushThresholdUpdater = 
flushThresholdUpdateManager
-        
.getFlushThresholdUpdater(mockStreamConfig(StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS));
-    assertTrue(defaultFlushThresholdUpdater instanceof 
DefaultFlushThresholdUpdater);
-    assertEquals(((DefaultFlushThresholdUpdater) 
defaultFlushThresholdUpdater).getTableFlushSize(),
+    // Neither flush threshold rows nor segment size is set - 
DefaultFlushThresholdUpdater should be returned
+    FlushThresholdUpdater flushThresholdUpdater =
+        
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(-1, -1));
+    assertTrue(flushThresholdUpdater instanceof DefaultFlushThresholdUpdater);
+    assertEquals(((DefaultFlushThresholdUpdater) 
flushThresholdUpdater).getTableFlushSize(),
         StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
 
+    // Flush threshold rows larger than 0 - DefaultFlushThresholdUpdater 
should be returned
+    flushThresholdUpdater = 
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(1234, 
-1));
+    assertTrue(flushThresholdUpdater instanceof DefaultFlushThresholdUpdater);
+    assertEquals(((DefaultFlushThresholdUpdater) 
flushThresholdUpdater).getTableFlushSize(), 1234);
+
     // Flush threshold rows set to 0 - SegmentSizeBasedFlushThresholdUpdater 
should be returned
-    StreamConfig autotuneStreamConfig = mockDefaultAutotuneStreamConfig();
-    FlushThresholdUpdater autotuneFlushThresholdUpdater =
-        
flushThresholdUpdateManager.getFlushThresholdUpdater(autotuneStreamConfig);
-    assertTrue(autotuneFlushThresholdUpdater instanceof 
SegmentSizeBasedFlushThresholdUpdater);
+    FlushThresholdUpdater segmentBasedflushThresholdUpdater =
+        
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1));
+    assertTrue(segmentBasedflushThresholdUpdater instanceof 
SegmentSizeBasedFlushThresholdUpdater);
 
-    // Call again with flush threshold rows set to 0 - same Object should be 
returned
-    
assertSame(flushThresholdUpdateManager.getFlushThresholdUpdater(mockAutotuneStreamConfig(10000L,
 10000L, 10000)),
-        autotuneFlushThresholdUpdater);
+    // Flush threshold segment size larger than 0 - 
SegmentSizeBasedFlushThresholdUpdater should be returned
+    flushThresholdUpdater = 
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(-1, 
1234));
+    assertSame(flushThresholdUpdater, segmentBasedflushThresholdUpdater);
 
-    // Call again with flush threshold rows set larger than 0 - 
DefaultFlushThresholdUpdater should be returned
-    defaultFlushThresholdUpdater = 
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(10000));
-    assertTrue(defaultFlushThresholdUpdater instanceof 
DefaultFlushThresholdUpdater);
-    assertEquals(((DefaultFlushThresholdUpdater) 
defaultFlushThresholdUpdater).getTableFlushSize(), 10000);
+    // Flush threshold rows set larger than 0 - DefaultFlushThresholdUpdater 
should be returned
+    flushThresholdUpdater = 
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(12345, 
-1));
+    assertTrue(flushThresholdUpdater instanceof DefaultFlushThresholdUpdater);
+    assertEquals(((DefaultFlushThresholdUpdater) 
flushThresholdUpdater).getTableFlushSize(), 12345);
 
     // Call again with flush threshold rows set to 0 - a different Object 
should be returned
-    
assertNotSame(flushThresholdUpdateManager.getFlushThresholdUpdater(autotuneStreamConfig),
-        autotuneFlushThresholdUpdater);
+    flushThresholdUpdater = 
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1));
+    assertTrue(flushThresholdUpdater instanceof 
SegmentSizeBasedFlushThresholdUpdater);
+    assertNotSame(flushThresholdUpdater, segmentBasedflushThresholdUpdater);
+    segmentBasedflushThresholdUpdater = flushThresholdUpdater;
 
     // Clear the updater
     
flushThresholdUpdateManager.clearFlushThresholdUpdater(REALTIME_TABLE_NAME);
 
     // Call again with flush threshold rows set to 0 - a different Object 
should be returned
-    
assertNotSame(flushThresholdUpdateManager.getFlushThresholdUpdater(autotuneStreamConfig),
-        autotuneFlushThresholdUpdater);
+    flushThresholdUpdater = 
flushThresholdUpdateManager.getFlushThresholdUpdater(mockStreamConfig(0, -1));
+    assertTrue(flushThresholdUpdater instanceof 
SegmentSizeBasedFlushThresholdUpdater);
+    assertNotSame(flushThresholdUpdater, segmentBasedflushThresholdUpdater);
   }
 
-  private StreamConfig mockStreamConfig(int flushThresholdRows) {
+  private StreamConfig mockStreamConfig(int flushThresholdRows, long 
flushThresholdSegmentSize) {
     StreamConfig streamConfig = mock(StreamConfig.class);
     when(streamConfig.getTableNameWithType()).thenReturn(REALTIME_TABLE_NAME);
     when(streamConfig.getFlushThresholdRows()).thenReturn(flushThresholdRows);
+    
when(streamConfig.getFlushThresholdSegmentSizeBytes()).thenReturn(flushThresholdSegmentSize);
     return streamConfig;
   }
 
-  private StreamConfig mockAutotuneStreamConfig(long 
flushSegmentDesiredSizeBytes,
-      long flushThresholdTimeMillis, int flushAutotuneInitialRows) {
+  private StreamConfig mockAutotuneStreamConfig(long 
flushSegmentDesiredSizeBytes, long flushThresholdTimeMillis,
+      int flushAutotuneInitialRows) {
     StreamConfig streamConfig = mock(StreamConfig.class);
     when(streamConfig.getTableNameWithType()).thenReturn(REALTIME_TABLE_NAME);
     when(streamConfig.getFlushThresholdRows()).thenReturn(0);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 9f9a457683..e226a13502 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1455,10 +1455,12 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     }
 
     // Read the max number of rows
-    int segmentMaxRowCount = _streamConfig.getFlushThresholdRows();
-    int flushThresholdSize = 
segmentZKMetadata.getSizeThresholdToFlushSegment();
-    if (flushThresholdSize > 0) {
-      segmentMaxRowCount = flushThresholdSize;
+    int segmentMaxRowCount = 
segmentZKMetadata.getSizeThresholdToFlushSegment();
+    if (segmentMaxRowCount <= 0) {
+      segmentMaxRowCount = _streamConfig.getFlushThresholdRows();
+    }
+    if (segmentMaxRowCount <= 0) {
+      segmentMaxRowCount = StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS;
     }
     _segmentMaxRowCount = segmentMaxRowCount;
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
index 11c7ee2010..ce3c6e3e74 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
@@ -144,10 +144,9 @@ public class StreamConfigTest {
     assertEquals(streamConfig.getOffsetCriteria(), new 
OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest());
     assertEquals(streamConfig.getConnectionTimeoutMillis(), 
StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS);
     assertEquals(streamConfig.getFetchTimeoutMillis(), 
StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS);
-    assertEquals(streamConfig.getFlushThresholdRows(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
     assertEquals(streamConfig.getFlushThresholdTimeMillis(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
-    assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(),
-        StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES);
+    assertEquals(streamConfig.getFlushThresholdRows(), -1);
+    assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1);
 
     String offsetCriteria = "smallest";
     String decoderProp1Key = "prop1";
@@ -167,8 +166,8 @@ public class StreamConfigTest {
     streamConfigMap.put(
         StreamConfigProperties.constructStreamProperty(streamType, 
StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS),
         fetchTimeout);
-    streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, 
flushThresholdRows);
     streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME, 
flushThresholdTime);
+    streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, 
flushThresholdRows);
     
streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE,
 flushSegmentSize);
 
     streamConfig = new StreamConfig(tableName, streamConfigMap);
@@ -181,9 +180,9 @@ public class StreamConfigTest {
     assertTrue(streamConfig.getOffsetCriteria().isSmallest());
     assertEquals(streamConfig.getConnectionTimeoutMillis(), 
Long.parseLong(connectionTimeout));
     assertEquals(streamConfig.getFetchTimeoutMillis(), 
Integer.parseInt(fetchTimeout));
-    assertEquals(streamConfig.getFlushThresholdRows(), 
Integer.parseInt(flushThresholdRows));
     assertEquals(streamConfig.getFlushThresholdTimeMillis(),
         (long) TimeUtils.convertPeriodToMillis(flushThresholdTime));
+    assertEquals(streamConfig.getFlushThresholdRows(), 
Integer.parseInt(flushThresholdRows));
     assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), 
DataSizeUtils.toBytes(flushSegmentSize));
 
     // Backward compatibility check for flushThresholdTime
@@ -191,10 +190,6 @@ public class StreamConfigTest {
     streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME, 
flushThresholdTime);
     streamConfig = new StreamConfig(tableName, streamConfigMap);
     assertEquals(streamConfig.getFlushThresholdTimeMillis(), 
Long.parseLong(flushThresholdTime));
-    flushThresholdTime = "invalid input";
-    streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME, 
flushThresholdTime);
-    streamConfig = new StreamConfig(tableName, streamConfigMap);
-    assertEquals(streamConfig.getFlushThresholdTimeMillis(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
 
     // Backward compatibility check for flush threshold rows
     
streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS);
@@ -247,38 +242,61 @@ public class StreamConfigTest {
     streamConfig = new StreamConfig(tableName, streamConfigMap);
     assertEquals(streamConfig.getConnectionTimeoutMillis(), 
StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS);
 
-    // Invalid flush threshold rows - deprecated property
+    // Invalid flush threshold time
     
streamConfigMap.remove(StreamConfigProperties.constructStreamProperty(streamType,
         StreamConfigProperties.STREAM_CONNECTION_TIMEOUT_MILLIS));
+    streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME, 
"time");
+    try {
+      new StreamConfig(tableName, streamConfigMap);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // Expected
+      
assertTrue(e.getMessage().contains(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME));
+    }
+
+    // Invalid flush threshold rows - deprecated property
+    
streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME);
     
streamConfigMap.put(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS,
 "rows");
-    streamConfig = new StreamConfig(tableName, streamConfigMap);
-    assertEquals(streamConfig.getFlushThresholdRows(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
+    try {
+      new StreamConfig(tableName, streamConfigMap);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // Expected
+      
assertTrue(e.getMessage().contains(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS));
+    }
 
     // Invalid flush threshold rows - new property
     
streamConfigMap.remove(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS);
     streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, 
"rows");
-    streamConfig = new StreamConfig(tableName, streamConfigMap);
-    assertEquals(streamConfig.getFlushThresholdRows(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
-
-    // Invalid flush threshold time
-    
streamConfigMap.remove(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_THRESHOLD_ROWS);
-    streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME, 
"time");
-    streamConfig = new StreamConfig(tableName, streamConfigMap);
-    assertEquals(streamConfig.getFlushThresholdTimeMillis(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
+    try {
+      new StreamConfig(tableName, streamConfigMap);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // Expected
+      
assertTrue(e.getMessage().contains(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS));
+    }
 
     // Invalid flush segment size - deprecated property
-    
streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME);
+    
streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS);
     
streamConfigMap.put(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE,
 "size");
-    streamConfig = new StreamConfig(tableName, streamConfigMap);
-    assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(),
-        StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES);
+    try {
+      new StreamConfig(tableName, streamConfigMap);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // Expected
+      
assertTrue(e.getMessage().contains(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE));
+    }
 
     // Invalid flush segment size - new property
     
streamConfigMap.remove(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE);
     
streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE,
 "size");
-    streamConfig = new StreamConfig(tableName, streamConfigMap);
-    assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(),
-        StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES);
+    try {
+      new StreamConfig(tableName, streamConfigMap);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // Expected
+      
assertTrue(e.getMessage().contains(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE));
+    }
   }
 
   /**
@@ -309,16 +327,18 @@ public class StreamConfigTest {
 
     // Use default values if nothing provided
     streamConfig = new StreamConfig(tableName, streamConfigMap);
-    assertEquals(streamConfig.getFlushThresholdRows(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
     assertEquals(streamConfig.getFlushThresholdTimeMillis(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
+    assertEquals(streamConfig.getFlushThresholdRows(), -1);
+    assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1);
 
     // Use regular values if provided
     streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, 
flushThresholdRows);
     streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME, 
flushThresholdTime);
     streamConfig = new StreamConfig(tableName, streamConfigMap);
-    assertEquals(streamConfig.getFlushThresholdRows(), 
Integer.parseInt(flushThresholdRows));
     assertEquals(streamConfig.getFlushThresholdTimeMillis(),
         (long) TimeUtils.convertPeriodToMillis(flushThresholdTime));
+    assertEquals(streamConfig.getFlushThresholdRows(), 
Integer.parseInt(flushThresholdRows));
+    assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1);
 
     // Use regular values if both regular and llc config exists
     streamConfigMap.put(
@@ -327,17 +347,19 @@ public class StreamConfigTest {
     streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME + 
StreamConfigProperties.LLC_SUFFIX,
         flushThresholdTimeLLC);
     streamConfig = new StreamConfig(tableName, streamConfigMap);
-    assertEquals(streamConfig.getFlushThresholdRows(), 
Integer.parseInt(flushThresholdRows));
     assertEquals(streamConfig.getFlushThresholdTimeMillis(),
         (long) TimeUtils.convertPeriodToMillis(flushThresholdTime));
+    assertEquals(streamConfig.getFlushThresholdRows(), 
Integer.parseInt(flushThresholdRows));
+    assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1);
 
     // Use llc values if only llc config exists
     
streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS);
     
streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME);
     streamConfig = new StreamConfig(tableName, streamConfigMap);
-    assertEquals(streamConfig.getFlushThresholdRows(), 
Integer.parseInt(flushThresholdRowsLLC));
     assertEquals(streamConfig.getFlushThresholdTimeMillis(),
         (long) TimeUtils.convertPeriodToMillis(flushThresholdTimeLLC));
+    assertEquals(streamConfig.getFlushThresholdRows(), 
Integer.parseInt(flushThresholdRowsLLC));
+    assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1);
   }
 
   @Test
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 6c83ae9cb8..a713c81492 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -155,26 +155,19 @@ public final class TableConfigUtils {
     if (!skipTypes.contains(ValidationType.ALL)) {
       validateTableSchemaConfig(tableConfig);
       validateValidationConfig(tableConfig, schema);
-
-      StreamConfig streamConfig = null;
       validateIngestionConfig(tableConfig, schema, disableGroovy);
+
       // Only allow realtime tables with non-null stream.type and LLC 
consumer.type
       if (tableConfig.getTableType() == TableType.REALTIME) {
         Map<String, String> streamConfigMap = 
IngestionConfigUtils.getStreamConfigMap(tableConfig);
+        StreamConfig streamConfig;
         try {
           // Validate that StreamConfig can be created
           streamConfig = new StreamConfig(tableConfig.getTableName(), 
streamConfigMap);
         } catch (Exception e) {
           throw new IllegalStateException("Could not create StreamConfig using 
the streamConfig map", e);
         }
-        validateDecoder(streamConfig);
-        // if segmentSizeBytes is specified, rows must be zero.
-        if 
(streamConfigMap.containsKey(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE)
-            || 
streamConfigMap.containsKey(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE))
 {
-          Preconditions.checkState(streamConfig.getFlushThresholdRows() == 0,
-              String.format("Invalid config: %s=%d, it must be set to 0 for 
size based segment threshold to work.",
-                  StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, 
streamConfig.getFlushThresholdRows()));
-        }
+        validateStreamConfig(streamConfig);
       }
       validateTierConfigList(tableConfig.getTierConfigsList());
       validateIndexingConfig(tableConfig.getIndexingConfig(), schema);
@@ -590,7 +583,17 @@ public final class TableConfigUtils {
   }
 
   @VisibleForTesting
-  static void validateDecoder(StreamConfig streamConfig) {
+  static void validateStreamConfig(StreamConfig streamConfig) {
+    // Validate flush threshold
+    Preconditions.checkState(streamConfig.getFlushThresholdTimeMillis() > 0, 
"Invalid flush threshold time: %s",
+        streamConfig.getFlushThresholdTimeMillis());
+    int flushThresholdRows = streamConfig.getFlushThresholdRows();
+    long flushThresholdSegmentSizeBytes = 
streamConfig.getFlushThresholdSegmentSizeBytes();
+    Preconditions.checkState(!(flushThresholdRows > 0 && 
flushThresholdSegmentSizeBytes > 0),
+        "Flush threshold rows: %s and flush threshold segment size: %s cannot 
be both set", flushThresholdRows,
+        flushThresholdSegmentSizeBytes);
+
+    // Validate decoder
     if 
(streamConfig.getDecoderClass().equals("org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder"))
 {
       String descriptorFilePath = "descriptorFile";
       String protoClassName = "protoClassName";
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index bcf8aa622e..8f695f2b76 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -689,28 +689,31 @@ public class TableConfigUtilsTest {
     // validate the proto decoder
     streamConfigs = getKafkaStreamConfigs();
     //test config should be valid
-    TableConfigUtils.validateDecoder(new StreamConfig("test", streamConfigs));
+    TableConfigUtils.validateStreamConfig(new StreamConfig("test", 
streamConfigs));
     streamConfigs.remove("stream.kafka.decoder.prop.descriptorFile");
     try {
-      TableConfigUtils.validateDecoder(new StreamConfig("test", 
streamConfigs));
+      TableConfigUtils.validateStreamConfig(new StreamConfig("test", 
streamConfigs));
+      Assert.fail("Should fail without descriptor file");
     } catch (IllegalStateException e) {
       // expected
     }
     streamConfigs = getKafkaStreamConfigs();
     streamConfigs.remove("stream.kafka.decoder.prop.protoClassName");
     try {
-      TableConfigUtils.validateDecoder(new StreamConfig("test", 
streamConfigs));
+      TableConfigUtils.validateStreamConfig(new StreamConfig("test", 
streamConfigs));
+      Assert.fail("Should fail without descriptor proto class name");
     } catch (IllegalStateException e) {
       // expected
     }
     //validate the protobuf pulsar config
     streamConfigs = getPulsarStreamConfigs();
     //test config should be valid
-    TableConfigUtils.validateDecoder(new StreamConfig("test", streamConfigs));
+    TableConfigUtils.validateStreamConfig(new StreamConfig("test", 
streamConfigs));
     //remove the descriptor file, should fail
     streamConfigs.remove("stream.pulsar.decoder.prop.descriptorFile");
     try {
-      TableConfigUtils.validateDecoder(new StreamConfig("test", 
streamConfigs));
+      TableConfigUtils.validateStreamConfig(new StreamConfig("test", 
streamConfigs));
+      Assert.fail("Should fail without descriptor file");
     } catch (IllegalStateException e) {
       // expected
     }
@@ -718,55 +721,31 @@ public class TableConfigUtilsTest {
     //remove the proto class name, should fail
     streamConfigs.remove("stream.pulsar.decoder.prop.protoClassName");
     try {
-      TableConfigUtils.validateDecoder(new StreamConfig("test", 
streamConfigs));
+      TableConfigUtils.validateStreamConfig(new StreamConfig("test", 
streamConfigs));
+      Assert.fail("Should fail without descriptor proto class name");
     } catch (IllegalStateException e) {
       // expected
     }
 
-    Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
-        .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
-
-    // When size based threshold is specified, default rows does not work, it 
has to be explicitly set to 0.
+    // When only size based threshold is specified (rows unset), it is valid
     streamConfigs = getKafkaStreamConfigs();
     
streamConfigs.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE);
     
streamConfigs.remove(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE);
     
streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE, 
"100m");
     streamConfigs.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS);
-    ingestionConfig.setStreamIngestionConfig(new 
StreamIngestionConfig(List.of(streamConfigs)));
-    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
-        .setIngestionConfig(ingestionConfig).build();
-
-    try {
-      TableConfigUtils.validate(tableConfig, schema);
-      Assert.fail();
-    } catch (IllegalStateException e) {
-      Assert.assertTrue(e.getMessage().contains("must be set to 0"));
-    }
+    TableConfigUtils.validateStreamConfig(new StreamConfig("test", 
streamConfigs));
 
-    // When size based threshold is specified, rows has to be set to 0.
     streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, 
"1000");
-    ingestionConfig.setStreamIngestionConfig(new 
StreamIngestionConfig(List.of(streamConfigs)));
-    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
-        .setIngestionConfig(ingestionConfig).build();
-
     try {
-      TableConfigUtils.validate(tableConfig, schema);
-      Assert.fail();
+      TableConfigUtils.validateStreamConfig(new StreamConfig("test", 
streamConfigs));
+      Assert.fail("Should fail when both rows and size based threshold are 
specified");
     } catch (IllegalStateException e) {
-      Assert.assertTrue(e.getMessage().contains("must be set to 0"));
+      // expected
     }
 
-    // When size based threshold is specified, rows has to be set to 0.
+    // Legacy behavior: allow flush threshold rows to be explicitly set to 0
     streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, 
"0");
-    ingestionConfig.setStreamIngestionConfig(new 
StreamIngestionConfig(List.of(streamConfigs)));
-    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
-        .setIngestionConfig(ingestionConfig).build();
-
-    try {
-      TableConfigUtils.validate(tableConfig, schema);
-    } catch (IllegalStateException e) {
-      Assert.fail(e.getMessage());
-    }
+    TableConfigUtils.validateStreamConfig(new StreamConfig("test", 
streamConfigs));
   }
 
   @Test
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index ea24f5d01b..32bea7aa83 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -222,7 +222,6 @@ public class StreamConfig {
   }
 
   private long extractFlushThresholdSegmentSize(Map<String, String> 
streamConfigMap) {
-    long segmentSizeBytes = -1;
     String key = StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE;
     String flushThresholdSegmentSizeStr = streamConfigMap.get(key);
     if (flushThresholdSegmentSizeStr == null) {
@@ -230,19 +229,14 @@ public class StreamConfig {
       key = StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE;
       flushThresholdSegmentSizeStr = streamConfigMap.get(key);
     }
-
     if (flushThresholdSegmentSizeStr != null) {
       try {
-        segmentSizeBytes = DataSizeUtils.toBytes(flushThresholdSegmentSizeStr);
+        return DataSizeUtils.toBytes(flushThresholdSegmentSizeStr);
       } catch (Exception e) {
-        LOGGER.warn("Invalid config {}: {}, defaulting to: {}", key, 
flushThresholdSegmentSizeStr,
-            
DataSizeUtils.fromBytes(DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES));
+        throw new IllegalArgumentException("Invalid config " + key + ": " + 
flushThresholdSegmentSizeStr);
       }
-    }
-    if (segmentSizeBytes > 0) {
-      return segmentSizeBytes;
     } else {
-      return DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES;
+      return -1;
     }
   }
 
@@ -261,17 +255,12 @@ public class StreamConfig {
     }
     if (flushThresholdRowsStr != null) {
       try {
-        int flushThresholdRows = Integer.parseInt(flushThresholdRowsStr);
-        // Flush threshold rows 0 means using segment size based flush 
threshold
-        Preconditions.checkState(flushThresholdRows >= 0);
-        return flushThresholdRows;
+        return Integer.parseInt(flushThresholdRowsStr);
       } catch (Exception e) {
-        LOGGER.warn("Invalid config {}: {}, defaulting to: {}", key, 
flushThresholdRowsStr,
-            DEFAULT_FLUSH_THRESHOLD_ROWS);
-        return DEFAULT_FLUSH_THRESHOLD_ROWS;
+        throw new IllegalArgumentException("Invalid config " + key + ": " + 
flushThresholdRowsStr);
       }
     } else {
-      return DEFAULT_FLUSH_THRESHOLD_ROWS;
+      return -1;
     }
   }
 
@@ -291,9 +280,7 @@ public class StreamConfig {
           // For backward-compatibility, parse it as milliseconds value
           return Long.parseLong(flushThresholdTimeStr);
         } catch (NumberFormatException nfe) {
-          LOGGER.warn("Invalid config {}: {}, defaulting to: {}", key, 
flushThresholdTimeStr,
-              DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
-          return DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS;
+          throw new IllegalArgumentException("Invalid config " + key + ": " + 
flushThresholdTimeStr);
         }
       }
     } else {
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java 
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java
index 1b1d414249..eeee659453 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java
@@ -28,10 +28,10 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
 
@@ -68,19 +68,17 @@ public class ConfigUtilsTest {
 
     Map<String, String> streamConfigMap = new HashMap<>();
     streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, 
StreamConfigProperties.STREAM_TOPIC_NAME),
-            topic);
-    streamConfigMap.put(StreamConfigProperties
-            .constructStreamProperty(streamType, 
StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
-        consumerFactoryClass);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, 
StreamConfigProperties.STREAM_DECODER_CLASS),
-            decoderClass);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, 
"aws.accessKey"), "${AWS_ACCESS_KEY}");
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, 
"aws.secretKey"), "${AWS_SECRET_KEY}");
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, 
StreamConfigProperties.STREAM_TOPIC_NAME), topic);
+    
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+        StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), 
consumerFactoryClass);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, 
StreamConfigProperties.STREAM_DECODER_CLASS),
+        decoderClass);
+    
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType, 
"aws.accessKey"),
+        "${AWS_ACCESS_KEY}");
+    
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType, 
"aws.secretKey"),
+        "${AWS_SECRET_KEY}");
     indexingConfig.setStreamConfigs(streamConfigMap);
 
     Map<String, String> environment =
@@ -99,24 +97,19 @@ public class ConfigUtilsTest {
 
     // Mandatory values + defaults
     StreamConfig streamConfig = new StreamConfig(tableName, 
indexingConfig.getStreamConfigs());
-    Assert.assertEquals(streamConfig.getType(), streamType);
-    Assert.assertEquals(streamConfig.getTopicName(), topic);
-    Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), 
defaultConsumerFactoryClass);
-    Assert.assertEquals(streamConfig.getDecoderClass(), defaultDecoderClass);
-    
Assert.assertEquals(streamConfig.getStreamConfigsMap().get("stream.fakeStream.aws.accessKey"),
-        "default_aws_access_key");
-    
Assert.assertEquals(streamConfig.getStreamConfigsMap().get("stream.fakeStream.aws.secretKey"),
-        "default_aws_secret_key");
-    Assert.assertEquals(streamConfig.getDecoderProperties().size(), 0);
-    Assert
-        .assertEquals(streamConfig.getOffsetCriteria(), new 
OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest());
-    Assert
-        .assertEquals(streamConfig.getConnectionTimeoutMillis(), 
StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS);
-    Assert.assertEquals(streamConfig.getFetchTimeoutMillis(), 
StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS);
-    Assert.assertEquals(streamConfig.getFlushThresholdRows(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
-    Assert.assertEquals(streamConfig.getFlushThresholdTimeMillis(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
-    Assert.assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(),
-        StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES);
+    assertEquals(streamConfig.getType(), streamType);
+    assertEquals(streamConfig.getTopicName(), topic);
+    assertEquals(streamConfig.getConsumerFactoryClassName(), 
defaultConsumerFactoryClass);
+    assertEquals(streamConfig.getDecoderClass(), defaultDecoderClass);
+    
assertEquals(streamConfig.getStreamConfigsMap().get("stream.fakeStream.aws.accessKey"),
 "default_aws_access_key");
+    
assertEquals(streamConfig.getStreamConfigsMap().get("stream.fakeStream.aws.secretKey"),
 "default_aws_secret_key");
+    assertEquals(streamConfig.getDecoderProperties().size(), 0);
+    assertEquals(streamConfig.getOffsetCriteria(), new 
OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest());
+    assertEquals(streamConfig.getConnectionTimeoutMillis(), 
StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS);
+    assertEquals(streamConfig.getFetchTimeoutMillis(), 
StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS);
+    assertEquals(streamConfig.getFlushThresholdTimeMillis(), 
StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS);
+    assertEquals(streamConfig.getFlushThresholdRows(), -1);
+    assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), -1);
   }
 
   @Test
@@ -132,8 +125,8 @@ public class ConfigUtilsTest {
     PinotConfiguration config = new PinotConfiguration(nestedMap);
 
     String configString = config.toString();
-    Assert.assertTrue(configString.contains("credentials"));
-    Assert.assertFalse(configString.contains("verysecret"));
-    Assert.assertFalse(configString.contains("secrettoken"));
+    assertTrue(configString.contains("credentials"));
+    assertFalse(configString.contains("verysecret"));
+    assertFalse(configString.contains("secrettoken"));
   }
 }
diff --git 
a/pinot-tools/src/main/resources/conf/sample_realtime_table_config.json 
b/pinot-tools/src/main/resources/conf/sample_realtime_table_config.json
index 51d1b25eac..518da612ac 100644
--- a/pinot-tools/src/main/resources/conf/sample_realtime_table_config.json
+++ b/pinot-tools/src/main/resources/conf/sample_realtime_table_config.json
@@ -32,7 +32,7 @@
       "stream.kafka.consumer.prop.auto.offset.reset": "largest",
       "stream.kafka.broker.list": "localhost:19092",
       "realtime.segment.flush.threshold.time": "12h",
-      "realtime.segment.flush.threshold.size": "100M"
+      "realtime.segment.flush.threshold.size": "100000"
     }
   },
   "metadata": {
diff --git 
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json
 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json
index 8cdbcf7bf3..f911ed6558 100644
--- 
a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json
+++ 
b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json
@@ -20,9 +20,7 @@
           "stream.kafka.consumer.prop.auto.offset.reset": "largest",
           "stream.kafka.zk.broker.url": "localhost:2191/kafka",
           "stream.kafka.broker.list": "localhost:19092",
-          "stream.kafka.metadata.populate": "true",
-          "realtime.segment.flush.threshold.time": "12h",
-          "realtime.segment.flush.threshold.size": "10K"
+          "stream.kafka.metadata.populate": "true"
         }
       ]
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to