[jira] [Commented] (SPARK-26841) Timestamp pushdown on Kafka table
[ https://issues.apache.org/jira/browse/SPARK-26841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849454#comment-16849454 ] Tomas Bartalos commented on SPARK-26841: Hi [~Yohan123], those queries are useful when you want to query data directly from Kafka. Say Kafka has a retention of 1 week, but you're only interested in the last 4 hours of data to build a live report. The current Spark implementation does a "full-table" scan of the whole week and filters down to the last 4 hours in Spark, which is very inefficient. My proposal is to push the timestamp filter down to Kafka during the initial query and get back only 4 hours of data, making such queries fast.
> Timestamp pushdown on Kafka table
> ---------------------------------
>
> Key: SPARK-26841
> URL: https://issues.apache.org/jira/browse/SPARK-26841
> Project: Spark
> Issue Type: Improvement
> Components: Input/Output
> Affects Versions: 2.4.0
> Reporter: Tomas Bartalos
> Priority: Major
> Labels: Kafka, pushdown, timestamp
>
> As a Spark user I'd like to have fast queries on a Kafka table restricted by timestamp.
> I'd like quick answers to questions like:
> * What was inserted into Kafka in the past x minutes
> * What was inserted into Kafka in a specified time range
> Example:
> {quote}select * from kafka_table where timestamp > from_unixtime(unix_timestamp() - 5 * 60, "yyyy-MM-dd HH:mm:ss")
> select * from kafka_table where timestamp > $from_time and timestamp < $end_time
> {quote}
> Currently timestamp restrictions are not pushed down to KafkaRelation, and querying by timestamp on a large Kafka topic takes forever to complete.
> *Technical solution*
> Technically it's possible to retrieve Kafka's offsets for a provided timestamp with the org.apache.kafka.clients.consumer.Consumer#offsetsForTimes(..) method. Afterwards we can query the Kafka topic by the retrieved offset ranges.
> Querying by offset range is already implemented, so this change should have minor impact.
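The lookup the proposal relies on can be sketched without a broker: per Kafka's documented contract, offsetsForTimes returns, for each partition, the earliest offset whose record timestamp is greater than or equal to the requested timestamp, or nothing if no such record exists. A minimal Python simulation of that per-partition lookup (the function and data here are illustrative, not part of any Spark or Kafka API, and assume non-decreasing timestamps as with LogAppendTime):

```python
import bisect

def offsets_for_time(timestamps, target_ts):
    """Given one partition's record timestamps in offset order (assumed
    non-decreasing, as with LogAppendTime), return the earliest offset
    whose timestamp >= target_ts, or None if no record is that recent.
    Mirrors the per-partition contract of Consumer#offsetsForTimes."""
    i = bisect.bisect_left(timestamps, target_ts)
    return i if i < len(timestamps) else None

# One partition: offset n carries timestamp timestamps[n] (ms since epoch).
timestamps = [1000, 2000, 2000, 5000, 9000]

print(offsets_for_time(timestamps, 2000))  # 1: first record at/after t=2000
print(offsets_for_time(timestamps, 3000))  # 3: next record after the gap
print(offsets_for_time(timestamps, 9001))  # None: no record that late
```

With CreateTime timestamps the real broker uses its time index rather than a plain binary search, but the returned offset obeys the same "earliest offset with timestamp >= target" rule.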
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26841) Timestamp pushdown on Kafka table
[ https://issues.apache.org/jira/browse/SPARK-26841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843509#comment-16843509 ] Richard Yu commented on SPARK-26841: [~Bartalos] Could you explain what use cases there are for supporting these queries? It would be helpful if you could provide an example where a "historic snapshot" might be necessary.
[jira] [Commented] (SPARK-26841) Timestamp pushdown on Kafka table
[ https://issues.apache.org/jira/browse/SPARK-26841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16763697#comment-16763697 ] Tomas Bartalos commented on SPARK-26841: Thank you for letting me know, I've filed the PR. I'm not addressing the streaming query, only the SQL query. It seems we're trying to achieve similar behaviour; the difference is that you introduce the timestamp restriction at table creation time, while my solution applies it at query time.
[jira] [Commented] (SPARK-26841) Timestamp pushdown on Kafka table
[ https://issues.apache.org/jira/browse/SPARK-26841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16763633#comment-16763633 ] Jungtaek Lim commented on SPARK-26841: FYI: I've just filed an issue and submitted a PR regarding the timestamp option. https://issues.apache.org/jira/browse/SPARK-26848 https://github.com/apache/spark/pull/23747
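For context, the linked PR proposes accepting per-topic-partition timestamps (epoch milliseconds) as a JSON-valued source option; in later Spark releases this surfaced as `startingOffsetsByTimestamp`, though the exact name in the PR may differ. A small sketch of building such an option value — the topic name, partitions, and date are made up for illustration:

```python
import json
import datetime

# Hypothetical example: start reading both partitions of topic "events"
# from records produced at/after 2019-02-01 00:00:00 UTC, as epoch millis.
start = datetime.datetime(2019, 2, 1, tzinfo=datetime.timezone.utc)
start_ms = int(start.timestamp() * 1000)

# JSON shape: {"topic": {"partition": timestamp_ms, ...}}
option_value = json.dumps({"events": {"0": start_ms, "1": start_ms}})
print(option_value)

# The value would then be passed to the Kafka source, roughly:
#   spark.read.format("kafka")
#        .option("startingOffsetsByTimestamp", option_value) ...
# (option name per later Spark releases; not guaranteed to match the PR)
```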
[jira] [Commented] (SPARK-26841) Timestamp pushdown on Kafka table
[ https://issues.apache.org/jira/browse/SPARK-26841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16763435#comment-16763435 ] Jungtaek Lim commented on SPARK-26841: Honestly, I'm also working on a patch to add an option for specifying a timestamp instead of an offset for the start and end range. I'll file another JIRA issue and push a WIP PR today. Then we can see whether the patches share some parts or are orthogonal. Btw, does your solution also address the streaming query case?
[jira] [Commented] (SPARK-26841) Timestamp pushdown on Kafka table
[ https://issues.apache.org/jira/browse/SPARK-26841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16763424#comment-16763424 ] Tomas Bartalos commented on SPARK-26841: Yes, I'm working on the patch. I have a working example and will push a work-in-progress PR today. You can of course use it as an alternative to starting/ending offsets. The only difference is that you restrict the timestamp during the query, not at table creation time, but you can create a view to overcome this. Here is what I do to have a live view of Kafka's last 5 minutes in the thrift server:
{quote}create or replace VIEW k_event_5 as
select from_avro_by_topic(value, 'topic_name_in_registry') as event, timestamp
from k_event_source
where timestamp > cast(from_unixtime(unix_timestamp() - 5 * 60, "yyyy-MM-dd HH:mm:ss") as TIMESTAMP);{quote}
from_avro_by_topic is my custom UDF; it uses the schema registry to convert binary data from Kafka to a Catalyst structure.
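The WHERE clause above computes a cutoff of "now minus 5 minutes" by formatting an epoch second as a string and casting it back to a timestamp. The same computation in plain Python, purely to illustrate what the from_unixtime(unix_timestamp() - 5 * 60, ...) expression produces (using UTC here; Spark actually formats in the session time zone):

```python
import time

def cutoff_string(now_epoch, minutes=5):
    """Format (now_epoch - minutes*60) as 'yyyy-MM-dd HH:mm:ss', matching
    the Spark SQL expression from_unixtime(unix_timestamp() - minutes*60,
    "yyyy-MM-dd HH:mm:ss"), except rendered in UTC rather than the
    Spark session time zone."""
    return time.strftime("%Y-%m-%d %H:%M:%S",
                         time.gmtime(now_epoch - minutes * 60))

print(cutoff_string(1549000000))  # → 2019-02-01 05:41:40
```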
[jira] [Commented] (SPARK-26841) Timestamp pushdown on Kafka table
[ https://issues.apache.org/jira/browse/SPARK-26841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16763149#comment-16763149 ] Jungtaek Lim commented on SPARK-26841: [~Bartalos] Are you working on a patch? I'm interested in addressing offset-by-timestamp too, though my first goal is not a pushdown but an alternative to startingOffsets/endingOffsets.