Improve NexmarkUtils: improve diskBusy() and remove unneeded randomization code
- Use state API in NexmarkUtils.diskBusy() - Remove commented code for direct runner randomization disabling: the direct runner no longer allows disabling randomization, and the queries and unit tests pass Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6c116709 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6c116709 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6c116709 Branch: refs/heads/master Commit: 6c116709fff06f7faa491a090f441f618931d256 Parents: ee500b2 Author: Etienne Chauchot <echauc...@gmail.com> Authored: Tue May 30 18:00:00 2017 +0100 Committer: Ismaël Mejía <ieme...@gmail.com> Committed: Wed Aug 23 19:07:29 2017 +0200 ---------------------------------------------------------------------- .../beam/integration/nexmark/NexmarkUtils.java | 87 ++++++++++++-------- .../nexmark/queries/NexmarkQuery.java | 2 +- 2 files changed, 52 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6c116709/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java index 7707429..7926690 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java @@ -18,14 +18,12 @@ package org.apache.beam.integration.nexmark; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableList; import com.google.common.hash.Hashing; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import 
java.nio.charset.StandardCharsets; import java.util.Iterator; -import java.util.List; import org.apache.beam.integration.nexmark.model.Auction; import org.apache.beam.integration.nexmark.model.AuctionBid; import org.apache.beam.integration.nexmark.model.AuctionCount; @@ -66,6 +64,7 @@ import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; @@ -289,8 +288,8 @@ public class NexmarkUtils { private static final boolean LOG_ERROR = true; /** - * Set to true to log directly to stdout. If run using Google Dataflow, you can watch the results in real-time with: - * tail -f /var/log/dataflow/streaming-harness/harness-stdout.log + * Set to true to log directly to stdout. If run using Google Dataflow, you can watch the results + * in real-time with: tail -f /var/log/dataflow/streaming-harness/harness-stdout.log */ private static final boolean LOG_TO_CONSOLE = false; @@ -340,14 +339,6 @@ public class NexmarkUtils { * Setup pipeline with codes and some other options. */ public static void setupPipeline(CoderStrategy coderStrategy, Pipeline p) { - //TODO Ismael check -// PipelineRunner<?> runner = p.getRunner(); -// if (runner instanceof DirectRunner) { -// // Disable randomization of output since we want to check batch and streaming match the -// // model both locally and on the cloud. 
-// ((DirectRunner) runner).withUnorderednessTesting(false); -// } - CoderRegistry registry = p.getCoderRegistry(); switch (coderStrategy) { case HAND: @@ -565,35 +556,59 @@ public class NexmarkUtils { }); } - private static final StateSpec<ValueState<byte[]>> DUMMY_TAG = - StateSpecs.value(ByteArrayCoder.of()); private static final int MAX_BUFFER_SIZE = 1 << 24; + private static class DiskBusyTransform<T> extends PTransform<PCollection<T>, PCollection<T>>{ + + private long bytes; + + private DiskBusyTransform(long bytes) { + this.bytes = bytes; + } + + @Override public PCollection<T> expand(PCollection<T> input) { + // Add dummy key to be able to use State API + PCollection<KV<Integer, T>> kvCollection = input.apply("diskBusy.keyElements", ParDo.of(new DoFn<T, KV<Integer, T>>() { + + @ProcessElement public void processElement(ProcessContext context) { + context.output(KV.of(0, context.element())); + } + })); + // Apply actual transform that generates disk IO using state API + PCollection<T> output = kvCollection.apply("diskBusy.generateIO", ParDo.of(new DoFn<KV<Integer, T>, T>() { + + private static final String DISK_BUSY = "diskBusy"; + + @StateId(DISK_BUSY) private final StateSpec<ValueState<byte[]>> spec = StateSpecs + .value(ByteArrayCoder.of()); + + @ProcessElement public void processElement(ProcessContext c, + @StateId(DISK_BUSY) ValueState<byte[]> state) { + long remain = bytes; + long now = System.currentTimeMillis(); + while (remain > 0) { + long thisBytes = Math.min(remain, MAX_BUFFER_SIZE); + remain -= thisBytes; + byte[] arr = new byte[(int) thisBytes]; + for (int i = 0; i < thisBytes; i++) { + arr[i] = (byte) now; + } + state.write(arr); + now = System.currentTimeMillis(); + } + c.output(c.element().getValue()); + } + })); + return output; + } + } + + /** * Return a transform to write given number of bytes to durable store on every record. 
*/ - public static <T> ParDo.SingleOutput<T, T> diskBusy(String name, final long bytes) { - return ParDo.of(new DoFn<T, T>() { - @ProcessElement - public void processElement(ProcessContext c) { - long remain = bytes; -// long now = System.currentTimeMillis(); - while (remain > 0) { - //TODO Ismael google on state - long thisBytes = Math.min(remain, MAX_BUFFER_SIZE); - remain -= thisBytes; -// byte[] arr = new byte[(int) thisBytes]; -// for (int i = 0; i < thisBytes; i++) { -// arr[i] = (byte) now; -// } -// ValueState<byte[]> state = c.windowingInternals().stateInternals().state( -// StateNamespaces.global(), DUMMY_TAG); -// state.write(arr); -// now = System.currentTimeMillis(); - } - c.output(c.element()); - } - }); + public static <T> PTransform<PCollection<T>, PCollection<T>> diskBusy(final long bytes) { + return new DiskBusyTransform<>(bytes); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/6c116709/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java index 09415c0..8b74282 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java @@ -253,7 +253,7 @@ public abstract class NexmarkQuery if (configuration.diskBusyBytes > 0) { // Slow down by forcing bytes to durable store. events = events.apply(name + ".DiskBusy", - NexmarkUtils.<Event>diskBusy(name, configuration.diskBusyBytes)); + NexmarkUtils.<Event>diskBusy(configuration.diskBusyBytes)); } // Run the query.