Fix Spark streaming termination via waitUntilFinish and timeout config issue #39
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e10d5783 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e10d5783 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e10d5783 Branch: refs/heads/master Commit: e10d5783d8c8ed32008e29d99d5a4b1dd3e408a6 Parents: 8098bb1 Author: Etienne Chauchot <echauc...@gmail.com> Authored: Fri Apr 14 17:13:59 2017 +0200 Committer: Ismaël Mejía <ieme...@gmail.com> Committed: Wed Aug 23 19:07:27 2017 +0200 ---------------------------------------------------------------------- .../nexmark/NexmarkConfiguration.java | 61 +++++++++++++++++--- .../integration/nexmark/NexmarkOptions.java | 7 +++ .../beam/integration/nexmark/NexmarkRunner.java | 2 +- 3 files changed, 62 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e10d5783/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java index d6cd808..1da08b4 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java @@ -106,6 +106,12 @@ public class NexmarkConfiguration implements Serializable { public int preloadSeconds = 0; /** + * Timeout for stream pipelines to stop in seconds. + */ + @JsonProperty + public int streamTimeout = 240; + + /** * If true, and in streaming mode, generate events only when they are due according to their * timestamp. 
*/ @@ -275,6 +281,9 @@ public class NexmarkConfiguration implements Serializable { if (options.getPreloadSeconds() != null) { preloadSeconds = options.getPreloadSeconds(); } + if (options.getStreamTimeout() != null) { + streamTimeout = options.getStreamTimeout(); + } if (options.getIsRateLimited() != null) { isRateLimited = options.getIsRateLimited(); } @@ -368,6 +377,7 @@ public class NexmarkConfiguration implements Serializable { result.rateUnit = rateUnit; result.ratePeriodSec = ratePeriodSec; result.preloadSeconds = preloadSeconds; + result.streamTimeout = streamTimeout; result.isRateLimited = isRateLimited; result.useWallclockEventTime = useWallclockEventTime; result.avgPersonByteSize = avgPersonByteSize; @@ -436,6 +446,9 @@ public class NexmarkConfiguration implements Serializable { if (preloadSeconds != DEFAULT.preloadSeconds) { sb.append(String.format("; preloadSeconds:%d", preloadSeconds)); } + if (streamTimeout != DEFAULT.streamTimeout) { + sb.append(String.format("; streamTimeout:%d", streamTimeout)); + } if (isRateLimited != DEFAULT.isRateLimited) { sb.append(String.format("; isRateLimited:%s", isRateLimited)); } @@ -536,13 +549,44 @@ public class NexmarkConfiguration implements Serializable { @Override public int hashCode() { - return Objects.hash(debug, query, sourceType, sinkType, pubSubMode, - numEvents, numEventGenerators, rateShape, firstEventRate, nextEventRate, rateUnit, - ratePeriodSec, preloadSeconds, isRateLimited, useWallclockEventTime, avgPersonByteSize, - avgAuctionByteSize, avgBidByteSize, hotAuctionRatio, hotSellersRatio, hotBiddersRatio, - windowSizeSec, windowPeriodSec, watermarkHoldbackSec, numInFlightAuctions, numActivePeople, - coderStrategy, cpuDelayMs, diskBusyBytes, auctionSkip, fanout, maxAuctionsWaitingTime, - occasionalDelaySec, probDelayedEvent, maxLogEvents, usePubsubPublishTime, + return Objects.hash( + debug, + query, + sourceType, + sinkType, + pubSubMode, + numEvents, + numEventGenerators, + rateShape, + firstEventRate, 
+ nextEventRate, + rateUnit, + ratePeriodSec, + preloadSeconds, + streamTimeout, + isRateLimited, + useWallclockEventTime, + avgPersonByteSize, + avgAuctionByteSize, + avgBidByteSize, + hotAuctionRatio, + hotSellersRatio, + hotBiddersRatio, + windowSizeSec, + windowPeriodSec, + watermarkHoldbackSec, + numInFlightAuctions, + numActivePeople, + coderStrategy, + cpuDelayMs, + diskBusyBytes, + auctionSkip, + fanout, + maxAuctionsWaitingTime, + occasionalDelaySec, + probDelayedEvent, + maxLogEvents, + usePubsubPublishTime, outOfOrderGroupSize); } @@ -630,6 +674,9 @@ public class NexmarkConfiguration implements Serializable { if (preloadSeconds != other.preloadSeconds) { return false; } + if (streamTimeout != other.streamTimeout) { + return false; + } if (Double.doubleToLongBits(probDelayedEvent) != Double.doubleToLongBits(other.probDelayedEvent)) { return false; http://git-wip-us.apache.org/repos/asf/beam/blob/e10d5783/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java index e39f0a4..5d093ae 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java @@ -110,6 +110,13 @@ public interface NexmarkOptions extends PubsubOptions { void setPreloadSeconds(Integer preloadSeconds); + @Description( + "Time in seconds to wait in pipelineResult.waitUntilFinish(), useful in streaming mode") + @Nullable + Integer getStreamTimeout(); + + void setStreamTimeout(Integer preloadSeconds); + @Description("Number of unbounded sources to create events.") @Nullable Integer getNumEventGenerators(); 
http://git-wip-us.apache.org/repos/asf/beam/blob/e10d5783/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java index 3a0452f..ef5f0e2 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java @@ -1230,7 +1230,7 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> { waitForPublisherPreload(); } mainResult = p.run(); - mainResult.waitUntilFinish(); + mainResult.waitUntilFinish(Duration.standardSeconds(configuration.streamTimeout)); return monitor(query); } finally { //