[ 
https://issues.apache.org/jira/browse/DRILL-5977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16487671#comment-16487671
 ] 

ASF GitHub Bot commented on DRILL-5977:
---------------------------------------

akumarb2010 commented on issue #1272: DRILL-5977: Filter Pushdown in 
Drill-Kafka plugin
URL: https://github.com/apache/drill/pull/1272#issuecomment-391427723
 
 
   @aravi5  Thanks for providing all the details. Please find my comments below.
   
   >> If the predicates are only on kafkaMsgOffset (for example SELECT * FROM 
kafka.LogEventStream WHERE kafkaMsgOffset >= 1000 AND kafkaMsgOffset < 2000), 
this will apply the pushdown to ALL partitions within a topic. If there is a 
partition where such offsets do not exist (either the offsets have expired or 
messages for those offsets are yet to be produced), then such partition will 
not be scanned.
   
   In Kafka, an `offset` is scoped per partition. I am unable to find any use case where we can take one range of offsets and apply it to all partitions. In most scenarios they would not be valid offsets on every partition.
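   The point above can be illustrated with a small sketch (plain Python, no Kafka dependency; the partition layout below is hypothetical): each partition has its own independent offset sequence, so one global range only lines up with some partitions.

```python
# Hypothetical log-start/log-end offsets per partition of one topic.
# (In a real cluster these come from the broker's metadata, not hardcoded.)
partition_offsets = {
    0: (0, 5000),      # offsets 0..4999 available
    1: (3000, 3500),   # older offsets expired by retention
    2: (0, 800),       # fewer messages produced so far
}

def valid_range(partition, lo, hi):
    """Clamp a requested offset range [lo, hi) to what the partition holds.

    Returns None when the requested range does not overlap the partition's
    available offsets (expired, or not yet produced).
    """
    start, end = partition_offsets[partition]
    clamped = (max(lo, start), min(hi, end))
    return clamped if clamped[0] < clamped[1] else None

# The query's range [1000, 2000) is only meaningful on partition 0.
print({p: valid_range(p, 1000, 2000) for p in partition_offsets})
```

   Here partition 1 yields nothing because those offsets already expired, and partition 2 because they have not been produced yet, which is exactly why a topic-wide offset predicate is hard to interpret.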
   
   IMHO, we should only apply predicate pushdown where we have exact scan specs.
   
   For example, in the cases below we can apply the predicates without any issues.
   
   ```
   SELECT * FROM kafka.LogEventStream
   WHERE (kafkaPartitionId = 1 AND kafkaMsgOffset > 1000 AND kafkaMsgOffset < 2000)
      OR (kafkaPartitionId = 2 AND kafkaMsgOffset > 4000)
   ```
   
   And this way, we can use the predicate pushdown feature as an external checkpointing mechanism.
   
   Coming to timestamps, my point is that in case of an invalid partitionId the query might block indefinitely with this feature, whereas in the other cases we would simply return empty results.
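   One way the scan could guard against that (a sketch only; `existing_partitions` and `offsets_for_times` are hypothetical stand-ins for the consumer's metadata and offset-lookup calls, not Drill's actual code):

```python
# Skip partitions that do not exist instead of waiting on them forever.
existing_partitions = {0, 1, 2}

def offsets_for_times(partition, timestamp_ms):
    # Stand-in for the broker lookup: pretend every valid partition
    # resolves the timestamp to offset 42.
    return 42

def resolve_scan_start(partition, timestamp_ms):
    """Return a start offset, or None for a nonexistent partition."""
    if partition not in existing_partitions:
        return None  # empty result for this partition, no indefinite block
    return offsets_for_times(partition, timestamp_ms)

print(resolve_scan_start(100, 1527092007199))  # nonexistent partition -> None
```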
   
   It would be great if you could add a few test cases with invalid partitions, like the one below (assuming partition 100 doesn't exist):
   
   ```
   SELECT * FROM kafka.LogEventStream
   WHERE (kafkaPartitionId = 100 AND kafkaMsgTimestamp > 1527092007199)
   ```
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> predicate pushdown support kafkaMsgOffset
> -----------------------------------------
>
>                 Key: DRILL-5977
>                 URL: https://issues.apache.org/jira/browse/DRILL-5977
>             Project: Apache Drill
>          Issue Type: Improvement
>            Reporter: B Anil Kumar
>            Assignee: Abhishek Ravi
>            Priority: Major
>             Fix For: 1.14.0
>
>
> As part of Kafka storage plugin review, below is the suggestion from Paul.
> {noformat}
> Does it make sense to provide a way to select a range of messages: a starting 
> point or a count? Perhaps I want to run my query every five minutes, scanning 
> only those messages since the previous scan. Or, I want to limit my take to, 
> say, the next 1000 messages. Could we use a pseudo-column such as 
> "kafkaMsgOffset" for that purpose? Maybe
> SELECT * FROM <some topic> WHERE kafkaMsgOffset > 12345
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
