Fix static analysis issues Restrict access level on classes + other static analysis fixes
Fix findbugs issues (issue #33) Fix compile after AvroIO, TextIO, PubsubIO and State refactor Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1541fad0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1541fad0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1541fad0 Branch: refs/heads/master Commit: 1541fad077e47df1d47636fd186a72aa827bbc42 Parents: a39cb80 Author: Ismaël MejÃa <ieme...@apache.org> Authored: Mon May 1 00:54:08 2017 +0200 Committer: Ismaël MejÃa <ieme...@gmail.com> Committed: Wed Aug 23 19:07:28 2017 +0200 ---------------------------------------------------------------------- integration/java/nexmark/pom.xml | 2 +- .../beam/integration/nexmark/Monitor.java | 4 +- .../beam/integration/nexmark/NexmarkDriver.java | 12 +- .../beam/integration/nexmark/NexmarkRunner.java | 124 +++++++++++-------- .../beam/integration/nexmark/NexmarkUtils.java | 34 +++-- .../beam/integration/nexmark/model/Auction.java | 8 +- .../integration/nexmark/model/AuctionCount.java | 6 +- .../integration/nexmark/model/AuctionPrice.java | 4 +- .../nexmark/model/BidsPerSession.java | 4 +- .../beam/integration/nexmark/model/Done.java | 2 +- .../beam/integration/nexmark/model/Event.java | 13 -- .../nexmark/model/IdNameReserve.java | 6 +- .../nexmark/model/NameCityStateId.java | 8 +- .../beam/integration/nexmark/model/Person.java | 6 +- .../integration/nexmark/model/SellerPrice.java | 2 +- .../nexmark/queries/AbstractSimulator.java | 10 +- .../nexmark/queries/NexmarkQuery.java | 34 ++--- .../nexmark/queries/NexmarkQueryModel.java | 17 +-- .../nexmark/queries/Query0Model.java | 2 +- .../integration/nexmark/queries/Query10.java | 6 +- .../integration/nexmark/queries/Query11.java | 3 +- .../nexmark/queries/Query1Model.java | 2 +- .../integration/nexmark/queries/Query3.java | 24 ++-- .../nexmark/queries/Query3Model.java | 2 +- .../nexmark/queries/Query4Model.java | 5 +- .../integration/nexmark/queries/Query5.java | 4 +- .../integration/nexmark/queries/Query6.java | 4 +- .../nexmark/queries/Query6Model.java | 5 +- .../nexmark/queries/WinningBids.java | 30 +++-- .../integration/nexmark/sources/Generator.java | 11 +- .../nexmark/sources/GeneratorConfig.java | 26 ++-- .../nexmark/sources/UnboundedEventSource.java | 2 +- .../sources/UnboundedEventSourceTest.java | 5 +- integration/pom.xml | 14 +++ 34 files changed, 221 insertions(+), 220 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/pom.xml ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml index fb213e9..8a65c0f 100644 --- a/integration/java/nexmark/pom.xml +++ b/integration/java/nexmark/pom.xml @@ -210,7 +210,7 @@ <dependency> <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-java-extensions-gcp-core</artifactId> + <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java index cb4d71c..2f0c56a 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java @@ -63,8 +63,8 @@ public class Monitor<T extends KnownSize> implements Serializable { public final String name; public final String prefix; - final MonitorDoFn doFn; - final PTransform<PCollection<? extends T>, PCollection<T>> transform; + private final MonitorDoFn doFn; + private final PTransform<PCollection<? extends T>, PCollection<T>> transform; public Monitor(String name, String prefix) { this.name = name; http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java index 7d532cc..a982a8d 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java @@ -57,7 +57,7 @@ public class NexmarkDriver<OptionT extends NexmarkOptions> { /** * Entry point. */ - public void runAll(OptionT options, NexmarkRunner runner) { + void runAll(OptionT options, NexmarkRunner runner) { Instant start = Instant.now(); Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename()); Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>(); @@ -87,7 +87,7 @@ public class NexmarkDriver<OptionT extends NexmarkOptions> { } if (!successful) { - System.exit(1); + throw new RuntimeException("Execution was not successful"); } } @@ -149,8 +149,6 @@ public class NexmarkDriver<OptionT extends NexmarkOptions> { /** * Print summary of {@code actual} vs (if non-null) {@code baseline}. - * - * @throws IOException */ private static void saveSummary( @Nullable String summaryFilename, @@ -227,7 +225,7 @@ public class NexmarkDriver<OptionT extends NexmarkOptions> { if (actualPerf != null) { List<String> errors = actualPerf.errors; if (errors == null) { - errors = new ArrayList<String>(); + errors = new ArrayList<>(); errors.add("NexmarkGoogleRunner returned null errors list"); } for (String error : errors) { @@ -300,7 +298,7 @@ public class NexmarkDriver<OptionT extends NexmarkOptions> { NexmarkOptions options = PipelineOptionsFactory.fromArgs(args) .withValidation() .as(NexmarkOptions.class); - NexmarkRunner runner = new NexmarkRunner(options); - new NexmarkDriver().runAll(options, runner); + NexmarkRunner<NexmarkOptions> runner = new NexmarkRunner<>(options); + new NexmarkDriver<>().runAll(options, runner); } } http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java index a3c4d33..6df76f0 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.ThreadLocalRandom; @@ -65,10 +66,12 @@ import org.apache.beam.integration.nexmark.queries.Query9; import org.apache.beam.integration.nexmark.queries.Query9Model; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.io.AvroIO; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.MetricNameFilter; import org.apache.beam.sdk.metrics.MetricQueryResults; @@ -77,6 +80,7 @@ import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TimestampedValue; @@ -91,15 +95,15 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { /** * Minimum number of samples needed for 'stead-state' rate calculation. */ - protected static final int MIN_SAMPLES = 9; + private static final int MIN_SAMPLES = 9; /** * Minimum length of time over which to consider samples for 'steady-state' rate calculation. */ - protected static final Duration MIN_WINDOW = Duration.standardMinutes(2); + private static final Duration MIN_WINDOW = Duration.standardMinutes(2); /** * Delay between perf samples. */ - protected static final Duration PERF_DELAY = Duration.standardSeconds(15); + private static final Duration PERF_DELAY = Duration.standardSeconds(15); /** * How long to let streaming pipeline run after all events have been generated and we've * seen no activity. @@ -117,37 +121,37 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { /** * NexmarkOptions shared by all runs. */ - protected final OptionT options; + private final OptionT options; /** * Which configuration we are running. */ @Nullable - protected NexmarkConfiguration configuration; + private NexmarkConfiguration configuration; /** * If in --pubsubMode=COMBINED, the event monitor for the publisher pipeline. Otherwise null. */ @Nullable - protected Monitor<Event> publisherMonitor; + private Monitor<Event> publisherMonitor; /** * If in --pubsubMode=COMBINED, the pipeline result for the publisher pipeline. Otherwise null. */ @Nullable - protected PipelineResult publisherResult; + private PipelineResult publisherResult; /** * Result for the main pipeline. */ @Nullable - protected PipelineResult mainResult; + private PipelineResult mainResult; /** * Query name we are running. */ @Nullable - protected String queryName; + private String queryName; public NexmarkRunner(OptionT options) { this.options = options; @@ -160,7 +164,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { /** * Is this query running in streaming mode? */ - protected boolean isStreaming() { + private boolean isStreaming() { return options.isStreaming(); } @@ -174,7 +178,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { /** * Return maximum number of workers. */ - protected int maxNumWorkers() { + private int maxNumWorkers() { return 5; } @@ -182,7 +186,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { * Return the current value for a long counter, or a default value if can't be retrieved. * Note this uses only attempted metrics because some runners don't support committed metrics. */ - protected long getCounterMetric(PipelineResult result, String namespace, String name, + private long getCounterMetric(PipelineResult result, String namespace, String name, long defaultValue) { //TODO Ismael calc this only once MetricQueryResults metrics = result.metrics().queryMetrics( @@ -201,7 +205,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { * Return the current value for a long counter, or a default value if can't be retrieved. * Note this uses only attempted metrics because some runners don't support committed metrics. */ - protected long getDistributionMetric(PipelineResult result, String namespace, String name, + private long getDistributionMetric(PipelineResult result, String namespace, String name, DistributionType distType, long defaultValue) { MetricQueryResults metrics = result.metrics().queryMetrics( MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build()); @@ -226,7 +230,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { /** * Return the current value for a time counter, or -1 if can't be retrieved. */ - protected long getTimestampMetric(long now, long value) { + private long getTimestampMetric(long now, long value) { //TODO Ismael improve doc if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) { return -1; @@ -238,8 +242,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { * Find a 'steady state' events/sec from {@code snapshots} and * store it in {@code perf} if found. */ - protected void captureSteadyState(NexmarkPerf perf, - List<NexmarkPerf.ProgressSnapshot> snapshots) { + private void captureSteadyState(NexmarkPerf perf, List<NexmarkPerf.ProgressSnapshot> snapshots) { if (!options.isStreaming()) { return; } @@ -426,7 +429,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { return perf; } - protected String getJobId(PipelineResult job) { + private String getJobId(PipelineResult job) { return ""; } @@ -528,15 +531,14 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { /** * Build and run a pipeline using specified options. */ - protected interface PipelineBuilder<OptionT extends NexmarkOptions> { + interface PipelineBuilder<OptionT extends NexmarkOptions> { void build(OptionT publishOnlyOptions); } /** * Invoke the builder with options suitable for running a publish-only child pipeline. */ - protected void invokeBuilderForPublishOnlyPipeline( - PipelineBuilder builder) { + private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder<NexmarkOptions> builder) { builder.build(options); // throw new UnsupportedOperationException( // "Cannot use --pubSubMode=COMBINED with DirectRunner"); @@ -546,7 +548,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { * If monitoring, wait until the publisher pipeline has run long enough to establish * a backlog on the Pubsub topic. Otherwise, return immediately. */ - protected void waitForPublisherPreload() { + private void waitForPublisherPreload() { throw new UnsupportedOperationException(); } @@ -555,7 +557,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { * it was measured. */ @Nullable - protected NexmarkPerf monitor(NexmarkQuery query) { + private NexmarkPerf monitor(NexmarkQuery query) { if (!options.getMonitorJobs()) { return null; } @@ -841,14 +843,28 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { private PCollection<Event> sourceEventsFromPubsub(Pipeline p, long now) { String shortSubscription = shortSubscription(now); NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription); - PubsubIO.Read<Event> io = - PubsubIO.<Event>read().fromSubscription(shortSubscription) - .withIdAttribute(NexmarkUtils.PUBSUB_ID) - .withCoder(Event.CODER); + + PubsubIO.Read<PubsubMessage> io = + PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(shortSubscription) + .withIdAttribute(NexmarkUtils.PUBSUB_ID); if (!configuration.usePubsubPublishTime) { io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP); } - return p.apply(queryName + ".ReadPubsubEvents", io); + + return p + .apply(queryName + ".ReadPubsubEvents", io) + .apply(queryName + ".PubsubMessageToEvent", ParDo.of(new DoFn<PubsubMessage, Event>() { + @ProcessElement + public void processElement(ProcessContext c) { + byte[] payload = c.element().getPayload(); + try { + Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload); + c.output(event); + } catch (CoderException e) { + // TODO Log decoding Event error + } + } + })); } /** @@ -861,9 +877,8 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { } NexmarkUtils.console("Reading events from Avro files at %s", filename); return p - .apply(queryName + ".ReadAvroEvents", AvroIO.Read - .from(filename + "*.avro") - .withSchema(Event.class)) + .apply(queryName + ".ReadAvroEvents", AvroIO.read(Event.class) + .from(filename + "*.avro")) .apply("OutputWithTimestamp", NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA); } @@ -873,14 +888,28 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { private void sinkEventsToPubsub(PCollection<Event> events, long now) { String shortTopic = shortTopic(now); NexmarkUtils.console("Writing events to Pubsub %s", shortTopic); - PubsubIO.Write<Event> io = - PubsubIO.<Event>write().to(shortTopic) - .withIdAttribute(NexmarkUtils.PUBSUB_ID) - .withCoder(Event.CODER); + + PubsubIO.Write<PubsubMessage> io = + PubsubIO.writePubsubMessages().to(shortTopic) + .withIdAttribute(NexmarkUtils.PUBSUB_ID); if (!configuration.usePubsubPublishTime) { io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP); } - events.apply(queryName + ".WritePubsubEvents", io); + + events.apply(queryName + ".EventToPubsubMessage", + ParDo.of(new DoFn<Event, PubsubMessage>() { + @ProcessElement + public void processElement(ProcessContext c) { + try { + byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element()); + c.output(new PubsubMessage(payload, new HashMap<String, String>())); + } catch (CoderException e1) { + // TODO Log encoding Event error + } + } + }) + ) + .apply(queryName + ".WritePubsubEvents", io); } /** @@ -890,7 +919,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { String shortTopic = shortTopic(now); NexmarkUtils.console("Writing results to Pubsub %s", shortTopic); PubsubIO.Write<String> io = - PubsubIO.<String>write().to(shortTopic) + PubsubIO.writeStrings().to(shortTopic) .withIdAttribute(NexmarkUtils.PUBSUB_ID); if (!configuration.usePubsubPublishTime) { io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP); @@ -917,18 +946,16 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { } NexmarkUtils.console("Writing events to Avro files at %s", filename); source.apply(queryName + ".WriteAvroEvents", - AvroIO.Write.to(filename + "/event").withSuffix(".avro").withSchema(Event.class)); + AvroIO.write(Event.class).to(filename + "/event").withSuffix(".avro")); source.apply(NexmarkQuery.JUST_BIDS) .apply(queryName + ".WriteAvroBids", - AvroIO.Write.to(filename + "/bid").withSuffix(".avro").withSchema(Bid.class)); + AvroIO.write(Bid.class).to(filename + "/bid").withSuffix(".avro")); source.apply(NexmarkQuery.JUST_NEW_AUCTIONS) .apply(queryName + ".WriteAvroAuctions", - AvroIO.Write.to(filename + "/auction").withSuffix(".avro") - .withSchema(Auction.class)); + AvroIO.write(Auction.class).to(filename + "/auction").withSuffix(".avro")); source.apply(NexmarkQuery.JUST_NEW_PERSONS) .apply(queryName + ".WriteAvroPeople", - AvroIO.Write.to(filename + "/person").withSuffix(".avro") - .withSchema(Person.class)); + AvroIO.write(Person.class).to(filename + "/person").withSuffix(".avro")); } /** @@ -938,7 +965,7 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { String filename = textFilename(now); NexmarkUtils.console("Writing results to text files at %s", filename); formattedResults.apply(queryName + ".WriteTextResults", - TextIO.Write.to(filename)); + TextIO.write().to(filename)); } private static class StringToTableRow extends DoFn<String, TableRow> { @@ -1010,12 +1037,12 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { // Send synthesized events to Pubsub in separate publisher job. // We won't start the main pipeline until the publisher has sent the pre-load events. // We'll shutdown the publisher job when we notice the main job has finished. - invokeBuilderForPublishOnlyPipeline(new PipelineBuilder() { + invokeBuilderForPublishOnlyPipeline(new PipelineBuilder<NexmarkOptions>() { @Override public void build(NexmarkOptions publishOnlyOptions) { Pipeline sp = Pipeline.create(options); NexmarkUtils.setupPipeline(configuration.coderStrategy, sp); - publisherMonitor = new Monitor<Event>(queryName, "publisher"); + publisherMonitor = new Monitor<>(queryName, "publisher"); sinkEventsToPubsub( sourceEventsFromSynthetic(sp) .apply(queryName + ".Monitor", publisherMonitor.getTransform()), @@ -1140,9 +1167,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { checkState(queryName == null); configuration = runConfiguration; - // GCS URI patterns to delete on exit. - List<String> pathsToDelete = new ArrayList<>(); - try { NexmarkUtils.console("Running %s", configuration.toShortString()); @@ -1220,9 +1244,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { } ((Query10) query).setOutputPath(path); ((Query10) query).setMaxNumWorkers(maxNumWorkers()); - if (path != null && options.getManageResources()) { - pathsToDelete.add(path + "/**"); - } } // Apply query. @@ -1252,7 +1273,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> { } finally { configuration = null; queryName = null; - // TODO: Cleanup pathsToDelete } } } http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java index 18589c4..f6215e9 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java @@ -55,6 +55,9 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -63,9 +66,6 @@ import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.state.StateSpec; -import org.apache.beam.sdk.util.state.StateSpecs; -import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; @@ -178,7 +178,7 @@ public class NexmarkUtils { /** Names are suffixed with the query being run. */ QUERY, /** Names are suffixed with the query being run and a random number. */ - QUERY_AND_SALT; + QUERY_AND_SALT } /** @@ -310,7 +310,7 @@ public class NexmarkUtils { * Log message to console. For client side only. */ public static void console(String format, Object... args) { - System.out.printf("%s %s\n", Instant.now(), String.format(format, args)); + System.out.printf("%s %s%n", Instant.now(), String.format(format, args)); } /** @@ -326,7 +326,7 @@ public class NexmarkUtils { /** * All events will be given a timestamp relative to this time (ms since epoch). */ - public static final long BASE_TIME = Instant.parse("2015-07-15T00:00:00.000Z").getMillis(); + private static final long BASE_TIME = Instant.parse("2015-07-15T00:00:00.000Z").getMillis(); /** * Instants guaranteed to be strictly before and after all event timestamps, and which won't @@ -377,7 +377,7 @@ public class NexmarkUtils { /** * Return a generator config to match the given {@code options}. */ - public static GeneratorConfig standardGeneratorConfig(NexmarkConfiguration configuration) { + private static GeneratorConfig standardGeneratorConfig(NexmarkConfiguration configuration) { return new GeneratorConfig(configuration, configuration.useWallclockEventTime ? System.currentTimeMillis() : BASE_TIME, 0, @@ -558,15 +558,14 @@ public class NexmarkUtils { } p++; } - long next = System.currentTimeMillis(); - now = next; + now = System.currentTimeMillis(); } c.output(c.element()); } }); } - private static final StateSpec<Object, ValueState<byte[]>> DUMMY_TAG = + private static final StateSpec<ValueState<byte[]>> DUMMY_TAG = StateSpecs.value(ByteArrayCoder.of()); private static final int MAX_BUFFER_SIZE = 1 << 24; @@ -578,20 +577,19 @@ public class NexmarkUtils { @ProcessElement public void processElement(ProcessContext c) { long remain = bytes; - long start = System.currentTimeMillis(); - long now = start; +// long now = System.currentTimeMillis(); while (remain > 0) { + //TODO Ismael google on state long thisBytes = Math.min(remain, MAX_BUFFER_SIZE); remain -= thisBytes; - byte[] arr = new byte[(int) thisBytes]; - for (int i = 0; i < thisBytes; i++) { - arr[i] = (byte) now; - } - //TODO Ismael google on state +// byte[] arr = new byte[(int) thisBytes]; +// for (int i = 0; i < thisBytes; i++) { +// arr[i] = (byte) now; +// } // ValueState<byte[]> state = c.windowingInternals().stateInternals().state( // StateNamespaces.global(), DUMMY_TAG); // state.write(arr); - now = System.currentTimeMillis(); +// now = System.currentTimeMillis(); } c.output(c.element()); } http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java index 4b1a848..5c018dc 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java @@ -81,14 +81,14 @@ public class Auction implements KnownSize, Serializable { /** Extra auction properties. */ @JsonProperty - public final String itemName; + private final String itemName; @JsonProperty - public final String description; + private final String description; /** Initial bid price, in cents. */ @JsonProperty - public final long initialBid; + private final long initialBid; /** Reserve price, in cents. */ @JsonProperty @@ -111,7 +111,7 @@ public class Auction implements KnownSize, Serializable { /** Additional arbitrary payload for performance testing. */ @JsonProperty - public final String extra; + private final String extra; // For Avro only. http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java index e6d3450..c83a455 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java @@ -54,11 +54,9 @@ public class AuctionCount implements KnownSize, Serializable { } }; - @JsonProperty - public final long auction; + @JsonProperty private final long auction; - @JsonProperty - public final long count; + @JsonProperty private final long count; // For Avro only. @SuppressWarnings("unused") http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java index cb971e2..43d0b27 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java @@ -55,11 +55,11 @@ public class AuctionPrice implements KnownSize, Serializable { }; @JsonProperty - public final long auction; + private final long auction; /** Price in cents. */ @JsonProperty - public final long price; + private final long price; // For Avro only. @SuppressWarnings("unused") http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java index 26b6a41..6dddf34 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java @@ -56,10 +56,10 @@ public class BidsPerSession implements KnownSize, Serializable { }; @JsonProperty - public final long personId; + private final long personId; @JsonProperty - public final long bidsPerSession; + private final long bidsPerSession; public BidsPerSession() { personId = 0; http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java index 42999cd..0c14e8f 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java @@ -54,7 +54,7 @@ public class Done implements KnownSize, Serializable { }; @JsonProperty - public final String message; + private final String message; // For Avro only. @SuppressWarnings("unused") http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java index e2130c9..1f1f096 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java @@ -138,19 +138,6 @@ public class Event implements KnownSize, Serializable { } } - /** - * Remove {@code annotation} from event. (Used for debugging.) - */ - public Event withoutAnnotation(String annotation) { - if (newPerson != null) { - return new Event(newPerson.withoutAnnotation(annotation)); - } else if (newAuction != null) { - return new Event(newAuction.withoutAnnotation(annotation)); - } else { - return new Event(bid.withoutAnnotation(annotation)); - } - } - @Override public long sizeInBytes() { if (newPerson != null) { http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java index cf1e571..17b8c4a 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java @@ -60,14 +60,14 @@ public class IdNameReserve implements KnownSize, Serializable { }; @JsonProperty - public final long id; + private final long id; @JsonProperty - public final String name; + private final String name; /** Reserve price in cents. */ @JsonProperty - public final long reserve; + private final long reserve; // For Avro only. @SuppressWarnings("unused") http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java index 86d1738..28f25cd 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java @@ -62,16 +62,16 @@ public class NameCityStateId implements KnownSize, Serializable { }; @JsonProperty - public final String name; + private final String name; @JsonProperty - public final String city; + private final String city; @JsonProperty - public final String state; + private final String state; @JsonProperty - public final long id; + private final long id; // For Avro only. @SuppressWarnings("unused") http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java index 906df94..c690fd4 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java @@ -77,10 +77,10 @@ public class Person implements KnownSize, Serializable { public final String name; @JsonProperty - public final String emailAddress; + private final String emailAddress; @JsonProperty - public final String creditCard; + private final String creditCard; @JsonProperty public final String city; @@ -93,7 +93,7 @@ public class Person implements KnownSize, Serializable { /** Additional arbitrary payload for performance testing. */ @JsonProperty - public final String extra; + private final String extra; // For Avro only. @SuppressWarnings("unused") http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java index 68f2697..52ff540 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java @@ -60,7 +60,7 @@ public class SellerPrice implements KnownSize, Serializable { /** Price in cents. */ @JsonProperty - public final long price; + private final long price; // For Avro only. @SuppressWarnings("unused") http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java index 270b5c3..1395182 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java @@ -37,7 +37,7 @@ import org.joda.time.Instant; */ public abstract class AbstractSimulator<InputT, OutputT> { /** Window size for action bucket sampling. */ - public static final Duration WINDOW_SIZE = Duration.standardMinutes(1); + private static final Duration WINDOW_SIZE = Duration.standardMinutes(1); /** Input event stream we should draw from. */ private final Iterator<TimestampedValue<InputT>> input; @@ -77,7 +77,7 @@ public abstract class AbstractSimulator<InputT, OutputT> { /** Called by implementors of {@link #run}: Fetch the next input element. */ @Nullable - protected TimestampedValue<InputT> nextInput() { + TimestampedValue<InputT> nextInput() { if (!input.hasNext()) { return null; } @@ -90,7 +90,7 @@ public abstract class AbstractSimulator<InputT, OutputT> { * Called by implementors of {@link #run}: Capture an intermediate result, for the purpose of * recording the expected activity of the query over time. */ - protected void addIntermediateResult(TimestampedValue<OutputT> result) { + void addIntermediateResult(TimestampedValue<OutputT> result) { NexmarkUtils.info("intermediate result: %s", result); updateCounts(result.getTimestamp()); } @@ -99,7 +99,7 @@ public abstract class AbstractSimulator<InputT, OutputT> { * Called by implementors of {@link #run}: Capture a final result, for the purpose of checking * semantic correctness. */ - protected void addResult(TimestampedValue<OutputT> result) { + void addResult(TimestampedValue<OutputT> result) { NexmarkUtils.info("result: %s", result); pendingResults.add(result); updateCounts(result.getTimestamp()); @@ -121,7 +121,7 @@ public abstract class AbstractSimulator<InputT, OutputT> { } /** Called by implementors of {@link #run}: Record that no more results will be emitted. */ - protected void allDone() { + void allDone() { isDone = true; } http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java index 0796ce5..09415c0 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java @@ -46,10 +46,10 @@ public abstract class NexmarkQuery extends PTransform<PCollection<Event>, PCollection<TimestampedValue<KnownSize>>> { public static final TupleTag<Auction> AUCTION_TAG = new TupleTag<>("auctions"); public static final TupleTag<Bid> BID_TAG = new TupleTag<>("bids"); - protected static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person"); + static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person"); /** Predicate to detect a new person event. */ - protected static final SerializableFunction<Event, Boolean> IS_NEW_PERSON = + private static final SerializableFunction<Event, Boolean> IS_NEW_PERSON = new SerializableFunction<Event, Boolean>() { @Override public Boolean apply(Event event) { @@ -58,7 +58,7 @@ public abstract class NexmarkQuery }; /** DoFn to convert a new person event to a person. */ - protected static final DoFn<Event, Person> AS_PERSON = new DoFn<Event, Person>() { + private static final DoFn<Event, Person> AS_PERSON = new DoFn<Event, Person>() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().newPerson); @@ -66,7 +66,7 @@ public abstract class NexmarkQuery }; /** Predicate to detect a new auction event. */ - protected static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION = + private static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION = new SerializableFunction<Event, Boolean>() { @Override public Boolean apply(Event event) { @@ -75,7 +75,7 @@ public abstract class NexmarkQuery }; /** DoFn to convert a new auction event to an auction. */ - protected static final DoFn<Event, Auction> AS_AUCTION = new DoFn<Event, Auction>() { + private static final DoFn<Event, Auction> AS_AUCTION = new DoFn<Event, Auction>() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().newAuction); @@ -83,7 +83,7 @@ public abstract class NexmarkQuery }; /** Predicate to detect a new bid event. */ - protected static final SerializableFunction<Event, Boolean> IS_BID = + private static final SerializableFunction<Event, Boolean> IS_BID = new SerializableFunction<Event, Boolean>() { @Override public Boolean apply(Event event) { @@ -92,7 +92,7 @@ public abstract class NexmarkQuery }; /** DoFn to convert a bid event to a bid. */ - protected static final DoFn<Event, Bid> AS_BID = new DoFn<Event, Bid>() { + private static final DoFn<Event, Bid> AS_BID = new DoFn<Event, Bid>() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().bid); @@ -100,7 +100,7 @@ public abstract class NexmarkQuery }; /** Transform to key each person by their id. */ - protected static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID = + static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID = ParDo.of(new DoFn<Person, KV<Long, Person>>() { @ProcessElement public void processElement(ProcessContext c) { @@ -109,7 +109,7 @@ public abstract class NexmarkQuery }); /** Transform to key each auction by its id. */ - protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID = + static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID = ParDo.of(new DoFn<Auction, KV<Long, Auction>>() { @ProcessElement public void processElement(ProcessContext c) { @@ -118,7 +118,7 @@ public abstract class NexmarkQuery }); /** Transform to key each auction by its seller id. */ - protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER = + static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER = ParDo.of(new DoFn<Auction, KV<Long, Auction>>() { @ProcessElement public void processElement(ProcessContext c) { @@ -127,7 +127,7 @@ public abstract class NexmarkQuery }); /** Transform to key each bid by it's auction id. */ - protected static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION = + static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION = ParDo.of(new DoFn<Bid, KV<Long, Bid>>() { @ProcessElement public void processElement(ProcessContext c) { @@ -136,7 +136,7 @@ public abstract class NexmarkQuery }); /** Transform to project the auction id from each bid. */ - protected static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION = + static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION = ParDo.of(new DoFn<Bid, Long>() { @ProcessElement public void processElement(ProcessContext c) { @@ -145,7 +145,7 @@ public abstract class NexmarkQuery }); /** Transform to project the price from each bid. */ - protected static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE = + static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE = ParDo.of(new DoFn<Bid, Long>() { @ProcessElement public void processElement(ProcessContext c) { @@ -205,13 +205,13 @@ public abstract class NexmarkQuery } }; - protected final NexmarkConfiguration configuration; + final NexmarkConfiguration configuration; public final Monitor<Event> eventMonitor; public final Monitor<KnownSize> resultMonitor; - public final Monitor<Event> endOfStreamMonitor; - protected final Counter fatalCounter; + private final Monitor<Event> endOfStreamMonitor; + private final Counter fatalCounter; - protected NexmarkQuery(NexmarkConfiguration configuration, String name) { + NexmarkQuery(NexmarkConfiguration configuration, String name) { super(name); this.configuration = configuration; if (configuration.debug) { http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java index 1ad9099..bfa668b 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java @@ -43,7 +43,7 @@ import org.junit.Assert; public abstract class NexmarkQueryModel implements Serializable { public final NexmarkConfiguration configuration; - public NexmarkQueryModel(NexmarkConfiguration configuration) { + NexmarkQueryModel(NexmarkConfiguration configuration) { this.configuration = configuration; } @@ -51,7 +51,7 @@ public abstract class NexmarkQueryModel implements Serializable { * Return the start of the most recent window of {@code size} and {@code period} which ends * strictly before {@code timestamp}. */ - public static Instant windowStart(Duration size, Duration period, Instant timestamp) { + static Instant windowStart(Duration size, Duration period, Instant timestamp) { long ts = timestamp.getMillis(); long p = period.getMillis(); long lim = ts - ts % p; @@ -60,7 +60,7 @@ public abstract class NexmarkQueryModel implements Serializable { } /** Convert {@code itr} to strings capturing values, timestamps and order. */ - protected static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> itr) { + static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> itr) { List<String> strings = new ArrayList<>(); while (itr.hasNext()) { strings.add(itr.next().toString()); @@ -69,7 +69,7 @@ public abstract class NexmarkQueryModel implements Serializable { } /** Convert {@code itr} to strings capturing values and order. */ - protected static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) { + static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) { List<String> strings = new ArrayList<>(); while (itr.hasNext()) { strings.add(itr.next().getValue().toString()); @@ -78,7 +78,7 @@ public abstract class NexmarkQueryModel implements Serializable { } /** Convert {@code itr} to strings capturing values only. */ - protected static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) { + static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) { Set<String> strings = new HashSet<>(); while (itr.hasNext()) { strings.add(itr.next().getValue().toString()); @@ -90,7 +90,7 @@ public abstract class NexmarkQueryModel implements Serializable { public abstract AbstractSimulator<?, ?> simulator(); /** Return sub-sequence of results which are significant for model. */ - protected Iterable<TimestampedValue<KnownSize>> relevantResults( + Iterable<TimestampedValue<KnownSize>> relevantResults( Iterable<TimestampedValue<KnownSize>> results) { return results; } @@ -104,8 +104,6 @@ public abstract class NexmarkQueryModel implements Serializable { /** Return assertion to use on results of pipeline for this query. */ public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() { final Collection<String> expectedStrings = toCollection(simulator().results()); - final String[] expectedStringsArray = - expectedStrings.toArray(new String[expectedStrings.size()]); return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() { @Override @@ -113,9 +111,6 @@ public abstract class NexmarkQueryModel implements Serializable { Collection<String> actualStrings = toCollection(relevantResults(actual).iterator()); Assert.assertThat("wrong pipeline output", actualStrings, IsEqual.equalTo(expectedStrings)); -//compare without order -// Assert.assertThat("wrong pipeline output", actualStrings, -// IsIterableContainingInAnyOrder.containsInAnyOrder(expectedStringsArray)); return null; } }; http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java index 6fb6613..8e65591 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java @@ -32,7 +32,7 @@ public class Query0Model extends NexmarkQueryModel { /** * Simulator for query 0. */ - private class Simulator extends AbstractSimulator<Event, Event> { + private static class Simulator extends AbstractSimulator<Event, Event> { public Simulator(NexmarkConfiguration configuration) { super(NexmarkUtils.standardEventIterator(configuration)); } http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java index c919691..516dab1 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java @@ -35,7 +35,7 @@ import org.apache.beam.integration.nexmark.model.KnownSize; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.ParDo; @@ -101,7 +101,7 @@ public class Query10 extends NexmarkQuery { @Override public String toString() { - return String.format("%s %s %d %s %s\n", maxTimestamp, shard, index, timing, filename); + return String.format("%s %s %d %s %s%n", maxTimestamp, shard, index, timing, filename); } } @@ -130,8 +130,6 @@ public class Query10 extends NexmarkQuery { /** * Return channel for writing bytes to GCS. - * - * @throws IOException */ private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename) throws IOException { http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java index fd936a9..6db9bcf 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java @@ -63,14 +63,13 @@ public class Query11 extends NexmarkQuery { Repeatedly.forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))) .discardingFiredPanes() .withAllowedLateness(Duration.standardSeconds(configuration.occasionalDelaySec / 2))); - PCollection<BidsPerSession> bidsPerSession = biddersWindowed.apply(Count.<Long>perElement()) + return biddersWindowed.apply(Count.<Long>perElement()) .apply(name + ".ToResult", ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() { @ProcessElement public void processElement(ProcessContext c) { c.output(new BidsPerSession(c.element().getKey(), c.element().getValue())); } })); - return bidsPerSession; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java index 0388687..5d4de45 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java @@ -34,7 +34,7 @@ public class Query1Model extends NexmarkQueryModel implements Serializable { /** * Simulator for query 1. */ - private class Simulator extends AbstractSimulator<Event, Bid> { + private static class Simulator extends AbstractSimulator<Event, Bid> { public Simulator(NexmarkConfiguration configuration) { super(NexmarkUtils.standardEventIterator(configuration)); } http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java index 71364ba..f74b78d 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java @@ -17,7 +17,6 @@ */ package org.apache.beam.integration.nexmark.queries; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.beam.integration.nexmark.NexmarkConfiguration; @@ -30,6 +29,13 @@ import org.apache.beam.integration.nexmark.model.Person; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.ParDo; @@ -41,13 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.TimerSpec; -import org.apache.beam.sdk.util.TimerSpecs; -import org.apache.beam.sdk.util.state.StateSpec; -import org.apache.beam.sdk.util.state.StateSpecs; -import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; @@ -176,18 +175,18 @@ public class Query3 extends NexmarkQuery { */ private static class JoinDoFn extends DoFn<KV<Long, CoGbkResult>, KV<Auction, Person>> { - private int maxAuctionsWaitingTime; + private final int maxAuctionsWaitingTime; private static final String AUCTIONS = "auctions"; private static final String PERSON = "person"; @StateId(PERSON) - private static final StateSpec<Object, ValueState<Person>> personSpec = + private static final StateSpec<ValueState<Person>> personSpec = StateSpecs.value(Person.CODER); private static final String PERSON_STATE_EXPIRING = "personStateExpiring"; @StateId(AUCTIONS) - private final StateSpec<Object, ValueState<List<Auction>>> auctionsSpec = + private final StateSpec<ValueState<List<Auction>>> auctionsSpec = StateSpecs.value(ListCoder.of(Auction.CODER)); @TimerId(PERSON_STATE_EXPIRING) @@ -219,8 +218,7 @@ public class Query3 extends NexmarkQuery { ProcessContext c, @TimerId(PERSON_STATE_EXPIRING) Timer timer, @StateId(PERSON) ValueState<Person> personState, - @StateId(AUCTIONS) ValueState<List<Auction>> auctionsState) - throws IOException { + @StateId(AUCTIONS) ValueState<List<Auction>> auctionsState) { // We would *almost* implement this by rewindowing into the global window and // running a combiner over the result. The combiner's accumulator would be the // state we use below. However, combiners cannot emit intermediate results, thus http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java index 6b98e2a..f415709 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java @@ -42,7 +42,7 @@ public class Query3Model extends NexmarkQueryModel implements Serializable { /** * Simulator for query 3. */ - private class Simulator extends AbstractSimulator<Event, NameCityStateId> { + private static class Simulator extends AbstractSimulator<Event, NameCityStateId> { /** Auctions, indexed by seller id. */ private final Multimap<Long, Auction> newAuctions; http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java index 634a58e..269e47a 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java @@ -93,8 +93,9 @@ public class Query4Model extends NexmarkQueryModel implements Serializable { } totals.put(category, total); } - for (long category : counts.keySet()) { - long count = counts.get(category); + for (Map.Entry<Long, Long> entry : counts.entrySet()) { + long category = entry.getKey(); + long count = entry.getValue(); long total = totals.get(category); TimestampedValue<CategoryPrice> result = TimestampedValue.of( new CategoryPrice(category, Math.round((double) total / count), true), lastTimestamp); http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java index 18ce578..1944330 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java @@ -18,7 +18,7 @@ package org.apache.beam.integration.nexmark.queries; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.apache.beam.integration.nexmark.NexmarkConfiguration; import org.apache.beam.integration.nexmark.NexmarkUtils; @@ -80,7 +80,7 @@ public class Query5 extends NexmarkQuery { ParDo.of(new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() { @ProcessElement public void processElement(ProcessContext c) { - c.output(KV.of(Arrays.asList(c.element().getKey()), c.element().getValue())); + c.output(KV.of(Collections.singletonList(c.element().getKey()), c.element().getValue())); } })) http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java index 65789ab..ea39ede 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java @@ -86,9 +86,7 @@ public class Query6 extends NexmarkQuery { public List<Bid> mergeAccumulators(Iterable<List<Bid>> accumulators) { List<Bid> result = new ArrayList<>(); for (List<Bid> accumulator : accumulators) { - for (Bid bid : accumulator) { - result.add(bid); - } + result.addAll(accumulator); } Collections.sort(result, Bid.ASCENDING_TIME_THEN_PRICE); if (result.size() > maxNumBids) { http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java index 0691714..9cb8b3d 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java @@ -86,8 +86,9 @@ public class Query6Model extends NexmarkQueryModel implements Serializable { protected void run() { TimestampedValue<AuctionBid> timestampedWinningBid = nextInput(); if (timestampedWinningBid == null) { - for (long seller : numWinningBidsPerSeller.keySet()) { - long count = numWinningBidsPerSeller.get(seller); + for (Map.Entry<Long, Long> entry : numWinningBidsPerSeller.entrySet()) { + long seller = entry.getKey(); + long count = entry.getValue(); long total = totalWinningBidPricesPerSeller.get(seller); addResult(TimestampedValue.of( new SellerPrice(seller, Math.round((double) total / count)), lastTimestamp)); http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java index 11a4d38..52891a7 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java @@ -25,8 +25,8 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -77,7 +77,7 @@ import org.joda.time.Instant; */ public class WinningBids extends PTransform<PCollection<Event>, PCollection<AuctionBid>> { /** Windows for open auctions and bids. */ - private static class AuctionOrBidWindow extends IntervalWindow implements Serializable { + private static class AuctionOrBidWindow extends IntervalWindow { /** Id of auction this window is for. */ public final long auction; @@ -104,9 +104,7 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct /** Return an auction window for {@code auction}. */ public static AuctionOrBidWindow forAuction(Instant timestamp, Auction auction) { - AuctionOrBidWindow result = - new AuctionOrBidWindow(timestamp, new Instant(auction.expires), auction.id, true); - return result; + return new AuctionOrBidWindow(timestamp, new Instant(auction.expires), auction.id, true); } /** @@ -127,9 +125,8 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct // Instead, we will just give the bid a finite window which expires at // the upper bound of auctions assuming the auction starts at the same time as the bid, // and assuming the system is running at its lowest event rate (as per interEventDelayUs). - AuctionOrBidWindow result = new AuctionOrBidWindow( + return new AuctionOrBidWindow( timestamp, timestamp.plus(expectedAuctionDurationMs * 2), bid.auction, false); - return result; } /** Is this an auction window? */ @@ -171,8 +168,7 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct throws IOException, CoderException { IntervalWindow superWindow = SUPER_CODER.decode(inStream, Coder.Context.NESTED); long auction = ID_CODER.decode(inStream, Coder.Context.NESTED); - boolean isAuctionWindow = - INT_CODER.decode(inStream, Coder.Context.NESTED) == 0 ? false : true; + boolean isAuctionWindow = INT_CODER.decode(inStream, Context.NESTED) != 0; return new AuctionOrBidWindow( superWindow.start(), superWindow.end(), auction, isAuctionWindow); } @@ -194,15 +190,16 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct Event event = c.element(); if (event.newAuction != null) { // Assign auctions to an auction window which expires at the auction's close. - return Arrays.asList(AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction)); + return Collections + .singletonList(AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction)); } else if (event.bid != null) { // Assign bids to a temporary bid window which will later be merged into the appropriate // auction window. - return Arrays.asList( + return Collections.singletonList( AuctionOrBidWindow.forBid(expectedAuctionDurationMs, c.timestamp(), event.bid)); } else { // Don't assign people to any window. They will thus be dropped. - return Arrays.asList(); + return Collections.emptyList(); } } @@ -226,8 +223,9 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct // Merge all 'bid' windows into their corresponding 'auction' window, provided the // auction has not expired. - for (long auction : idToTrueAuctionWindow.keySet()) { - AuctionOrBidWindow auctionWindow = idToTrueAuctionWindow.get(auction); + for (Map.Entry<Long, AuctionOrBidWindow> entry : idToTrueAuctionWindow.entrySet()) { + long auction = entry.getKey(); + AuctionOrBidWindow auctionWindow = entry.getValue(); List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(auction); if (bidWindows != null) { List<AuctionOrBidWindow> toBeMerged = new ArrayList<>(); @@ -296,8 +294,8 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct configuration.firstEventRate, configuration.nextEventRate, configuration.rateUnit, configuration.numEventGenerators); long longestDelayUs = 0; - for (int i = 0; i < interEventDelayUs.length; i++) { - longestDelayUs = Math.max(longestDelayUs, interEventDelayUs[i]); + for (long interEventDelayU : interEventDelayUs) { + longestDelayUs = Math.max(longestDelayUs, interEventDelayU); } // Adjust for proportion of auction events amongst all events. longestDelayUs = http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java index 012d4e6..2a2732b 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java @@ -123,8 +123,8 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl @Override public void verifyDeterministic() throws NonDeterministicException {} }; - private long numEvents; - private long wallclockBaseTime; + private final long numEvents; + private final long wallclockBaseTime; private Checkpoint(long numEvents, long wallclockBaseTime) { this.numEvents = numEvents; @@ -403,8 +403,8 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl if (n < Integer.MAX_VALUE) { return random.nextInt((int) n); } else { - // TODO: Very skewed distribution! Bad! - return Math.abs(random.nextLong()) % n; + // WARNING: Very skewed distribution! Bad! + return Math.abs(random.nextLong() % n); } } @@ -470,14 +470,13 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES); long initialBid = nextPrice(random); - long dateTime = timestamp; long expires = timestamp + nextAuctionLengthMs(random, timestamp); String name = nextString(random, 20); String desc = nextString(random, 100); long reserve = initialBid + nextPrice(random); int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8; String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize); - return new Auction(id, name, desc, initialBid, reserve, dateTime, expires, seller, category, + return new Auction(id, name, desc, initialBid, reserve, timestamp, expires, seller, category, extra); } http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java index 3caaf51..5799bb2 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/GeneratorConfig.java @@ -42,7 +42,7 @@ public class GeneratorConfig implements Serializable { */ public static final int PERSON_PROPORTION = 1; public static final int AUCTION_PROPORTION = 3; - public static final int BID_PROPORTION = 46; + private static final int BID_PROPORTION = 46; public static final int PROPORTION_DENOMINATOR = PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION; @@ -55,12 +55,12 @@ public class GeneratorConfig implements Serializable { * Delay between events, in microseconds. If the array has more than one entry then * the rate is changed every {@link #stepLengthSec}, and wraps around. */ - public final long[] interEventDelayUs; + private final long[] interEventDelayUs; /** * Delay before changing the current inter-event delay. */ - public final long stepLengthSec; + private final long stepLengthSec; /** * Time for first event (ms since epoch). @@ -88,13 +88,13 @@ public class GeneratorConfig implements Serializable { * True period of epoch in milliseconds. Derived from above. * (Ie time to run through cycle for all interEventDelayUs entries). */ - public final long epochPeriodMs; + private final long epochPeriodMs; /** * Number of events per epoch. Derived from above. * (Ie number of events to run through cycle for all interEventDelayUs entries). */ - public final long eventsPerEpoch; + private final long eventsPerEpoch; public GeneratorConfig( NexmarkConfiguration configuration, long baseTime, long firstEventId, @@ -121,10 +121,10 @@ public class GeneratorConfig implements Serializable { long eventsPerEpoch = 0; long epochPeriodMs = 0; if (interEventDelayUs.length > 1) { - for (int i = 0; i < interEventDelayUs.length; i++) { - long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i]; + for (long interEventDelayU : interEventDelayUs) { + long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU; eventsPerEpoch += numEventsForThisCycle; - epochPeriodMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L; + epochPeriodMs += (numEventsForThisCycle * interEventDelayU) / 1000L; } } this.eventsPerEpoch = eventsPerEpoch; @@ -248,16 +248,16 @@ public class GeneratorConfig implements Serializable { long epoch = eventNumber / eventsPerEpoch; long n = eventNumber % eventsPerEpoch; long offsetInEpochMs = 0; - for (int i = 0; i < interEventDelayUs.length; i++) { - long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i]; + for (long interEventDelayU : interEventDelayUs) { + long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayU; if (n < numEventsForThisCycle) { - long offsetInCycleUs = n * interEventDelayUs[i]; + long offsetInCycleUs = n * interEventDelayU; long timestamp = baseTime + epoch * epochPeriodMs + offsetInEpochMs + (offsetInCycleUs / 1000L); - return KV.of(timestamp, interEventDelayUs[i]); + return KV.of(timestamp, interEventDelayU); } n -= numEventsForThisCycle; - offsetInEpochMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L; + offsetInEpochMs += (numEventsForThisCycle * interEventDelayU) / 1000L; } throw new RuntimeException("internal eventsPerEpoch incorrect"); // can't reach } http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java index c3c6eb0..09d945d 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSource.java @@ -116,7 +116,7 @@ public class UnboundedEventSource extends UnboundedSource<Event, Generator.Check private TimestampedValue<Event> currentEvent; /** Events which have been held back so as to force them to be late. */ - private Queue<Generator.NextEvent> heldBackEvents = new PriorityQueue<>(); + private final Queue<Generator.NextEvent> heldBackEvents = new PriorityQueue<>(); public EventReader(Generator generator) { this.generator = generator; http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java index 15e17a8..1d04e2a 100644 --- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java +++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/sources/UnboundedEventSourceTest.java @@ -53,8 +53,8 @@ public class UnboundedEventSourceTest { * confirming reading events match the model events. */ private static class EventIdChecker { - private Set<Long> seenPersonIds = new HashSet<>(); - private Set<Long> seenAuctionIds = new HashSet<>(); + private final Set<Long> seenPersonIds = new HashSet<>(); + private final Set<Long> seenAuctionIds = new HashSet<>(); public void add(Event event) { if (event.newAuction != null) { @@ -90,7 +90,6 @@ public class UnboundedEventSourceTest { EventIdChecker checker = new EventIdChecker(); PipelineOptions options = TestPipeline.testingPipelineOptions(); - Pipeline p = TestPipeline.create(options); UnboundedEventSource source = new UnboundedEventSource(config, 1, 0, false); UnboundedReader<Event> reader = source.createReader(options, null); http://git-wip-us.apache.org/repos/asf/beam/blob/1541fad0/integration/pom.xml ---------------------------------------------------------------------- diff --git a/integration/pom.xml b/integration/pom.xml index 4839da5..31f293e 100644 --- a/integration/pom.xml +++ b/integration/pom.xml @@ -30,6 +30,20 @@ <packaging>pom</packaging> <name>Apache Beam :: Integration Tests</name> + <profiles> + <profile> + <id>release</id> + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + </profile> + </profiles> + <modules> <module>java</module> </modules>