Repository: spark Updated Branches: refs/heads/branch-2.2 7dbc0a910 -> 6a25d391f
[SPARK-20377][SS] Fix JavaStructuredSessionization example ## What changes were proposed in this pull request? Extra accessors in java bean class causes incorrect encoder generation, which corrupted the state when using timeouts. ## How was this patch tested? manually ran the example Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #17676 from tdas/SPARK-20377. (cherry picked from commit 74aa0df8f7f132b62754e5159262e4a5b9b641ab) Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a25d391 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a25d391 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a25d391 Branch: refs/heads/branch-2.2 Commit: 6a25d391f81eed5f23e105c2db427ae8bb032752 Parents: 7dbc0a9 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Tue Apr 18 16:10:40 2017 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Tue Apr 18 16:10:51 2017 -0700 ---------------------------------------------------------------------- .../sql/streaming/JavaStructuredSessionization.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6a25d391/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java index da3a5df..d3c8516 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java @@ -76,8 +76,6 @@ public final class JavaStructuredSessionization { for (String word : lineWithTimestamp.getLine().split(" ")) { eventList.add(new Event(word, lineWithTimestamp.getTimestamp())); } - System.out.println( - "Number of events from " + lineWithTimestamp.getLine() + " = " + eventList.size()); return eventList.iterator(); } }; @@ -100,7 +98,7 @@ public final class JavaStructuredSessionization { // If timed out, then remove session and send final update if (state.hasTimedOut()) { SessionUpdate finalUpdate = new SessionUpdate( - sessionId, state.get().getDurationMs(), state.get().getNumEvents(), true); + sessionId, state.get().calculateDuration(), state.get().getNumEvents(), true); state.remove(); return finalUpdate; @@ -133,7 +131,7 @@ public final class JavaStructuredSessionization { // Set timeout such that the session will be expired if no data received for 10 seconds state.setTimeoutDuration("10 seconds"); return new SessionUpdate( - sessionId, state.get().getDurationMs(), state.get().getNumEvents(), false); + sessionId, state.get().calculateDuration(), state.get().getNumEvents(), false); } } }; @@ -215,7 +213,8 @@ public final class JavaStructuredSessionization { public long getEndTimestampMs() { return endTimestampMs; } public void setEndTimestampMs(long endTimestampMs) { this.endTimestampMs = endTimestampMs; } - public long getDurationMs() { return endTimestampMs - startTimestampMs; } + public long calculateDuration() { return endTimestampMs - startTimestampMs; } + @Override public String toString() { return "SessionInfo(numEvents = " + numEvents + ", timestamps = " + startTimestampMs + " to " + endTimestampMs + ")"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org