[jira] [Commented] (SPARK-26841) Timestamp pushdown on Kafka table

2019-05-28 Thread Tomas Bartalos (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849454#comment-16849454 ]

Tomas Bartalos commented on SPARK-26841:


Hi [~Yohan123],

Those queries are useful when you want to query data directly from Kafka. Let's 
say Kafka has a retention of 1 week, but you're only interested in the last 4 
hours of data to build a live report. The current Spark implementation does a 
"full-table" scan of the whole week and filters the last 4 hours in Spark, which 
is very inefficient. My proposal is to push the timestamp filter down to Kafka 
during the initial query and get back only those 4 hours of data, to make queries fast.
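To put rough numbers on this: under the simplifying assumption that records arrive at a constant rate (real topics are often bursty), a 4-hour filter over 1 week of retention needs only about 4/168 of the topic. A small Python sketch of that arithmetic (the names are illustrative):

```python
# Rough estimate only: assumes records arrive at a uniform rate, which real
# topics often do not.
RETENTION_HOURS = 7 * 24  # 1 week of Kafka retention, as in the example
WINDOW_HOURS = 4          # the "last 4 hours" live report


def fraction_read(window_hours: float, retention_hours: float) -> float:
    """Share of the retained data that a pushed-down timestamp filter reads,
    relative to a full scan of the topic."""
    return min(window_hours, retention_hours) / retention_hours


frac = fraction_read(WINDOW_HOURS, RETENTION_HOURS)
print(f"pushdown reads about {frac:.1%} of what a full scan reads")
```

Under that assumption, the full scan reads roughly 42 times more data than the report needs (168 / 4 = 42).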

> Timestamp pushdown on Kafka table
> -
>
> Key: SPARK-26841
> URL: https://issues.apache.org/jira/browse/SPARK-26841
> Project: Spark
>  Issue Type: Improvement
>  Components: Input/Output
>Affects Versions: 2.4.0
>Reporter: Tomas Bartalos
>Priority: Major
>  Labels: Kafka, pushdown, timestamp
>
> As a Spark user, I'd like to have fast queries on a Kafka table restricted by 
> timestamp.
> I'd like to have quick answers to questions like:
>  * What was inserted into Kafka in the past x minutes
>  * What was inserted into Kafka in a specified time range
> Example:
> {quote}select * from kafka_table where timestamp > 
> from_unixtime(unix_timestamp() - 5 * 60, "yyyy-MM-dd HH:mm:ss")
> select * from kafka_table where timestamp > $from_time and timestamp < 
> $end_time
> {quote}
> Currently, timestamp restrictions are not pushed down to KafkaRelation, so 
> querying by timestamp on a large Kafka topic takes forever to complete.
> *Technical solution*
> Technically, it's possible to retrieve Kafka's offsets for a given timestamp 
> with the org.apache.kafka.clients.consumer.Consumer#offsetsForTimes(..) method. 
> Afterwards, we can query the Kafka topic by the retrieved offset ranges.
> Querying by offset range is already implemented, so this change should have 
> minor impact.
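The translation described above can be sketched without a Kafka cluster. The Python below is a simulation under stated assumptions, not Kafka or Spark code: a topic is modeled as a hypothetical dict from partition number to a list of record timestamps (list index = offset), and `offsets_for_times` mimics the documented contract of `Consumer#offsetsForTimes` (the earliest offset whose timestamp is greater than or equal to the target, or nothing if there is none). A time range then becomes one half-open offset range per partition:

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical in-memory stand-in for a Kafka topic: partition -> record
# timestamps in epoch millis, where the list index is the record's offset.
Topic = Dict[int, List[int]]


def offsets_for_times(topic: Topic, ts: int) -> Dict[int, Optional[int]]:
    """Per partition, the earliest offset whose timestamp is >= ts, or None
    (mimicking the contract of Consumer#offsetsForTimes)."""
    return {
        partition: next(
            (offset for offset, t in enumerate(timestamps) if t >= ts), None)
        for partition, timestamps in topic.items()
    }


def offset_ranges(topic: Topic, from_ts: int,
                  to_ts: int) -> Dict[int, Tuple[int, int]]:
    """Translates `from_ts <= timestamp < to_ts` into half-open offset
    ranges [start, end) per partition."""
    starts = offsets_for_times(topic, from_ts)
    ends = offsets_for_times(topic, to_ts)
    ranges = {}
    for partition, timestamps in topic.items():
        log_end = len(timestamps)  # plays the role of the partition end offset
        start = starts[partition] if starts[partition] is not None else log_end
        end = ends[partition] if ends[partition] is not None else log_end
        ranges[partition] = (start, max(start, end))
    return ranges


topic = {0: [100, 200, 300, 400], 1: [150, 250, 350]}
print(offset_ranges(topic, 200, 350))  # {0: (1, 3), 1: (1, 2)}
```

These offset ranges are exact only when timestamps never decrease within a partition (for example with `LogAppendTime`). With producer-assigned `CreateTime` timestamps, records can be out of order: the range may include rows outside the window, so the original timestamp predicate must still be applied, and it can skip in-range records appended after the first record stamped at or past the upper bound.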



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26841) Timestamp pushdown on Kafka table

2019-05-19 Thread Richard Yu (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843509#comment-16843509 ]

Richard Yu commented on SPARK-26841:


[~Bartalos] Could you explain what use cases there are for supporting these 
queries? It would be helpful if you could provide an example where a "historic 
snapshot" might be necessary.




[jira] [Commented] (SPARK-26841) Timestamp pushdown on Kafka table

2019-02-08 Thread Tomas Bartalos (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16763697#comment-16763697 ]

Tomas Bartalos commented on SPARK-26841:


Thank you for letting me know; I've filed the PR. I'm not addressing streaming 
queries, only SQL queries.

It seems we're trying to achieve similar behaviour. The difference is that you're 
introducing the timestamp restriction at table creation time, while my 
solution applies it at table query time.
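For the query-time variant, the source has to recover a time range from the WHERE clause. A minimal sketch, assuming predicates arrive as simple (operator, column, value) tuples whose operator names mirror Spark's data source filter classes such as `GreaterThan` and `LessThan` (the tuple encoding and `timestamp_bounds` are hypothetical, not Spark API), of folding them into bounds that an `offsetsForTimes` lookup could consume:

```python
from typing import Iterable, Optional, Tuple

# Hypothetical encoding of pushed-down filters: (operator, column, value),
# with operator names mirroring Spark's org.apache.spark.sql.sources
# filters and values as integer epoch milliseconds.
Filter = Tuple[str, str, int]


def timestamp_bounds(
    filters: Iterable[Filter],
) -> Tuple[Optional[int], Optional[int]]:
    """Folds the predicates on `timestamp` into an inclusive [lower, upper]
    range; None means that side is unbounded."""
    lower: Optional[int] = None
    upper: Optional[int] = None
    for op, column, value in filters:
        if column != "timestamp":
            continue  # predicates on other columns cannot narrow offsets
        new_lower = {"GreaterThan": value + 1, "GreaterThanOrEqual": value,
                     "EqualTo": value}.get(op)
        new_upper = {"LessThan": value - 1, "LessThanOrEqual": value,
                     "EqualTo": value}.get(op)
        # Unrecognized operators yield None on both sides and are ignored,
        # which can only widen the range, never drop matching rows.
        if new_lower is not None:
            lower = new_lower if lower is None else max(lower, new_lower)
        if new_upper is not None:
            upper = new_upper if upper is None else min(upper, new_upper)
    return lower, upper


pushed = [("GreaterThan", "timestamp", 1000),
          ("LessThan", "timestamp", 2000),
          ("EqualTo", "key", 7)]
print(timestamp_bounds(pushed))  # (1001, 1999)
```

The creation-time approach skips this step: the bounds are fixed as options when the table is defined, which is simpler but cannot adapt to the predicates of an individual query.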




[jira] [Commented] (SPARK-26841) Timestamp pushdown on Kafka table

2019-02-08 Thread Jungtaek Lim (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16763633#comment-16763633 ]

Jungtaek Lim commented on SPARK-26841:
--

FYI: I've just filed an issue and submitted a PR regarding the timestamp option.

https://issues.apache.org/jira/browse/SPARK-26848
https://github.com/apache/spark/pull/23747




[jira] [Commented] (SPARK-26841) Timestamp pushdown on Kafka table

2019-02-08 Thread Jungtaek Lim (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16763435#comment-16763435 ]

Jungtaek Lim commented on SPARK-26841:
--

Honestly, I'm also working on a patch that adds an option for specifying a timestamp 
instead of an offset for the start and end of the range. I'll also file another JIRA 
issue and push a WIP PR today. Then we can see whether the patches share some parts 
or are orthogonal.

Btw, does your solution also address the streaming query case?
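The option-based alternative supplies timestamps up front instead of deriving them from predicates. Spark 3.x documents Kafka source options named `startingOffsetsByTimestamp` and `endingOffsetsByTimestamp` that take a JSON map of topic to partition to timestamp in milliseconds; that these options are the outcome of the patch mentioned here is an assumption. A small Python sketch that builds such a JSON value (the topic name, partitions, and timestamp are made-up examples):

```python
import json
from typing import Iterable


def offsets_by_timestamp(topic: str, partitions: Iterable[int],
                         ts_millis: int) -> str:
    """Builds a {topic: {partition: timestamp_ms}} map as a JSON string.
    Partition ids become JSON object keys, so they are written as strings."""
    return json.dumps({topic: {str(p): ts_millis for p in partitions}})


# Hypothetical topic "events" with two partitions; the timestamp is arbitrary.
start = offsets_by_timestamp("events", [0, 1], 1_549_584_000_000)
print(start)  # {"events": {"0": 1549584000000, "1": 1549584000000}}

# With PySpark 3.x this value could then be passed to a batch read, e.g.
# (not executed here; "host:9092" is a placeholder):
# spark.read.format("kafka") \
#     .option("kafka.bootstrap.servers", "host:9092") \
#     .option("subscribe", "events") \
#     .option("startingOffsetsByTimestamp", start) \
#     .load()
```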




[jira] [Commented] (SPARK-26841) Timestamp pushdown on Kafka table

2019-02-08 Thread Tomas Bartalos (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16763424#comment-16763424 ]

Tomas Bartalos commented on SPARK-26841:


Yes, I'm working on the patch. I have a working example and will push a 
work-in-progress PR today. You can of course use it as an alternative to 
starting/ending offsets. The only difference is that you restrict the timestamp 
at query time, not at table creation time, but you can create a view to overcome 
this. Here is what I do to have a live view of the last 5 minutes of Kafka data 
in the Thrift server:

create or replace VIEW k_event_5 as
select from_avro_by_topic(value, 'topic_name_in_registry') as event, timestamp
from k_event_source
where timestamp > cast(from_unixtime(unix_timestamp() - 5 * 60, "yyyy-MM-dd HH:mm:ss") as TIMESTAMP);

from_avro_by_topic is my custom UDF; it uses the schema registry to convert 
binary data from Kafka into a Catalyst structure.
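The view's cutoff is computed by formatting `unix_timestamp() - 5 * 60` as a `yyyy-MM-dd HH:mm:ss` string and casting it back to a TIMESTAMP. Because the view body is re-evaluated on every query, the 5-minute window slides with the current time. A Python sketch of just the cutoff arithmetic, assuming UTC for reproducibility (`cutoff_string` is a hypothetical helper):

```python
from datetime import datetime, timezone


def cutoff_string(now_epoch_s: int, window_s: int = 5 * 60) -> str:
    """Formats (now - window) like from_unixtime(..., "yyyy-MM-dd HH:mm:ss").
    Assumes UTC; Spark would format in the session time zone instead."""
    cutoff = datetime.fromtimestamp(now_epoch_s - window_s, tz=timezone.utc)
    return cutoff.strftime("%Y-%m-%d %H:%M:%S")


# 1549584000 is 2019-02-08 00:00:00 UTC, used as an example "now".
print(cutoff_string(1_549_584_000))  # 2019-02-07 23:55:00
```

Spark SQL can also express the same cutoff with interval arithmetic, `current_timestamp() - INTERVAL 5 MINUTES`, which avoids the round-trip through a string.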




[jira] [Commented] (SPARK-26841) Timestamp pushdown on Kafka table

2019-02-07 Thread Jungtaek Lim (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16763149#comment-16763149 ]

Jungtaek Lim commented on SPARK-26841:
--

[~Bartalos]
Are you working on the patch? I ask because I'm interested in addressing offsets by 
timestamp, though my first goal is not pushdown but an alternative to 
startingOffsets/endingOffsets.
