http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java new file mode 100644 index 0000000..550fbd2 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java @@ -0,0 +1,1157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.nexmark; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.io.AvroIO; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.nexmark.model.Auction; +import org.apache.beam.sdk.nexmark.model.Bid; +import org.apache.beam.sdk.nexmark.model.Event; +import org.apache.beam.sdk.nexmark.model.KnownSize; +import org.apache.beam.sdk.nexmark.model.Person; +import org.apache.beam.sdk.nexmark.queries.NexmarkQuery; +import org.apache.beam.sdk.nexmark.queries.NexmarkQueryModel; +import org.apache.beam.sdk.nexmark.queries.Query0; +import org.apache.beam.sdk.nexmark.queries.Query0Model; +import org.apache.beam.sdk.nexmark.queries.Query1; +import org.apache.beam.sdk.nexmark.queries.Query10; +import org.apache.beam.sdk.nexmark.queries.Query11; 
+import org.apache.beam.sdk.nexmark.queries.Query12; +import org.apache.beam.sdk.nexmark.queries.Query1Model; +import org.apache.beam.sdk.nexmark.queries.Query2; +import org.apache.beam.sdk.nexmark.queries.Query2Model; +import org.apache.beam.sdk.nexmark.queries.Query3; +import org.apache.beam.sdk.nexmark.queries.Query3Model; +import org.apache.beam.sdk.nexmark.queries.Query4; +import org.apache.beam.sdk.nexmark.queries.Query4Model; +import org.apache.beam.sdk.nexmark.queries.Query5; +import org.apache.beam.sdk.nexmark.queries.Query5Model; +import org.apache.beam.sdk.nexmark.queries.Query6; +import org.apache.beam.sdk.nexmark.queries.Query6Model; +import org.apache.beam.sdk.nexmark.queries.Query7; +import org.apache.beam.sdk.nexmark.queries.Query7Model; +import org.apache.beam.sdk.nexmark.queries.Query8; +import org.apache.beam.sdk.nexmark.queries.Query8Model; +import org.apache.beam.sdk.nexmark.queries.Query9; +import org.apache.beam.sdk.nexmark.queries.Query9Model; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.joda.time.Duration; +import org.slf4j.LoggerFactory; + +/** + * Run a single Nexmark query using a given configuration. + */ +public class NexmarkLauncher<OptionT extends NexmarkOptions> { + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(NexmarkLauncher.class); + /** + * Minimum number of samples needed for 'stead-state' rate calculation. + */ + private static final int MIN_SAMPLES = 9; + /** + * Minimum length of time over which to consider samples for 'steady-state' rate calculation. 
+ */ + private static final Duration MIN_WINDOW = Duration.standardMinutes(2); + /** + * Delay between perf samples. + */ + private static final Duration PERF_DELAY = Duration.standardSeconds(15); + /** + * How long to let streaming pipeline run after all events have been generated and we've + * seen no activity. + */ + private static final Duration DONE_DELAY = Duration.standardMinutes(1); + /** + * How long to allow no activity without warning. + */ + private static final Duration STUCK_WARNING_DELAY = Duration.standardMinutes(10); + /** + * How long to let streaming pipeline run after we've + * seen no activity, even if all events have not been generated. + */ + private static final Duration STUCK_TERMINATE_DELAY = Duration.standardDays(3); + /** + * NexmarkOptions shared by all runs. + */ + private final OptionT options; + + /** + * Which configuration we are running. + */ + @Nullable + private NexmarkConfiguration configuration; + + /** + * If in --pubsubMode=COMBINED, the event monitor for the publisher pipeline. Otherwise null. + */ + @Nullable + private Monitor<Event> publisherMonitor; + + /** + * If in --pubsubMode=COMBINED, the pipeline result for the publisher pipeline. Otherwise null. + */ + @Nullable + private PipelineResult publisherResult; + + /** + * Result for the main pipeline. + */ + @Nullable + private PipelineResult mainResult; + + /** + * Query name we are running. + */ + @Nullable + private String queryName; + + public NexmarkLauncher(OptionT options) { + this.options = options; + } + + + /** + * Is this query running in streaming mode? + */ + private boolean isStreaming() { + return options.isStreaming(); + } + + /** + * Return maximum number of workers. + */ + private int maxNumWorkers() { + return 5; + } + + /** + * Return the current value for a long counter, or a default value if can't be retrieved. + * Note this uses only attempted metrics because some runners don't support committed metrics. 
+ */ + private long getCounterMetric(PipelineResult result, String namespace, String name, + long defaultValue) { + MetricQueryResults metrics = result.metrics().queryMetrics( + MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build()); + Iterable<MetricResult<Long>> counters = metrics.counters(); + try { + MetricResult<Long> metricResult = counters.iterator().next(); + return metricResult.attempted(); + } catch (NoSuchElementException e) { + LOG.error("Failed to get metric {}, from namespace {}", name, namespace); + } + return defaultValue; + } + + /** + * Return the current value for a long counter, or a default value if can't be retrieved. + * Note this uses only attempted metrics because some runners don't support committed metrics. + */ + private long getDistributionMetric(PipelineResult result, String namespace, String name, + DistributionType distType, long defaultValue) { + MetricQueryResults metrics = result.metrics().queryMetrics( + MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build()); + Iterable<MetricResult<DistributionResult>> distributions = metrics.distributions(); + try { + MetricResult<DistributionResult> distributionResult = distributions.iterator().next(); + switch (distType) { + case MIN: + return distributionResult.attempted().min(); + case MAX: + return distributionResult.attempted().max(); + default: + return defaultValue; + } + } catch (NoSuchElementException e) { + LOG.error( + "Failed to get distribution metric {} for namespace {}", + name, + namespace); + } + return defaultValue; + } + + private enum DistributionType {MIN, MAX} + + /** + * Return the current value for a time counter, or -1 if can't be retrieved. + */ + private long getTimestampMetric(long now, long value) { + // timestamp metrics are used to monitor time of execution of transforms. 
+ // If result timestamp metric is too far from now, consider that metric is erroneous + + if (Math.abs(value - now) > Duration.standardDays(10000).getMillis()) { + return -1; + } + return value; + } + + /** + * Find a 'steady state' events/sec from {@code snapshots} and + * store it in {@code perf} if found. + */ + private void captureSteadyState(NexmarkPerf perf, List<NexmarkPerf.ProgressSnapshot> snapshots) { + if (!options.isStreaming()) { + return; + } + + // Find the first sample with actual event and result counts. + int dataStart = 0; + for (; dataStart < snapshots.size(); dataStart++) { + if (snapshots.get(dataStart).numEvents >= 0 && snapshots.get(dataStart).numResults >= 0) { + break; + } + } + + // Find the last sample which demonstrated progress. + int dataEnd = snapshots.size() - 1; + for (; dataEnd > dataStart; dataEnd--) { + if (snapshots.get(dataEnd).anyActivity(snapshots.get(dataEnd - 1))) { + break; + } + } + + int numSamples = dataEnd - dataStart + 1; + if (numSamples < MIN_SAMPLES) { + // Not enough samples. + NexmarkUtils.console("%d samples not enough to calculate steady-state event rate", + numSamples); + return; + } + + // We'll look at only the middle third samples. + int sampleStart = dataStart + numSamples / 3; + int sampleEnd = dataEnd - numSamples / 3; + + double sampleSec = + snapshots.get(sampleEnd).secSinceStart - snapshots.get(sampleStart).secSinceStart; + if (sampleSec < MIN_WINDOW.getStandardSeconds()) { + // Not sampled over enough time. + NexmarkUtils.console( + "sample of %.1f sec not long enough to calculate steady-state event rate", + sampleSec); + return; + } + + // Find rate with least squares error. + double sumxx = 0.0; + double sumxy = 0.0; + long prevNumEvents = -1; + for (int i = sampleStart; i <= sampleEnd; i++) { + if (prevNumEvents == snapshots.get(i).numEvents) { + // Skip samples with no change in number of events since they contribute no data. 
+ continue; + } + // Use the effective runtime instead of wallclock time so we can + // insulate ourselves from delays and stutters in the query manager. + double x = snapshots.get(i).runtimeSec; + prevNumEvents = snapshots.get(i).numEvents; + double y = prevNumEvents; + sumxx += x * x; + sumxy += x * y; + } + double eventsPerSec = sumxy / sumxx; + NexmarkUtils.console("revising events/sec from %.1f to %.1f", perf.eventsPerSec, eventsPerSec); + perf.eventsPerSec = eventsPerSec; + } + + /** + * Return the current performance given {@code eventMonitor} and {@code resultMonitor}. + */ + private NexmarkPerf currentPerf( + long startMsSinceEpoch, long now, PipelineResult result, + List<NexmarkPerf.ProgressSnapshot> snapshots, Monitor<?> eventMonitor, + Monitor<?> resultMonitor) { + NexmarkPerf perf = new NexmarkPerf(); + + long numEvents = + getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".elements", -1); + long numEventBytes = + getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".bytes", -1); + long eventStart = + getTimestampMetric(now, + getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".startTime", + DistributionType.MIN, -1)); + long eventEnd = + getTimestampMetric(now, + getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".endTime", + DistributionType.MAX, -1)); + + long numResults = + getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".elements", -1); + long numResultBytes = + getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".bytes", -1); + long resultStart = + getTimestampMetric(now, + getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".startTime", + DistributionType.MIN, -1)); + long resultEnd = + getTimestampMetric(now, + getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".endTime", + DistributionType.MAX, -1)); + long timestampStart = + getTimestampMetric(now, + getDistributionMetric(result, + 
resultMonitor.name, resultMonitor.prefix + ".startTimestamp", + DistributionType.MIN, -1)); + long timestampEnd = + getTimestampMetric(now, + getDistributionMetric(result, + resultMonitor.name, resultMonitor.prefix + ".endTimestamp", + DistributionType.MAX, -1)); + + long effectiveEnd = -1; + if (eventEnd >= 0 && resultEnd >= 0) { + // It is possible for events to be generated after the last result was emitted. + // (Eg Query 2, which only yields results for a small prefix of the event stream.) + // So use the max of last event and last result times. + effectiveEnd = Math.max(eventEnd, resultEnd); + } else if (resultEnd >= 0) { + effectiveEnd = resultEnd; + } else if (eventEnd >= 0) { + // During startup we may have no result yet, but we would still like to track how + // long the pipeline has been running. + effectiveEnd = eventEnd; + } + + if (effectiveEnd >= 0 && eventStart >= 0 && effectiveEnd >= eventStart) { + perf.runtimeSec = (effectiveEnd - eventStart) / 1000.0; + } + + if (numEvents >= 0) { + perf.numEvents = numEvents; + } + + if (numEvents >= 0 && perf.runtimeSec > 0.0) { + // For streaming we may later replace this with a 'steady-state' value calculated + // from the progress snapshots. 
+ perf.eventsPerSec = numEvents / perf.runtimeSec; + } + + if (numEventBytes >= 0 && perf.runtimeSec > 0.0) { + perf.eventBytesPerSec = numEventBytes / perf.runtimeSec; + } + + if (numResults >= 0) { + perf.numResults = numResults; + } + + if (numResults >= 0 && perf.runtimeSec > 0.0) { + perf.resultsPerSec = numResults / perf.runtimeSec; + } + + if (numResultBytes >= 0 && perf.runtimeSec > 0.0) { + perf.resultBytesPerSec = numResultBytes / perf.runtimeSec; + } + + if (eventStart >= 0) { + perf.startupDelaySec = (eventStart - startMsSinceEpoch) / 1000.0; + } + + if (resultStart >= 0 && eventStart >= 0 && resultStart >= eventStart) { + perf.processingDelaySec = (resultStart - eventStart) / 1000.0; + } + + if (timestampStart >= 0 && timestampEnd >= 0 && perf.runtimeSec > 0.0) { + double eventRuntimeSec = (timestampEnd - timestampStart) / 1000.0; + perf.timeDilation = eventRuntimeSec / perf.runtimeSec; + } + + if (resultEnd >= 0) { + // Fill in the shutdown delay assuming the job has now finished. + perf.shutdownDelaySec = (now - resultEnd) / 1000.0; + } + + // As soon as available, try to capture cumulative cost at this point too. + + NexmarkPerf.ProgressSnapshot snapshot = new NexmarkPerf.ProgressSnapshot(); + snapshot.secSinceStart = (now - startMsSinceEpoch) / 1000.0; + snapshot.runtimeSec = perf.runtimeSec; + snapshot.numEvents = numEvents; + snapshot.numResults = numResults; + snapshots.add(snapshot); + + captureSteadyState(perf, snapshots); + + return perf; + } + + /** + * Build and run a pipeline using specified options. + */ + interface PipelineBuilder<OptionT extends NexmarkOptions> { + void build(OptionT publishOnlyOptions); + } + + /** + * Invoke the builder with options suitable for running a publish-only child pipeline. + */ + private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder<NexmarkOptions> builder) { + builder.build(options); + } + + /** + * Monitor the performance and progress of a running job. 
Return final performance if + * it was measured. + */ + @Nullable + private NexmarkPerf monitor(NexmarkQuery query) { + if (!options.getMonitorJobs()) { + return null; + } + + if (configuration.debug) { + NexmarkUtils.console("Waiting for main pipeline to 'finish'"); + } else { + NexmarkUtils.console("--debug=false, so job will not self-cancel"); + } + + PipelineResult job = mainResult; + PipelineResult publisherJob = publisherResult; + List<NexmarkPerf.ProgressSnapshot> snapshots = new ArrayList<>(); + long startMsSinceEpoch = System.currentTimeMillis(); + long endMsSinceEpoch = -1; + if (options.getRunningTimeMinutes() != null) { + endMsSinceEpoch = startMsSinceEpoch + + Duration.standardMinutes(options.getRunningTimeMinutes()).getMillis() + - Duration.standardSeconds(configuration.preloadSeconds).getMillis(); + } + long lastActivityMsSinceEpoch = -1; + NexmarkPerf perf = null; + boolean waitingForShutdown = false; + boolean publisherCancelled = false; + List<String> errors = new ArrayList<>(); + + while (true) { + long now = System.currentTimeMillis(); + if (endMsSinceEpoch >= 0 && now > endMsSinceEpoch && !waitingForShutdown) { + NexmarkUtils.console("Reached end of test, cancelling job"); + try { + job.cancel(); + } catch (IOException e) { + throw new RuntimeException("Unable to cancel main job: ", e); + } + if (publisherResult != null) { + try { + publisherJob.cancel(); + } catch (IOException e) { + throw new RuntimeException("Unable to cancel publisher job: ", e); + } + publisherCancelled = true; + } + waitingForShutdown = true; + } + + PipelineResult.State state = job.getState(); + NexmarkUtils.console("%s %s%s", state, queryName, + waitingForShutdown ? 
" (waiting for shutdown)" : ""); + + NexmarkPerf currPerf; + if (configuration.debug) { + currPerf = currentPerf(startMsSinceEpoch, now, job, snapshots, + query.eventMonitor, query.resultMonitor); + } else { + currPerf = null; + } + + if (perf == null || perf.anyActivity(currPerf)) { + lastActivityMsSinceEpoch = now; + } + + if (options.isStreaming() && !waitingForShutdown) { + Duration quietFor = new Duration(lastActivityMsSinceEpoch, now); + long fatalCount = getCounterMetric(job, query.getName(), "fatal", 0); + if (fatalCount > 0) { + NexmarkUtils.console("job has fatal errors, cancelling."); + errors.add(String.format("Pipeline reported %s fatal errors", fatalCount)); + waitingForShutdown = true; + } else if (configuration.debug && configuration.numEvents > 0 + && currPerf.numEvents == configuration.numEvents + && currPerf.numResults >= 0 && quietFor.isLongerThan(DONE_DELAY)) { + NexmarkUtils.console("streaming query appears to have finished, cancelling job."); + waitingForShutdown = true; + } else if (quietFor.isLongerThan(STUCK_TERMINATE_DELAY)) { + NexmarkUtils.console("streaming query appears to have gotten stuck, cancelling job."); + errors.add("Streaming job was cancelled since appeared stuck"); + waitingForShutdown = true; + } else if (quietFor.isLongerThan(STUCK_WARNING_DELAY)) { + NexmarkUtils.console("WARNING: streaming query appears to have been stuck for %d min.", + quietFor.getStandardMinutes()); + errors.add( + String.format("Streaming query was stuck for %d min", quietFor.getStandardMinutes())); + } + + if (waitingForShutdown) { + try { + job.cancel(); + } catch (IOException e) { + throw new RuntimeException("Unable to cancel main job: ", e); + } + } + } + + perf = currPerf; + + boolean running = true; + switch (state) { + case UNKNOWN: + case STOPPED: + case RUNNING: + // Keep going. + break; + case DONE: + // All done. 
+ running = false; + break; + case CANCELLED: + running = false; + if (!waitingForShutdown) { + errors.add("Job was unexpectedly cancelled"); + } + break; + case FAILED: + case UPDATED: + // Abnormal termination. + running = false; + errors.add("Job was unexpectedly updated"); + break; + } + + if (!running) { + break; + } + + if (lastActivityMsSinceEpoch == now) { + NexmarkUtils.console("new perf %s", perf); + } else { + NexmarkUtils.console("no activity"); + } + + try { + Thread.sleep(PERF_DELAY.getMillis()); + } catch (InterruptedException e) { + Thread.interrupted(); + NexmarkUtils.console("Interrupted: pipeline is still running"); + } + } + + perf.errors = errors; + perf.snapshots = snapshots; + + if (publisherResult != null) { + NexmarkUtils.console("Shutting down publisher pipeline."); + try { + if (!publisherCancelled) { + publisherJob.cancel(); + } + publisherJob.waitUntilFinish(Duration.standardMinutes(5)); + } catch (IOException e) { + throw new RuntimeException("Unable to cancel publisher job: ", e); + } + } + + return perf; + } + + // ================================================================================ + // Basic sources and sinks + // ================================================================================ + + /** + * Return a topic name. + */ + private String shortTopic(long now) { + String baseTopic = options.getPubsubTopic(); + if (Strings.isNullOrEmpty(baseTopic)) { + throw new RuntimeException("Missing --pubsubTopic"); + } + switch (options.getResourceNameMode()) { + case VERBATIM: + return baseTopic; + case QUERY: + return String.format("%s_%s_source", baseTopic, queryName); + case QUERY_AND_SALT: + return String.format("%s_%s_%d_source", baseTopic, queryName, now); + } + throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); + } + + /** + * Return a subscription name. 
+ */ + private String shortSubscription(long now) { + String baseSubscription = options.getPubsubSubscription(); + if (Strings.isNullOrEmpty(baseSubscription)) { + throw new RuntimeException("Missing --pubsubSubscription"); + } + switch (options.getResourceNameMode()) { + case VERBATIM: + return baseSubscription; + case QUERY: + return String.format("%s_%s_source", baseSubscription, queryName); + case QUERY_AND_SALT: + return String.format("%s_%s_%d_source", baseSubscription, queryName, now); + } + throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); + } + + /** + * Return a file name for plain text. + */ + private String textFilename(long now) { + String baseFilename = options.getOutputPath(); + if (Strings.isNullOrEmpty(baseFilename)) { + throw new RuntimeException("Missing --outputPath"); + } + switch (options.getResourceNameMode()) { + case VERBATIM: + return baseFilename; + case QUERY: + return String.format("%s/nexmark_%s.txt", baseFilename, queryName); + case QUERY_AND_SALT: + return String.format("%s/nexmark_%s_%d.txt", baseFilename, queryName, now); + } + throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); + } + + /** + * Return a BigQuery table spec. + */ + private String tableSpec(long now, String version) { + String baseTableName = options.getBigQueryTable(); + if (Strings.isNullOrEmpty(baseTableName)) { + throw new RuntimeException("Missing --bigQueryTable"); + } + switch (options.getResourceNameMode()) { + case VERBATIM: + return String.format("%s:nexmark.%s_%s", + options.getProject(), baseTableName, version); + case QUERY: + return String.format("%s:nexmark.%s_%s_%s", + options.getProject(), baseTableName, queryName, version); + case QUERY_AND_SALT: + return String.format("%s:nexmark.%s_%s_%s_%d", + options.getProject(), baseTableName, queryName, version, now); + } + throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); + } + + /** + * Return a directory for logs. 
+ */ + private String logsDir(long now) { + String baseFilename = options.getOutputPath(); + if (Strings.isNullOrEmpty(baseFilename)) { + throw new RuntimeException("Missing --outputPath"); + } + switch (options.getResourceNameMode()) { + case VERBATIM: + return baseFilename; + case QUERY: + return String.format("%s/logs_%s", baseFilename, queryName); + case QUERY_AND_SALT: + return String.format("%s/logs_%s_%d", baseFilename, queryName, now); + } + throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode()); + } + + /** + * Return a source of synthetic events. + */ + private PCollection<Event> sourceEventsFromSynthetic(Pipeline p) { + if (isStreaming()) { + NexmarkUtils.console("Generating %d events in streaming mode", configuration.numEvents); + return p.apply(queryName + ".ReadUnbounded", NexmarkUtils.streamEventsSource(configuration)); + } else { + NexmarkUtils.console("Generating %d events in batch mode", configuration.numEvents); + return p.apply(queryName + ".ReadBounded", NexmarkUtils.batchEventsSource(configuration)); + } + } + + /** + * Return source of events from Pubsub. 
+ */ + private PCollection<Event> sourceEventsFromPubsub(Pipeline p, long now) { + String shortSubscription = shortSubscription(now); + NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription); + + PubsubIO.Read<PubsubMessage> io = + PubsubIO.readMessagesWithAttributes().fromSubscription(shortSubscription) + .withIdAttribute(NexmarkUtils.PUBSUB_ID); + if (!configuration.usePubsubPublishTime) { + io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP); + } + + return p + .apply(queryName + ".ReadPubsubEvents", io) + .apply(queryName + ".PubsubMessageToEvent", ParDo.of(new DoFn<PubsubMessage, Event>() { + @ProcessElement + public void processElement(ProcessContext c) { + byte[] payload = c.element().getPayload(); + try { + Event event = CoderUtils.decodeFromByteArray(Event.CODER, payload); + c.output(event); + } catch (CoderException e) { + LOG.error("Error while decoding Event from pusbSub message: serialization error"); + } + } + })); + } + + /** + * Return Avro source of events from {@code options.getInputFilePrefix}. + */ + private PCollection<Event> sourceEventsFromAvro(Pipeline p) { + String filename = options.getInputPath(); + if (Strings.isNullOrEmpty(filename)) { + throw new RuntimeException("Missing --inputPath"); + } + NexmarkUtils.console("Reading events from Avro files at %s", filename); + return p + .apply(queryName + ".ReadAvroEvents", AvroIO.read(Event.class) + .from(filename + "*.avro")) + .apply("OutputWithTimestamp", NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA); + } + + /** + * Send {@code events} to Pubsub. 
+ */ + private void sinkEventsToPubsub(PCollection<Event> events, long now) { + String shortTopic = shortTopic(now); + NexmarkUtils.console("Writing events to Pubsub %s", shortTopic); + + PubsubIO.Write<PubsubMessage> io = + PubsubIO.writeMessages().to(shortTopic) + .withIdAttribute(NexmarkUtils.PUBSUB_ID); + if (!configuration.usePubsubPublishTime) { + io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP); + } + + events.apply(queryName + ".EventToPubsubMessage", + ParDo.of(new DoFn<Event, PubsubMessage>() { + @ProcessElement + public void processElement(ProcessContext c) { + try { + byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, c.element()); + c.output(new PubsubMessage(payload, new HashMap<String, String>())); + } catch (CoderException e1) { + LOG.error("Error while sending Event {} to pusbSub: serialization error", + c.element().toString()); + } + } + }) + ) + .apply(queryName + ".WritePubsubEvents", io); + } + + /** + * Send {@code formattedResults} to Pubsub. + */ + private void sinkResultsToPubsub(PCollection<String> formattedResults, long now) { + String shortTopic = shortTopic(now); + NexmarkUtils.console("Writing results to Pubsub %s", shortTopic); + PubsubIO.Write<String> io = + PubsubIO.writeStrings().to(shortTopic) + .withIdAttribute(NexmarkUtils.PUBSUB_ID); + if (!configuration.usePubsubPublishTime) { + io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP); + } + formattedResults.apply(queryName + ".WritePubsubResults", io); + } + + /** + * Sink all raw Events in {@code source} to {@code options.getOutputPath}. + * This will configure the job to write the following files: + * <ul> + * <li>{@code $outputPath/event*.avro} All Event entities. + * <li>{@code $outputPath/auction*.avro} Auction entities. + * <li>{@code $outputPath/bid*.avro} Bid entities. + * <li>{@code $outputPath/person*.avro} Person entities. + * </ul> + * + * @param source A PCollection of events. 
+ */ + private void sinkEventsToAvro(PCollection<Event> source) { + String filename = options.getOutputPath(); + if (Strings.isNullOrEmpty(filename)) { + throw new RuntimeException("Missing --outputPath"); + } + NexmarkUtils.console("Writing events to Avro files at %s", filename); + source.apply(queryName + ".WriteAvroEvents", + AvroIO.write(Event.class).to(filename + "/event").withSuffix(".avro")); + source.apply(NexmarkQuery.JUST_BIDS) + .apply(queryName + ".WriteAvroBids", + AvroIO.write(Bid.class).to(filename + "/bid").withSuffix(".avro")); + source.apply(NexmarkQuery.JUST_NEW_AUCTIONS) + .apply(queryName + ".WriteAvroAuctions", + AvroIO.write(Auction.class).to(filename + "/auction").withSuffix(".avro")); + source.apply(NexmarkQuery.JUST_NEW_PERSONS) + .apply(queryName + ".WriteAvroPeople", + AvroIO.write(Person.class).to(filename + "/person").withSuffix(".avro")); + } + + /** + * Send {@code formattedResults} to text files. + */ + private void sinkResultsToText(PCollection<String> formattedResults, long now) { + String filename = textFilename(now); + NexmarkUtils.console("Writing results to text files at %s", filename); + formattedResults.apply(queryName + ".WriteTextResults", + TextIO.write().to(filename)); + } + + private static class StringToTableRow extends DoFn<String, TableRow> { + @ProcessElement + public void processElement(ProcessContext c) { + int n = ThreadLocalRandom.current().nextInt(10); + List<TableRow> records = new ArrayList<>(n); + for (int i = 0; i < n; i++) { + records.add(new TableRow().set("index", i).set("value", Integer.toString(i))); + } + c.output(new TableRow().set("result", c.element()).set("records", records)); + } + } + + /** + * Send {@code formattedResults} to BigQuery. 
+ */ + private void sinkResultsToBigQuery( + PCollection<String> formattedResults, long now, + String version) { + String tableSpec = tableSpec(now, version); + TableSchema tableSchema = + new TableSchema().setFields(ImmutableList.of( + new TableFieldSchema().setName("result").setType("STRING"), + new TableFieldSchema().setName("records").setMode("REPEATED").setType("RECORD") + .setFields(ImmutableList.of( + new TableFieldSchema().setName("index").setType("INTEGER"), + new TableFieldSchema().setName("value").setType("STRING"))))); + NexmarkUtils.console("Writing results to BigQuery table %s", tableSpec); + BigQueryIO.Write io = + BigQueryIO.write().to(tableSpec) + .withSchema(tableSchema) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND); + formattedResults + .apply(queryName + ".StringToTableRow", ParDo.of(new StringToTableRow())) + .apply(queryName + ".WriteBigQueryResults", io); + } + + // ================================================================================ + // Construct overall pipeline + // ================================================================================ + + /** + * Return source of events for this run, or null if we are simply publishing events + * to Pubsub. + */ + private PCollection<Event> createSource(Pipeline p, final long now) { + PCollection<Event> source = null; + switch (configuration.sourceType) { + case DIRECT: + source = sourceEventsFromSynthetic(p); + break; + case AVRO: + source = sourceEventsFromAvro(p); + break; + case PUBSUB: + // Setup the sink for the publisher. + switch (configuration.pubSubMode) { + case SUBSCRIBE_ONLY: + // Nothing to publish. + break; + case PUBLISH_ONLY: + // Send synthesized events to Pubsub in this job. 
+ sinkEventsToPubsub(sourceEventsFromSynthetic(p).apply(queryName + ".Snoop", + NexmarkUtils.snoop(queryName)), now); + break; + case COMBINED: + // Send synthesized events to Pubsub in separate publisher job. + // We won't start the main pipeline until the publisher has sent the pre-load events. + // We'll shutdown the publisher job when we notice the main job has finished. + invokeBuilderForPublishOnlyPipeline(new PipelineBuilder<NexmarkOptions>() { + @Override + public void build(NexmarkOptions publishOnlyOptions) { + Pipeline sp = Pipeline.create(options); + NexmarkUtils.setupPipeline(configuration.coderStrategy, sp); + publisherMonitor = new Monitor<>(queryName, "publisher"); + sinkEventsToPubsub( + sourceEventsFromSynthetic(sp) + .apply(queryName + ".Monitor", publisherMonitor.getTransform()), + now); + publisherResult = sp.run(); + } + }); + break; + } + + // Setup the source for the consumer. + switch (configuration.pubSubMode) { + case PUBLISH_ONLY: + // Nothing to consume. Leave source null. + break; + case SUBSCRIBE_ONLY: + case COMBINED: + // Read events from pubsub. + source = sourceEventsFromPubsub(p, now); + break; + } + break; + } + return source; + } + + private static final TupleTag<String> MAIN = new TupleTag<String>(){}; + private static final TupleTag<String> SIDE = new TupleTag<String>(){}; + + private static class PartitionDoFn extends DoFn<String, String> { + @ProcessElement + public void processElement(ProcessContext c) { + if (c.element().hashCode() % 2 == 0) { + c.output(c.element()); + } else { + c.output(SIDE, c.element()); + } + } + } + + /** + * Consume {@code results}. + */ + private void sink(PCollection<TimestampedValue<KnownSize>> results, long now) { + if (configuration.sinkType == NexmarkUtils.SinkType.COUNT_ONLY) { + // Avoid the cost of formatting the results. 
+ results.apply(queryName + ".DevNull", NexmarkUtils.devNull(queryName)); + return; + } + + PCollection<String> formattedResults = + results.apply(queryName + ".Format", NexmarkUtils.format(queryName)); + if (options.getLogResults()) { + formattedResults = formattedResults.apply(queryName + ".Results.Log", + NexmarkUtils.<String>log(queryName + ".Results")); + } + + switch (configuration.sinkType) { + case DEVNULL: + // Discard all results + formattedResults.apply(queryName + ".DevNull", NexmarkUtils.devNull(queryName)); + break; + case PUBSUB: + sinkResultsToPubsub(formattedResults, now); + break; + case TEXT: + sinkResultsToText(formattedResults, now); + break; + case AVRO: + NexmarkUtils.console( + "WARNING: with --sinkType=AVRO, actual query results will be discarded."); + break; + case BIGQUERY: + // Multiple BigQuery backends to mimic what most customers do. + PCollectionTuple res = formattedResults.apply(queryName + ".Partition", + ParDo.of(new PartitionDoFn()).withOutputTags(MAIN, TupleTagList.of(SIDE))); + sinkResultsToBigQuery(res.get(MAIN), now, "main"); + sinkResultsToBigQuery(res.get(SIDE), now, "side"); + sinkResultsToBigQuery(formattedResults, now, "copy"); + break; + case COUNT_ONLY: + // Short-circuited above. + throw new RuntimeException(); + } + } + + // ================================================================================ + // Entry point + // ================================================================================ + + /** + * Calculate the distribution of the expected rate of results per minute (in event time, not + * wallclock time). 
+ */ + private void modelResultRates(NexmarkQueryModel model) { + List<Long> counts = Lists.newArrayList(model.simulator().resultsPerWindow()); + Collections.sort(counts); + int n = counts.size(); + if (n < 5) { + NexmarkUtils.console("Query%d: only %d samples", model.configuration.query, n); + } else { + NexmarkUtils.console("Query%d: N:%d; min:%d; 1st%%:%d; mean:%d; 3rd%%:%d; max:%d", + model.configuration.query, n, counts.get(0), counts.get(n / 4), + counts.get(n / 2), + counts.get(n - 1 - n / 4), counts.get(n - 1)); + } + } + + /** + * Run {@code configuration} and return its performance if possible. + */ + @Nullable + public NexmarkPerf run(NexmarkConfiguration runConfiguration) { + if (options.getManageResources() && !options.getMonitorJobs()) { + throw new RuntimeException("If using --manageResources then must also use --monitorJobs."); + } + + // + // Setup per-run state. + // + checkState(configuration == null); + checkState(queryName == null); + configuration = runConfiguration; + + try { + NexmarkUtils.console("Running %s", configuration.toShortString()); + + if (configuration.numEvents < 0) { + NexmarkUtils.console("skipping since configuration is disabled"); + return null; + } + + List<NexmarkQuery> queries = Arrays.asList(new Query0(configuration), + new Query1(configuration), + new Query2(configuration), + new Query3(configuration), + new Query4(configuration), + new Query5(configuration), + new Query6(configuration), + new Query7(configuration), + new Query8(configuration), + new Query9(configuration), + new Query10(configuration), + new Query11(configuration), + new Query12(configuration)); + NexmarkQuery query = queries.get(configuration.query); + queryName = query.getName(); + + List<NexmarkQueryModel> models = Arrays.asList( + new Query0Model(configuration), + new Query1Model(configuration), + new Query2Model(configuration), + new Query3Model(configuration), + new Query4Model(configuration), + new Query5Model(configuration), + new 
Query6Model(configuration), + new Query7Model(configuration), + new Query8Model(configuration), + new Query9Model(configuration), + null, + null, + null); + NexmarkQueryModel model = models.get(configuration.query); + + if (options.getJustModelResultRate()) { + if (model == null) { + throw new RuntimeException(String.format("No model for %s", queryName)); + } + modelResultRates(model); + return null; + } + + long now = System.currentTimeMillis(); + Pipeline p = Pipeline.create(options); + NexmarkUtils.setupPipeline(configuration.coderStrategy, p); + + // Generate events. + PCollection<Event> source = createSource(p, now); + + if (options.getLogEvents()) { + source = source.apply(queryName + ".Events.Log", + NexmarkUtils.<Event>log(queryName + ".Events")); + } + + // Source will be null if source type is PUBSUB and mode is PUBLISH_ONLY. + // In that case there's nothing more to add to pipeline. + if (source != null) { + // Optionally sink events in Avro format. + // (Query results are ignored). + if (configuration.sinkType == NexmarkUtils.SinkType.AVRO) { + sinkEventsToAvro(source); + } + + // Query 10 logs all events to Google Cloud storage files. It could generate a lot of logs, + // so, set parallelism. Also set the output path where to write log files. + if (configuration.query == 10) { + String path = null; + if (options.getOutputPath() != null && !options.getOutputPath().isEmpty()) { + path = logsDir(now); + } + ((Query10) query).setOutputPath(path); + ((Query10) query).setMaxNumWorkers(maxNumWorkers()); + } + + // Apply query. + PCollection<TimestampedValue<KnownSize>> results = source.apply(query); + + if (options.getAssertCorrectness()) { + if (model == null) { + throw new RuntimeException(String.format("No model for %s", queryName)); + } + // We know all our streams have a finite number of elements. 
+ results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED); + // If we have a finite number of events then assert our pipeline's + // results match those of a model using the same sequence of events. + PAssert.that(results).satisfies(model.assertionFor()); + } + + // Output results. + sink(results, now); + } + + mainResult = p.run(); + mainResult.waitUntilFinish(Duration.standardSeconds(configuration.streamTimeout)); + return monitor(query); + } finally { + configuration = null; + queryName = null; + } + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java new file mode 100644 index 0000000..2a2a5a7 --- /dev/null +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark; + +import javax.annotation.Nullable; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.options.ApplicationNameOptions; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.StreamingOptions; + +/** + * Command line flags. + */ +public interface NexmarkOptions + extends ApplicationNameOptions, GcpOptions, PipelineOptions, StreamingOptions { + @Description("Which suite to run. 
Default is to use command line arguments for one job.") + @Default.Enum("DEFAULT") + NexmarkSuite getSuite(); + + void setSuite(NexmarkSuite suite); + + @Description("If true, monitor the jobs as they run.") + @Default.Boolean(false) + boolean getMonitorJobs(); + + void setMonitorJobs(boolean monitorJobs); + + @Description("Where the events come from.") + @Nullable + NexmarkUtils.SourceType getSourceType(); + + void setSourceType(NexmarkUtils.SourceType sourceType); + + @Description("Prefix for input files if using avro input") + @Nullable + String getInputPath(); + + void setInputPath(String inputPath); + + @Description("Where results go.") + @Nullable + NexmarkUtils.SinkType getSinkType(); + + void setSinkType(NexmarkUtils.SinkType sinkType); + + @Description("Which mode to run in when source is PUBSUB.") + @Nullable + NexmarkUtils.PubSubMode getPubSubMode(); + + void setPubSubMode(NexmarkUtils.PubSubMode pubSubMode); + + @Description("Which query to run.") + @Nullable + Integer getQuery(); + + void setQuery(Integer query); + + @Description("Prefix for output files if using text output for results or running Query 10.") + @Nullable + String getOutputPath(); + + void setOutputPath(String outputPath); + + @Description("Base name of pubsub topic to publish to in streaming mode.") + @Nullable + @Default.String("nexmark") + String getPubsubTopic(); + + void setPubsubTopic(String pubsubTopic); + + @Description("Base name of pubsub subscription to read from in streaming mode.") + @Nullable + @Default.String("nexmark") + String getPubsubSubscription(); + + void setPubsubSubscription(String pubsubSubscription); + + @Description("Base name of BigQuery table name if using BigQuery output.") + @Nullable + @Default.String("nexmark") + String getBigQueryTable(); + + void setBigQueryTable(String bigQueryTable); + + @Description("Approximate number of events to generate. 
" + + "Zero for effectively unlimited in streaming mode.") + @Nullable + Long getNumEvents(); + + void setNumEvents(Long numEvents); + + @Description("Time in seconds to preload the subscription with data, at the initial input rate " + + "of the pipeline.") + @Nullable + Integer getPreloadSeconds(); + + void setPreloadSeconds(Integer preloadSeconds); + + @Description( + "Time in seconds to wait in pipelineResult.waitUntilFinish(), useful in streaming mode") + @Nullable + Integer getStreamTimeout(); + + void setStreamTimeout(Integer streamTimeout); + + @Description("Number of unbounded sources to create events.") + @Nullable + Integer getNumEventGenerators(); + + void setNumEventGenerators(Integer numEventGenerators); + + @Description("Shape of event rate curve.") + @Nullable + NexmarkUtils.RateShape getRateShape(); + + void setRateShape(NexmarkUtils.RateShape rateShape); + + @Description("Initial overall event rate (in --rateUnit).") + @Nullable + Integer getFirstEventRate(); + + void setFirstEventRate(Integer firstEventRate); + + @Description("Next overall event rate (in --rateUnit).") + @Nullable + Integer getNextEventRate(); + + void setNextEventRate(Integer nextEventRate); + + @Description("Unit for rates.") + @Nullable + NexmarkUtils.RateUnit getRateUnit(); + + void setRateUnit(NexmarkUtils.RateUnit rateUnit); + + @Description("Overall period of rate shape, in seconds.") + @Nullable + Integer getRatePeriodSec(); + + void setRatePeriodSec(Integer ratePeriodSec); + + @Description("If true, relay events in real time in streaming mode.") + @Nullable + Boolean getIsRateLimited(); + + void setIsRateLimited(Boolean isRateLimited); + + @Description("If true, use wallclock time as event time. 
Otherwise, use a deterministic" + + " time in the past so that multiple runs will see exactly the same event streams" + + " and should thus have exactly the same results.") + @Nullable + Boolean getUseWallclockEventTime(); + + void setUseWallclockEventTime(Boolean useWallclockEventTime); + + @Description("Assert pipeline results match model results.") + @Nullable + boolean getAssertCorrectness(); + + void setAssertCorrectness(boolean assertCorrectness); + + @Description("Log all input events.") + @Nullable + boolean getLogEvents(); + + void setLogEvents(boolean logEvents); + + @Description("Log all query results.") + @Nullable + boolean getLogResults(); + + void setLogResults(boolean logResults); + + @Description("Average size in bytes for a person record.") + @Nullable + Integer getAvgPersonByteSize(); + + void setAvgPersonByteSize(Integer avgPersonByteSize); + + @Description("Average size in bytes for an auction record.") + @Nullable + Integer getAvgAuctionByteSize(); + + void setAvgAuctionByteSize(Integer avgAuctionByteSize); + + @Description("Average size in bytes for a bid record.") + @Nullable + Integer getAvgBidByteSize(); + + void setAvgBidByteSize(Integer avgBidByteSize); + + @Description("Ratio of bids for 'hot' auctions above the background.") + @Nullable + Integer getHotAuctionRatio(); + + void setHotAuctionRatio(Integer hotAuctionRatio); + + @Description("Ratio of auctions for 'hot' sellers above the background.") + @Nullable + Integer getHotSellersRatio(); + + void setHotSellersRatio(Integer hotSellersRatio); + + @Description("Ratio of auctions for 'hot' bidders above the background.") + @Nullable + Integer getHotBiddersRatio(); + + void setHotBiddersRatio(Integer hotBiddersRatio); + + @Description("Window size in seconds.") + @Nullable + Long getWindowSizeSec(); + + void setWindowSizeSec(Long windowSizeSec); + + @Description("Window period in seconds.") + @Nullable + Long getWindowPeriodSec(); + + void setWindowPeriodSec(Long windowPeriodSec); + + 
@Description("If in streaming mode, the holdback for watermark in seconds.") + @Nullable + Long getWatermarkHoldbackSec(); + + void setWatermarkHoldbackSec(Long watermarkHoldbackSec); + + @Description("Roughly how many auctions should be in flight for each generator.") + @Nullable + Integer getNumInFlightAuctions(); + + void setNumInFlightAuctions(Integer numInFlightAuctions); + + + @Description("Maximum number of people to consider as active for placing auctions or bids.") + @Nullable + Integer getNumActivePeople(); + + void setNumActivePeople(Integer numActivePeople); + + @Description("Filename of perf data to append to.") + @Nullable + String getPerfFilename(); + + void setPerfFilename(String perfFilename); + + @Description("Filename of baseline perf data to read from.") + @Nullable + String getBaselineFilename(); + + void setBaselineFilename(String baselineFilename); + + @Description("Filename of summary perf data to append to.") + @Nullable + String getSummaryFilename(); + + void setSummaryFilename(String summaryFilename); + + @Description("Filename for javascript capturing all perf data and any baselines.") + @Nullable + String getJavascriptFilename(); + + void setJavascriptFilename(String javascriptFilename); + + @Description("If true, don't run the actual query. Instead, calculate the distribution " + + "of number of query results per (event time) minute according to the query model.") + @Nullable + boolean getJustModelResultRate(); + + void setJustModelResultRate(boolean justModelResultRate); + + @Description("Coder strategy to use.") + @Nullable + NexmarkUtils.CoderStrategy getCoderStrategy(); + + void setCoderStrategy(NexmarkUtils.CoderStrategy coderStrategy); + + @Description("Delay, in milliseconds, for each event. 
We will peg one core for this " + + "number of milliseconds to simulate CPU-bound computation.") + @Nullable + Long getCpuDelayMs(); + + void setCpuDelayMs(Long cpuDelayMs); + + @Description("Extra data, in bytes, to save to persistent state for each event. " + + "This will force I/O all the way to durable storage to simulate an " + + "I/O-bound computation.") + @Nullable + Long getDiskBusyBytes(); + + void setDiskBusyBytes(Long diskBusyBytes); + + @Description("Skip factor for query 2. We select bids for every {@code auctionSkip}'th auction") + @Nullable + Integer getAuctionSkip(); + + void setAuctionSkip(Integer auctionSkip); + + @Description("Fanout for queries 4 (groups by category id) and 7 (finds a global maximum).") + @Nullable + Integer getFanout(); + + void setFanout(Integer fanout); + + @Description("Maximum waiting time to clean personState in query3 " + + "(ie maximum waiting of the auctions related to person in state in seconds in event time).") + @Nullable + Integer getMaxAuctionsWaitingTime(); + + void setMaxAuctionsWaitingTime(Integer fanout); + + @Description("Length of occasional delay to impose on events (in seconds).") + @Nullable + Long getOccasionalDelaySec(); + + void setOccasionalDelaySec(Long occasionalDelaySec); + + @Description("Probability that an event will be delayed by delayS.") + @Nullable + Double getProbDelayedEvent(); + + void setProbDelayedEvent(Double probDelayedEvent); + + @Description("Maximum size of each log file (in events). 
For Query10 only.") + @Nullable + Integer getMaxLogEvents(); + + void setMaxLogEvents(Integer maxLogEvents); + + @Description("How to derive names of resources.") + @Default.Enum("QUERY_AND_SALT") + NexmarkUtils.ResourceNameMode getResourceNameMode(); + + void setResourceNameMode(NexmarkUtils.ResourceNameMode mode); + + @Description("If true, manage the creation and cleanup of topics, subscriptions and gcs files.") + @Default.Boolean(true) + boolean getManageResources(); + + void setManageResources(boolean manageResources); + + @Description("If true, use pub/sub publish time instead of event time.") + @Nullable + Boolean getUsePubsubPublishTime(); + + void setUsePubsubPublishTime(Boolean usePubsubPublishTime); + + @Description("Number of events in out-of-order groups. 1 implies no out-of-order events. " + + "1000 implies every 1000 events per generator are emitted in pseudo-random order.") + @Nullable + Long getOutOfOrderGroupSize(); + + void setOutOfOrderGroupSize(Long outOfOrderGroupSize); + + @Description("If false, do not add the Monitor and Snoop transforms.") + @Nullable + Boolean getDebug(); + + void setDebug(Boolean value); + + @Description("If set, cancel running pipelines after this long") + @Nullable + Long getRunningTimeMinutes(); + + void setRunningTimeMinutes(Long value); + + @Description("If set and --monitorJobs is true, check that the system watermark is never more " + + "than this far behind real time") + @Nullable + Long getMaxSystemLagSeconds(); + + void setMaxSystemLagSeconds(Long value); + + @Description("If set and --monitorJobs is true, check that the data watermark is never more " + + "than this far behind real time") + @Nullable + Long getMaxDataLagSeconds(); + + void setMaxDataLagSeconds(Long value); + + @Description("Only start validating watermarks after this many seconds") + @Nullable + Long getWatermarkValidationDelaySeconds(); + + void setWatermarkValidationDelaySeconds(Long value); +} 
// http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkPerf.java
// ----------------------------------------------------------------------
// diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkPerf.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkPerf.java
// new file mode 100644
// index 0000000..2edf4e8
// --- /dev/null
// +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkPerf.java
// @@ -0,0 +1,207 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.nexmark;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;

/**
 * Summary of performance for a particular run of a configuration.
 *
 * <p>Instances are converted to and from JSON through {@code NexmarkUtils.MAPPER}; see
 * {@link #toString()} and {@link #fromString(String)}.
 */
public class NexmarkPerf {
  /**
   * A sample of the number of events and number of results (if known) generated at
   * a particular time.
   */
  public static class ProgressSnapshot {
    /** Seconds since job was started (in wallclock time). */
    @JsonProperty
    double secSinceStart;

    /** Job runtime in seconds (time from first event to last generated event or output result). */
    @JsonProperty
    double runtimeSec;

    /** Cumulative number of events generated. -1 if not known. */
    @JsonProperty
    long numEvents;

    /** Cumulative number of results emitted. -1 if not known. */
    @JsonProperty
    long numResults;

    /**
     * Return true if there looks to be activity between {@code this} and {@code that}
     * snapshots.
     *
     * @param that the snapshot to compare against
     * @return true if the runtime, event count or result count differs between the two
     */
    public boolean anyActivity(ProgressSnapshot that) {
      if (runtimeSec != that.runtimeSec) {
        // An event or result end timestamp looks to have changed.
        return true;
      }
      if (numEvents != that.numEvents) {
        // Some more events were generated.
        return true;
      }
      if (numResults != that.numResults) {
        // Some more results were emitted.
        return true;
      }
      return false;
    }
  }

  /**
   * Progress snapshots. Null if not yet calculated.
   */
  @JsonProperty
  @Nullable
  public List<ProgressSnapshot> snapshots = null;

  /**
   * Effective runtime, in seconds. Measured from timestamp of first generated event to latest of
   * timestamp of last generated event and last emitted result. -1 if not known.
   */
  @JsonProperty
  public double runtimeSec = -1.0;

  /**
   * Number of events generated. -1 if not known.
   */
  @JsonProperty
  public long numEvents = -1;

  /**
   * Number of events generated per second of runtime. For batch this is number of events
   * over the above runtime. For streaming this is the 'steady-state' event generation rate sampled
   * over the lifetime of the job. -1 if not known.
   */
  @JsonProperty
  public double eventsPerSec = -1.0;

  /**
   * Number of event bytes generated per second of runtime. -1 if not known.
   */
  @JsonProperty
  public double eventBytesPerSec = -1.0;

  /**
   * Number of results emitted. -1 if not known.
   */
  @JsonProperty
  public long numResults = -1;

  /**
   * Number of results generated per second of runtime. -1 if not known.
   */
  @JsonProperty
  public double resultsPerSec = -1.0;

  /**
   * Number of result bytes generated per second of runtime. -1 if not known.
   */
  @JsonProperty
  public double resultBytesPerSec = -1.0;

  /**
   * Delay between start of job and first event in seconds. -1 if not known.
   */
  @JsonProperty
  public double startupDelaySec = -1.0;

  /**
   * Delay between first event and first result in seconds. -1 if not known.
   */
  @JsonProperty
  public double processingDelaySec = -1.0;

  /**
   * Delay between last result and job completion in seconds. -1 if not known.
   */
  @JsonProperty
  public double shutdownDelaySec = -1.0;

  /**
   * Time-dilation factor.  Calculate as event time advancement rate relative to real time.
   * Greater than one implies we processed events faster than they would have been generated
   * in real time. Less than one implies we could not keep up with events in real time.
   * -1 if not known.
   */
  @JsonProperty
  double timeDilation = -1.0;

  /**
   * List of errors encountered during job execution.
   */
  @JsonProperty
  @Nullable
  public List<String> errors = null;

  /**
   * The job id this perf was drawn from. Null if not known.
   */
  @JsonProperty
  @Nullable
  public String jobId = null;

  /**
   * Return a JSON representation of performance.
   *
   * @throws RuntimeException wrapping the {@link JsonProcessingException} if serialization fails
   */
  @Override
  public String toString() {
    try {
      return NexmarkUtils.MAPPER.writeValueAsString(this);
    } catch (JsonProcessingException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Parse a {@link NexmarkPerf} object from JSON {@code string}.
   *
   * @param string the JSON text to parse
   * @return the parsed performance summary
   * @throws RuntimeException wrapping the {@link IOException} if {@code string} cannot be parsed
   */
  public static NexmarkPerf fromString(String string) {
    try {
      return NexmarkUtils.MAPPER.readValue(string, NexmarkPerf.class);
    } catch (IOException e) {
      throw new RuntimeException("Unable to parse nexmark perf: ", e);
    }
  }

  /**
   * Return true if there looks to be activity between {@code this} and {@code that}
   * perf values.
   *
   * @param that the perf value to compare against
   * @return true if the runtime, event count or result count differs between the two
   */
  public boolean anyActivity(NexmarkPerf that) {
    if (runtimeSec != that.runtimeSec) {
      // An event or result end timestamp looks to have changed.
      return true;
    }
    if (numEvents != that.numEvents) {
      // Some more events were generated.
      return true;
    }
    if (numResults != that.numResults) {
      // Some more results were emitted.
      return true;
    }
    return false;
  }
}

// http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java
// ----------------------------------------------------------------------
// diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java
// new file mode 100644
// index 0000000..d38cb7b
// --- /dev/null
// +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java
// @@ -0,0 +1,112 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.nexmark;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * A set of {@link NexmarkConfiguration}s.
 */
public enum NexmarkSuite {
  /**
   * The default.
   */
  DEFAULT(defaultConf()),

  /**
   * Sweep through all queries using the default configuration.
   * 100k/10k events (depending on query).
   */
  SMOKE(smoke()),

  /**
   * As for SMOKE, but with 1000 times as many events (100m/10m).
   */
  STRESS(stress()),

  /**
   * As for SMOKE, but with 1b/100m events.
   */
  FULL_THROTTLE(fullThrottle());

  /** Return a single configuration built with the {@link NexmarkConfiguration} constructor. */
  private static List<NexmarkConfiguration> defaultConf() {
    List<NexmarkConfiguration> configurations = new ArrayList<>();
    NexmarkConfiguration configuration = new NexmarkConfiguration();
    configurations.add(configuration);
    return configurations;
  }

  /** Return one configuration per query (0 to 12), each a copy of the default configuration. */
  private static List<NexmarkConfiguration> smoke() {
    List<NexmarkConfiguration> configurations = new ArrayList<>();
    for (int query = 0; query <= 12; query++) {
      NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT.copy();
      configuration.query = query;
      configuration.numEvents = 100_000;
      if (query == 4 || query == 6 || query == 9) {
        // Scale back so overall runtimes are reasonably close across all queries.
        configuration.numEvents /= 10;
      }
      configurations.add(configuration);
    }
    return configurations;
  }

  /** Return the smoke configurations with 1000 times as many events. */
  private static List<NexmarkConfiguration> stress() {
    return scaledSmoke(1000);
  }

  /**
   * Return the smoke configurations with 10,000 times as many events, giving the documented
   * 1b/100m events. This must scale further than {@link #stress()}; otherwise FULL_THROTTLE
   * would be the same suite as STRESS.
   */
  private static List<NexmarkConfiguration> fullThrottle() {
    return scaledSmoke(10_000);
  }

  /**
   * Return the smoke configurations with every non-negative event count multiplied by
   * {@code factor}. Negative counts are left untouched.
   */
  private static List<NexmarkConfiguration> scaledSmoke(long factor) {
    List<NexmarkConfiguration> configurations = smoke();
    for (NexmarkConfiguration configuration : configurations) {
      if (configuration.numEvents >= 0) {
        configuration.numEvents *= factor;
      }
    }
    return configurations;
  }

  private final List<NexmarkConfiguration> configurations;

  NexmarkSuite(List<NexmarkConfiguration> configurations) {
    this.configurations = configurations;
  }

  /**
   * Return the configurations corresponding to this suite. We'll override each configuration
   * with any set command line flags, except for --isStreaming which is only respected for
   * the {@link #DEFAULT} suite.
   *
   * <p>Each configuration is copied before the overrides are applied, so the suite's own
   * configurations are not modified. Results are collected in a {@link LinkedHashSet}, keeping
   * suite order.
   *
   * @param options the command line options to override each configuration with
   * @return the overridden configurations, in suite order
   */
  public Iterable<NexmarkConfiguration> getConfigurations(NexmarkOptions options) {
    Set<NexmarkConfiguration> results = new LinkedHashSet<>();
    for (NexmarkConfiguration configuration : configurations) {
      NexmarkConfiguration result = configuration.copy();
      result.overrideFromOptions(options);
      results.add(result);
    }
    return results;
  }
}