[ https://issues.apache.org/jira/browse/SPARK-26841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16763424#comment-16763424 ]
Tomas Bartalos commented on SPARK-26841:
----------------------------------------

Yes, I'm working on the patch. I have a working example and will push a work-in-progress PR today.

You can of course use it as an alternative to the starting/ending offsets. The only difference is that you restrict the timestamp at query time, not at table creation. But you can create a view to overcome this. Here is what I do to have a live view of Kafka's last 5 minutes in the Thrift server:

create or replace view k_event_5 as
select from_avro_by_topic(value, 'topic_name_in_registry') as event, timestamp
from k_event_source
where timestamp > cast(from_unixtime(unix_timestamp() - 5 * 60, "yyyy-MM-dd HH:mm:ss") as TIMESTAMP);

from_avro_by_topic is my custom UDF; it uses the schema registry to convert binary data from Kafka into a Catalyst structure.

> Timestamp pushdown on Kafka table
> ---------------------------------
>
>                 Key: SPARK-26841
>                 URL: https://issues.apache.org/jira/browse/SPARK-26841
>             Project: Spark
>          Issue Type: Improvement
>          Components: Input/Output
>    Affects Versions: 2.4.0
>            Reporter: Tomas Bartalos
>            Priority: Major
>              Labels: Kafka, pushdown, timestamp
>
> As a Spark user I'd like to have fast queries on a Kafka table restricted by
> timestamp.
> I'd like to have quick answers to questions like:
>  * What was inserted into Kafka in the past x minutes
>  * What was inserted into Kafka in a specified time range
> Example:
> {quote}select * from kafka_table where timestamp >
> from_unixtime(unix_timestamp() - 5 * 60, "yyyy-MM-dd HH:mm:ss")
> select * from kafka_table where timestamp > $from_time and timestamp <
> $end_time
> {quote}
> Currently timestamp restrictions are not pushed down to KafkaRelation, and
> querying by timestamp on a large Kafka topic takes forever to complete.
> *Technical solution*
> Technically it's possible to retrieve Kafka's offsets for a provided timestamp
> with the org.apache.kafka.clients.consumer.Consumer#offsetsForTimes(..) method.
> Afterwards we can query the Kafka topic by the retrieved offset ranges.
> Querying by offset range is already implemented, so this change should have
> minor impact.
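
To illustrate the lookup described under "Technical solution", here is a minimal sketch of how a timestamp predicate could be translated into an offset range. It does not call Kafka or Spark. It assumes a single partition modelled as an in-memory list of record timestamps (list index = offset) and assumes those timestamps are non-decreasing. The function names `offset_for_time` and `offset_range` are hypothetical. The lookup mirrors the documented behaviour of Consumer#offsetsForTimes: it returns the earliest offset whose timestamp is greater than or equal to the requested one, or nothing when no such record exists.

```python
from bisect import bisect_left
from typing import List, Optional, Tuple


def offset_for_time(timestamps_ms: List[int], target_ms: int) -> Optional[int]:
    """Earliest offset whose timestamp is >= target_ms, or None if there is none.

    Assumption: timestamps_ms is sorted in non-decreasing order and the list
    index of each entry is its offset within the (simulated) partition.
    """
    idx = bisect_left(timestamps_ms, target_ms)
    return idx if idx < len(timestamps_ms) else None


def offset_range(timestamps_ms: List[int], from_ms: int, to_ms: int) -> Tuple[int, int]:
    """Half-open offset range [start, end) covering from_ms <= timestamp < to_ms."""
    end_of_log = len(timestamps_ms)
    start = offset_for_time(timestamps_ms, from_ms)
    end = offset_for_time(timestamps_ms, to_ms)
    # A missing lookup result means every record is older than the bound,
    # so fall back to the end of the log.
    return (
        end_of_log if start is None else start,
        end_of_log if end is None else end,
    )


if __name__ == "__main__":
    # Simulated partition: offsets 0..4 with these record timestamps (ms).
    partition = [1000, 2000, 2000, 3000, 5000]
    print(offset_range(partition, 2000, 5000))
```

Because the range starts at the first record with a timestamp greater than or equal to the lower bound, a strict `timestamp > $from_time` predicate would still have to be applied to the rows that are read. The offset range only narrows the scan.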