This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bc96b13  Support NOT in TimeSegmentPruner (#8381)
bc96b13 is described below

commit bc96b13333a5d7da65b3f6b9f05d355a537e9647
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Mar 22 12:04:37 2022 -0700

    Support NOT in TimeSegmentPruner (#8381)
---
 .../routing/segmentpruner/TimeSegmentPruner.java   | 31 +++++++++++
 .../routing/segmentpruner/SegmentPrunerTest.java   | 60 +++++++++++++++++-----
 2 files changed, 77 insertions(+), 14 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
index e84dabc..0b1984e 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
@@ -200,6 +200,10 @@ public class TimeSegmentPruner implements SegmentPruner {
    *         < 50 OR firstName = Jason')
    *         Empty list if time condition is specified but invalid (e.g. 
'SELECT * from myTable where time < 50 AND
    *         time > 100')
+   *         Sorted, non-overlapping time intervals if the time condition is 
valid
+   *
+   * TODO: 1. Merge adjacent intervals
+   *       2. Set interval boundary using time granularity instead of millis
    */
   @Nullable
   private List<Interval> getFilterTimeIntervals(Expression filterExpression) {
@@ -233,6 +237,14 @@ public class TimeSegmentPruner implements SegmentPruner {
           }
         }
         return getUnionSortedIntervals(orIntervals);
+      case NOT:
+        assert operands.size() == 1;
+        List<Interval> childIntervals = 
getFilterTimeIntervals(operands.get(0));
+        if (childIntervals == null) {
+          return null;
+        } else {
+          return getComplementSortedIntervals(childIntervals);
+        }
       case EQUALS: {
         Identifier identifier = operands.get(0).getIdentifier();
         if (identifier != null && identifier.getName().equals(_timeColumn)) {
@@ -479,6 +491,25 @@ public class TimeSegmentPruner implements SegmentPruner {
   }
 
 /**
+   * Returns the complement (non-overlapping sorted intervals) of the given 
non-overlapping sorted intervals.
+   */
+  private List<Interval> getComplementSortedIntervals(List<Interval> 
intervals) {
+    List<Interval> res = new ArrayList<>();
+    long startTime = MIN_START_TIME; // inclusive lower bound of the next gap not covered by the input
+    for (Interval interval : intervals) {
+      if (interval._min > startTime) { // gap before this interval; bounds are inclusive, hence the -1
+        res.add(new Interval(startTime, interval._min - 1));
+      }
+      if (interval._max == MAX_END_TIME) { // covered through the end: done, and `_max + 1` is never computed
+        return res;
+      }
+      startTime = interval._max + 1;
+    }
+    res.add(new Interval(startTime, MAX_END_TIME)); // trailing gap; the whole range when the input is empty
+    return res;
+  }
+
+  /**
    * Parse interval to millisecond as [min, max] with both sides included.
    * E.g. '(* 16311]' is parsed as [0, 16311], '(1455 16311)' is parsed as 
[1456, 16310]
    */
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
index 9f330e0..ad0f995 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
@@ -80,13 +80,13 @@ public class SegmentPrunerTest extends ControllerTest {
   private static final String QUERY_2 = "SELECT * FROM testTable where 
memberId = 0";
   private static final String QUERY_3 = "SELECT * FROM testTable where 
memberId IN (1, 2)";
 
-  private static final String QUERY_5 = "SELECT * FROM testTable where 
timeColumn = 40";
-  private static final String QUERY_6 = "SELECT * FROM testTable where 
timeColumn BETWEEN 20 AND 30";
-  private static final String QUERY_7 = "SELECT * FROM testTable where 30 < 
timeColumn AND timeColumn <= 50";
-  private static final String QUERY_8 = "SELECT * FROM testTable where 
timeColumn < 15 OR timeColumn > 45";
-  private static final String QUERY_9 =
+  private static final String TIME_QUERY_1 = "SELECT * FROM testTable where 
timeColumn = 40";
+  private static final String TIME_QUERY_2 = "SELECT * FROM testTable where 
timeColumn BETWEEN 20 AND 30";
+  private static final String TIME_QUERY_3 = "SELECT * FROM testTable where 30 
< timeColumn AND timeColumn <= 50";
+  private static final String TIME_QUERY_4 = "SELECT * FROM testTable where 
timeColumn < 15 OR timeColumn > 45";
+  private static final String TIME_QUERY_5 =
       "SELECT * FROM testTable where timeColumn < 15 OR (60 < timeColumn AND 
timeColumn < 70)";
-  private static final String QUERY_10 = "SELECT * FROM testTable where 
timeColumn < 0 AND timeColumn > 0";
+  private static final String TIME_QUERY_6 = "SELECT * FROM testTable where 
timeColumn < 0 AND timeColumn > 0";
 
   private static final String SDF_QUERY_1 = "SELECT * FROM testTable where 
timeColumn = 20200131";
   private static final String SDF_QUERY_2 = "SELECT * FROM testTable where 
timeColumn BETWEEN 20200101 AND 20200331";
@@ -97,6 +97,9 @@ public class SegmentPrunerTest extends ControllerTest {
   private static final String SDF_QUERY_5 =
       "SELECT * FROM testTable where timeColumn in (20200101, 20200102) AND 
timeColumn >= 20200530";
 
+  private static final String SQL_TIME_QUERY_1 = "SELECT * FROM testTable 
WHERE timeColumn NOT BETWEEN 20 AND 30";
+  private static final String SQL_TIME_QUERY_2 = "SELECT * FROM testTable 
WHERE NOT timeColumn > 30";
+
   // This duplicates KinesisConfig.STREAM_TYPE; instead of using 
KinesisConfig.STREAM_TYPE directly, we
   // hardcode the value here to avoid pulling in the entire pinot-kinesis module 
as a dependency.
   private static final String KINESIS_STREAM_TYPE = "kinesis";
@@ -234,8 +237,8 @@ public class SegmentPrunerTest extends ControllerTest {
     assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
 
     // When streamIngestionConfig is configured with Kinesis streaming, 
EmptySegmentPruner should be returned.
-    
when(streamIngestionConfig.getStreamConfigMaps()).thenReturn(Collections.singletonList(
-        Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, 
KINESIS_STREAM_TYPE)));
+    when(streamIngestionConfig.getStreamConfigMaps()).thenReturn(
+        
Collections.singletonList(Collections.singletonMap(StreamConfigProperties.STREAM_TYPE,
 KINESIS_STREAM_TYPE)));
     when(indexingConfig.getStreamConfigs()).thenReturn(
         Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, 
KINESIS_STREAM_TYPE));
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, 
_propertyStore);
@@ -332,12 +335,12 @@ public class SegmentPrunerTest extends ControllerTest {
   @Test(dataProvider = "compilerProvider")
   public void testTimeSegmentPruner(QueryCompiler compiler) {
     BrokerRequest brokerRequest1 = compiler.compileToBrokerRequest(QUERY_1);
-    BrokerRequest brokerRequest2 = compiler.compileToBrokerRequest(QUERY_5);
-    BrokerRequest brokerRequest3 = compiler.compileToBrokerRequest(QUERY_6);
-    BrokerRequest brokerRequest4 = compiler.compileToBrokerRequest(QUERY_7);
-    BrokerRequest brokerRequest5 = compiler.compileToBrokerRequest(QUERY_8);
-    BrokerRequest brokerRequest6 = compiler.compileToBrokerRequest(QUERY_9);
-    BrokerRequest brokerRequest7 = compiler.compileToBrokerRequest(QUERY_10);
+    BrokerRequest brokerRequest2 = 
compiler.compileToBrokerRequest(TIME_QUERY_1);
+    BrokerRequest brokerRequest3 = 
compiler.compileToBrokerRequest(TIME_QUERY_2);
+    BrokerRequest brokerRequest4 = 
compiler.compileToBrokerRequest(TIME_QUERY_3);
+    BrokerRequest brokerRequest5 = 
compiler.compileToBrokerRequest(TIME_QUERY_4);
+    BrokerRequest brokerRequest6 = 
compiler.compileToBrokerRequest(TIME_QUERY_5);
+    BrokerRequest brokerRequest7 = 
compiler.compileToBrokerRequest(TIME_QUERY_6);
     // NOTE: Ideal state and external view are not used in the current 
implementation
     IdealState idealState = Mockito.mock(IdealState.class);
     ExternalView externalView = Mockito.mock(ExternalView.class);
@@ -516,6 +519,35 @@ public class SegmentPrunerTest extends ControllerTest {
     assertEquals(segmentPruner.prune(brokerRequest5, onlineSegments), 
Collections.emptySet());
   }
 
+  @Test
+  public void testTimeSegmentPrunerSql() { // exercises pruning for the NOT-based SQL_TIME_QUERY_* filters
+    CalciteSqlCompiler compiler = new CalciteSqlCompiler();
+    BrokerRequest brokerRequest1 = 
compiler.compileToBrokerRequest(SQL_TIME_QUERY_1);
+    BrokerRequest brokerRequest2 = 
compiler.compileToBrokerRequest(SQL_TIME_QUERY_2);
+    // NOTE: Ideal state and external view are not used in the current 
implementation
+    IdealState idealState = Mockito.mock(IdealState.class);
+    ExternalView externalView = Mockito.mock(ExternalView.class);
+
+    TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, 
TableType.REALTIME);
+    setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.DAYS);
+
+    TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig, 
_propertyStore);
+    Set<String> onlineSegments = new HashSet<>();
+    String segment0 = "segment0";
+    onlineSegments.add(segment0);
+    setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0, 10, 60, 
TimeUnit.DAYS);
+    String segment1 = "segment1";
+    onlineSegments.add(segment1);
+    setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1, 20, 30, 
TimeUnit.DAYS);
+    String segment2 = "segment2";
+    onlineSegments.add(segment2);
+    setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 50, 65, 
TimeUnit.DAYS);
+    segmentPruner.init(idealState, externalView, onlineSegments);
+
+    assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), new 
HashSet<>(Arrays.asList(segment0, segment2))); // NOT BETWEEN 20 AND 30: only segment1 (20-30) is pruned
+    assertEquals(segmentPruner.prune(brokerRequest2, onlineSegments), new 
HashSet<>(Arrays.asList(segment0, segment1))); // NOT timeColumn > 30: only segment2 (50-65) is pruned
+  }
+
   @Test(dataProvider = "compilerProvider")
   public void testEmptySegmentPruner(QueryCompiler compiler) {
     BrokerRequest brokerRequest1 = compiler.compileToBrokerRequest(QUERY_1);

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to