[jira] [Comment Edited] (SPARK-32046) current_timestamp called in a cache dataframe freezes the time for all future calls
[ https://issues.apache.org/jira/browse/SPARK-32046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17203040#comment-17203040 ] Pablo Langa Blanco edited comment on SPARK-32046 at 9/28/20, 4:20 PM:
--

I have been looking at this and I think I understand the problem, although I don't understand why Jupyter and Zeppelin behave differently. When a DataFrame is cached, the key into the map that stores the cached objects is its canonicalized analyzed plan (df1.queryExecution.analyzed.canonicalized). Then, on a later execution, the check for whether a DataFrame is already cached is:

{code:java}
df1.queryExecution.analyzed.canonicalized == df2.queryExecution.analyzed.canonicalized{code}

In this case both execution plans are equal, so Spark considers the DataFrame cached and reuses it. It seems a rather unusual case in real life: two identical DataFrames, one cached and one not, each carrying a timestamp, where you do not want to reuse the cached one.

> current_timestamp called in a cache dataframe freezes the time for all future calls
> ---
>
> Key: SPARK-32046
> URL: https://issues.apache.org/jira/browse/SPARK-32046
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.3.0, 2.4.4, 3.0.0
> Reporter: Dustin Smith
> Priority: Minor
> Labels: caching, sql, time
>
> If I call current_timestamp 3 times while caching the dataframe variable in order to freeze that dataframe's time, the 3rd dataframe's time and beyond (4th, 5th, ...) will be frozen to the 2nd dataframe's time. The 1st and 2nd dataframes will differ in time, but the time becomes static from the 3rd usage onward (when running on Zeppelin or Jupyter).
> Additionally, caching only caused 2 dataframes to cache, skipping the 3rd. However,
> {code:java}
> val df = Seq(java.time.LocalDateTime.now.toString).toDF("datetime").cache
> df.count
> // this can be run 3 times no issue.
> // then later cast to TimestampType{code}
> doesn't have this problem, and all 3 dataframes cache with the correct times displayed.
> Running the code in the shell versus Jupyter or Zeppelin (ZP) also produces different results. In the shell, you only get 1 unique time no matter how many times you run current_timestamp. However, in ZP or Jupyter I have always received 2 unique times before it froze.
>
> {code:java}
> val df1 = spark.range(1).select(current_timestamp as "datetime").cache
> df1.count
> df1.show(false)
> Thread.sleep(9500)
> val df2 = spark.range(1).select(current_timestamp as "datetime").cache
> df2.count
> df2.show(false)
> Thread.sleep(9500)
> val df3 = spark.range(1).select(current_timestamp as "datetime").cache
> df3.count
> df3.show(false){code}

--
This message was sent by Atlassian Jira (v8.3.4#803005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
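The canonicalized-plan lookup Pablo describes can be sketched outside Spark. The following is a hypothetical, simplified model (the Plan and CacheModel names are invented for illustration, and the "canonicalization" is just lowercasing; Spark's real CacheManager normalizes expression IDs and more). The point it demonstrates is only this: two structurally identical plans canonicalize to the same map key, so the second DataFrame silently reuses the first one's cached result.

```scala
// Hypothetical, simplified model of a plan-keyed cache. Not Spark's
// actual CacheManager; names are invented for this sketch.
case class Plan(op: String, children: List[Plan] = Nil) {
  // "Canonicalization" here only normalizes cosmetic differences;
  // Spark's version normalizes expression IDs, attribute names, etc.
  def canonicalized: Plan = Plan(op.toLowerCase, children.map(_.canonicalized))
}

object CacheModel {
  private val cache = scala.collection.mutable.Map.empty[Plan, Long]

  // Cache the "result" (here, a timestamp) under the canonical plan,
  // unless an equal plan is already cached -- then reuse that entry.
  def cacheOrReuse(plan: Plan, compute: () => Long): Long =
    cache.getOrElseUpdate(plan.canonicalized, compute())
}

object CacheDemo extends App {
  val df1Plan = Plan("Project(current_timestamp)", List(Plan("Range(1)")))
  val df2Plan = Plan("Project(current_timestamp)", List(Plan("Range(1)")))

  val t1 = CacheModel.cacheOrReuse(df1Plan, () => 1000L) // computed: 1000
  val t2 = CacheModel.cacheOrReuse(df2Plan, () => 2000L) // reused:   1000
  println(s"$t1 $t2") // prints "1000 1000"
}
```

Under this model, df2's compute function is never called, which matches the observation that an identical uncached DataFrame picks up the cached one's frozen timestamp.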
[jira] [Comment Edited] (SPARK-32046) current_timestamp called in a cache dataframe freezes the time for all future calls
[ https://issues.apache.org/jira/browse/SPARK-32046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17203043#comment-17203043 ] Dustin Smith edited comment on SPARK-32046 at 9/28/20, 7:12 AM:

[~planga82] the use case comes from a work environment. We run several processing steps that can finish at different times, and we need to record those times and merge all the dataframes. We cache each dataframe to freeze its time, so that the time doesn't change the next time the data is referenced. In my toy problem I just created df1 through df3; in this example I would want each to keep the time at which it was cached, i.e. three separate times. I do have a workaround using the Java implementation, but this Spark behaviour just seems odd to me.
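The Java-side workaround Dustin mentions appears to be the Seq(java.time.LocalDateTime.now.toString).toDF pattern from the issue description: the time is captured eagerly on the driver when the data is built, so it is embedded as a plain value rather than re-derived at query planning. A minimal sketch of that distinction, without Spark (buildRow is an invented stand-in for constructing one such DataFrame):

```scala
import java.time.Instant

object EagerTimestamps extends App {
  // Each "DataFrame" embeds its own time as plain data, taken eagerly
  // at construction -- analogous to Seq(LocalDateTime.now.toString).toDF.
  def buildRow(): (String, Instant) = ("datetime", Instant.now)

  val df1 = buildRow(); Thread.sleep(10)
  val df2 = buildRow(); Thread.sleep(10)
  val df3 = buildRow()

  // Three distinct, frozen times: the value is data, not an expression,
  // so there is nothing for a plan-keyed cache to collapse together.
  assert(Set(df1._2, df2._2, df3._2).size == 3)
  println("three distinct times")
}
```

Because the timestamp is a literal in the data, two such DataFrames built at different moments no longer have identical plans, which plausibly explains why this variant caches all three with correct times.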
[jira] [Comment Edited] (SPARK-32046) current_timestamp called in a cache dataframe freezes the time for all future calls
[ https://issues.apache.org/jira/browse/SPARK-32046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177398#comment-17177398 ] Takeshi Yamamuro edited comment on SPARK-32046 at 8/14/20, 7:28 AM:

Does this issue depend on ZP or Jupyter? If so, I think it will be hard to reproduce... The -Non-deterministic- current_timestamp expr can change its output if the cache is broken in that case. So, either way, I don't think it is a good idea for applications to depend on those kinds of values.
[jira] [Comment Edited] (SPARK-32046) current_timestamp called in a cache dataframe freezes the time for all future calls
[ https://issues.apache.org/jira/browse/SPARK-32046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177562#comment-17177562 ] Takeshi Yamamuro edited comment on SPARK-32046 at 8/14/20, 7:28 AM:

>> The question is when does a query evaluation start and stop? Do mutually exclusive dataframes being processed consist of the same query evaluation? If yes, then current_timestamp's behavior in the spark shell is correct; however, as a user, that would be extremely undesirable behavior. I would rather cache the current timestamp and call it again for a new time.

The evaluation of current_timestamp happens per dataframe just before invoking Spark jobs (more specifically, it's done at the optimization stage on the driver side).

>> Now if a query evaluation stops once it is executed and starts anew when another dataframe or action is called, then the behavior in the shell and notebooks is incorrect. The notebooks are only correct for a few runs and then default to not changing.

In normal cases, I think the behaviour of spark-shell is correct. But I'm not sure what's going on in ZP/Jupyter. If you want to make it robust, I think it's better to use checkpoint instead of cache, though.
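Takeshi's point that current_timestamp is evaluated on the driver at the optimization stage can be modeled with a tiny expression tree. This is an illustrative sketch (the Expr, optimize, and evaluate names are invented), loosely imitating the idea behind Catalyst's optimizer folding the current time into a literal during planning, so that re-executing the optimized plan never re-reads the clock:

```scala
object TimestampFolding {
  sealed trait Expr
  case object CurrentTimestamp extends Expr
  final case class Literal(micros: Long) extends Expr

  // Planning-time rule: the non-deterministic call is folded into a
  // constant exactly once, on the "driver", before any job runs.
  def optimize(e: Expr, nowMicros: () => Long): Expr = e match {
    case CurrentTimestamp => Literal(nowMicros())
    case lit: Literal     => lit
  }

  // Executing the optimized plan only ever sees the frozen literal.
  def evaluate(e: Expr): Long = e match {
    case Literal(m)       => m
    case CurrentTimestamp => sys.error("unresolved non-deterministic expr")
  }
}

object FoldingDemo extends App {
  import TimestampFolding._
  var clock = 0L
  val tick = () => { clock += 1; clock }

  val plan = optimize(CurrentTimestamp, tick) // clock read once, at planning
  val runs = List.fill(3)(evaluate(plan))     // re-execution reuses the literal
  println(runs) // prints "List(1, 1, 1)"
}
```

In this model a fresh time only appears when a new plan is optimized; any mechanism that reuses an old plan (such as a plan-keyed cache) therefore also reuses the old time.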
[jira] [Comment Edited] (SPARK-32046) current_timestamp called in a cache dataframe freezes the time for all future calls
[ https://issues.apache.org/jira/browse/SPARK-32046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177398#comment-17177398 ] Takeshi Yamamuro edited comment on SPARK-32046 at 8/14/20, 7:25 AM:

Does this issue depend on ZP or Jupyter? If so, I think it will be hard to reproduce... The -Non-deterministic- exprs can change their output if the cache is broken in that case. So, either way, I don't think it is a good idea for applications to depend on those kinds of values.
[jira] [Comment Edited] (SPARK-32046) current_timestamp called in a cache dataframe freezes the time for all future calls
[ https://issues.apache.org/jira/browse/SPARK-32046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177401#comment-17177401 ] Dustin Smith edited comment on SPARK-32046 at 8/14/20, 12:51 AM:
-

[~maropu] the definition of current_timestamp is as follows:

{code:java}
current_timestamp() - Returns the current timestamp at the start of query evaluation.
{code}

The question is: when does a query evaluation start and stop? Do mutually exclusive dataframes being processed belong to the same query evaluation? If yes, then current_timestamp's behavior in the spark shell is correct; however, as a user, that would be extremely undesirable behavior. I would rather cache the current timestamp and call it again for a new time.

Now, if a query evaluation stops once it is executed and starts anew when another dataframe or action is called, then the behavior in the shell and notebooks is incorrect. The notebooks are only correct for a few runs and then default to not changing.

[https://spark.apache.org/docs/2.3.0/api/sql/index.html#current_timestamp]

Additionally, whichever behavior is correct (or should be correct), it is not applied consistently, and in my opinion more robust testing should occur.

As an afterthought, the name current_timestamp doesn't make sense if the time is supposed to freeze after one call. Really, it is the current timestamp only once; beyond that call it is no longer current.
[jira] [Comment Edited] (SPARK-32046) current_timestamp called in a cache dataframe freezes the time for all future calls
[ https://issues.apache.org/jira/browse/SPARK-32046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177401#comment-17177401 ] Dustin Smith edited comment on SPARK-32046 at 8/14/20, 12:49 AM:
-

[~maropu] the definition of current_timestamp is as follows:

{code:java}
current_timestamp() - Returns the current timestamp at the start of query evaluation.
{code}

The question is: when does a query evaluation start and stop? Do mutually exclusive dataframes being processed belong to the same query evaluation? If yes, then current_timestamp's behavior in the spark shell is correct; however, as a user, that would be extremely undesirable behavior. I would rather cache the current timestamp and call it again for a new time.

Now, if a query evaluation stops once it is executed and starts anew when another dataframe or action is called, then the behavior in the shell and notebooks is incorrect. The notebooks are only correct for a few runs and then default to not changing.

[https://spark.apache.org/docs/2.3.0/api/sql/index.html#current_timestamp]

Additionally, whichever behavior is correct (or should be correct), it is not applied consistently, and in my opinion more robust testing should occur.