Tathagata Das created SPARK-20374:
-------------------------------------

             Summary: Encoder generated using Java beans causes corruption in MapGroupsWithState
                 Key: SPARK-20374
                 URL: https://issues.apache.org/jira/browse/SPARK-20374
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.2.0
            Reporter: Tathagata Das

Running the example https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java gives incorrect output. Specifically, for every input word, it outputs the word twice in update mode.

This happens because in MapGroupsWithStateExec, the timeout timestamp gets corrupted when it is written to the state row. This leads to the following:
1. The state is updated, hence the word is output once.
2. Later, when the timed-out states are being processed, the same word is found again because of the corrupted timeout timestamp, so it is output a second time.

Ideally, the state row whose timeout timestamp was updated with T should never get caught in the search for timed-out keys (i.e. timeout timestamp < T). But because of the corruption, the search sees a different timeout timestamp for that row.

Finally, this must be a Java bean encoder issue, because the exact same query in the Scala example works fine: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
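
To make the double-output symptom concrete, here is a minimal, self-contained sketch of the invariant described above. It does not use Spark and is not the MapGroupsWithStateExec code: the class TimeoutScanSketch, its methods, the in-memory map standing in for the state store, and the corruptOnWrite flag (which simply stores 0 instead of T) are all hypothetical, chosen only to illustrate how a wrongly stored timeout timestamp makes an updated key show up again in the "timeout timestamp < T" scan. The actual form of the corruption is not specified in this report.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the state store logic; NOT Spark's implementation.
final class TimeoutScanSketch {

    // Returns the keys whose stored timeout timestamp is strictly below batchTime,
    // i.e. the "timeout timestamp < T" search from the report.
    static List<String> timedOutKeys(Map<String, Long> timeoutByKey, long batchTime) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : timeoutByKey.entrySet()) {
            if (entry.getValue() < batchTime) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    // Simulates one batch: first update the state of every input word (each word
    // is output once), then scan for timed-out keys (each hit is output again).
    static List<String> processBatch(Map<String, Long> store, List<String> words,
                                     long batchTime, boolean corruptOnWrite) {
        List<String> output = new ArrayList<>();
        for (String word : words) {
            // Assumption: the "corruption" is modeled as storing 0 instead of T.
            long written = corruptOnWrite ? 0L : batchTime;
            store.put(word, written);
            output.add(word);
        }
        output.addAll(timedOutKeys(store, batchTime));
        return output;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("apache", "spark");
        long t = 1000L;

        // Correct write: timestamp T is stored, so the scan for "< T" finds nothing.
        System.out.println(processBatch(new LinkedHashMap<>(), words, t, false));
        // prints [apache, spark]

        // Corrupted write: the stored value is below T, so every word is found again.
        System.out.println(processBatch(new LinkedHashMap<>(), words, t, true));
        // prints [apache, spark, apache, spark]
    }
}
```

With a correct write, each word appears once; with the simulated corruption, each word appears twice, which matches the behavior reported for the Java example in update mode.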