[ 
https://issues.apache.org/jira/browse/SPARK-32046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177562#comment-17177562
 ] 

Takeshi Yamamuro edited comment on SPARK-32046 at 8/14/20, 7:28 AM:
--------------------------------------------------------------------

>> The question when does a query evaluation start and stop? Do mutual 
>> exclusive dataframes being processed consist of the same query evaluation? 
>> If yes, then current timestamp's behavior in spark shell is correct; 
>> however, as user, that would be extremely undesirable behavior. I would 
>> rather cache the current timestamp and call it again for a new time.

The evaluation of current_timestamp happens per dataframe just before invoking 
Spark jobs (more specifically, its done at the optimization stage in a driver 
side).

>> Now if a query evaluation stops once it is executed and starts anew when 
>> another dataframe or action is called, then the behavior in shell and 
>> notebooks are incorrect. The notebooks are only correct for a few runs and 
>> then default to not changing.

In normal cases, I think the behaviour of spark-shell is correct. But, I'm not 
sure what's going on ZP/Jupyter. If you want to make it robust, I think its 
better to use checkpoint instead of cache though.


was (Author: maropu):
>> The question when does a query evaluation start and stop? Do mutual 
>> exclusive dataframes being processed consist of the same query evaluation? 
>> If yes, then current timestamp's behavior in spark shell is correct; 
>> however, as user, that would be extremely undesirable behavior. I would 
>> rather cache the current timestamp and call it again for a new time.

The evaluation of current_timestamp happens per dataframe just before invoking 
Spark jobs (more specifically, its done at the optimization stage in a driver 
side).

>> Now if a query evaluation stops once it is executed and starts anew when 
>> another dataframe or action is called, then the behavior in shell and 
>> notebooks are incorrect. The notebooks are only correct for a few runs and 
>> then default to not changing.

In normal cases, I think the behaviour of spark-shell is correct. But, I'm not 
sure what's going on ZP/Jupyter. If you want to make it robust, I think its 
better to use checkpoint instead of cache though.

> current_timestamp called in a cache dataframe freezes the time for all future 
> calls
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-32046
>                 URL: https://issues.apache.org/jira/browse/SPARK-32046
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0, 2.4.4, 3.0.0
>            Reporter: Dustin Smith
>            Priority: Minor
>              Labels: caching, sql, time
>
> If I call current_timestamp 3 times while caching the dataframe variable in 
> order to freeze that dataframe's time, the 3rd dataframe time and beyond 
> (4th, 5th, ...) will be frozen to the 2nd dataframe's time. The 1st dataframe 
> and the 2nd will differ in time but will become static on the 3rd usage and 
> beyond (when running on Zeppelin or Jupyter).
> Additionally, caching only caused 2 dataframes to cache skipping the 3rd. 
> However,
> {code:java}
> val df = Seq(java.time.LocalDateTime.now.toString).toDF("datetime").cache
> df.count
> // this can be run 3 times no issue.
> // then later cast to TimestampType{code}
> doesn't have this problem and all 3 dataframes cache with correct times 
> displaying.
> Running the code in shell and Jupyter or Zeppelin (ZP) also produces 
> different results. In the shell, you only get 1 unique time no matter how 
> many times you run it, current_timestamp. However, in ZP or Jupyter I have 
> always received 2 unique times before it froze.
>  
> {code:java}
> val df1 = spark.range(1).select(current_timestamp as "datetime").cache
> df1.count
> df1.show(false)
> Thread.sleep(9500)
> val df2 = spark.range(1).select(current_timestamp as "datetime").cache
> df2.count 
> df2.show(false)
> Thread.sleep(9500)
> val df3 = spark.range(1).select(current_timestamp as "datetime").cache 
> df3.count 
> df3.show(false){code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to