http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Generator.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Generator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Generator.java deleted file mode 100644 index 7adb1b2..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Generator.java +++ /dev/null @@ -1,589 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Random; - -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Instant; - -/** - * A generator for synthetic events. We try to make the data vaguely reasonable. We also ensure - * most primary key/foreign key relations are correct. Eg: a {@link Bid} event will usually have - * valid auction and bidder ids which can be joined to already-generated Auction and Person events. - * - * <p>To help with testing, we generate timestamps relative to a given {@code baseTime}. Each new - * event is given a timestamp advanced from the previous timestamp by {@code interEventDelayUs} - * (in microseconds). The event stream is thus fully deterministic and does not depend on - * wallclock time. - * - * <p>This class implements {@link org.apache.beam.sdk.io.UnboundedSource.CheckpointMark} - * so that we can resume generating events from a saved snapshot. - */ -public class Generator implements Iterator<TimestampedValue<Event>>, Serializable { - /** - * Keep the number of categories small so the example queries will find results even with - * a small batch of events. - */ - private static final int NUM_CATEGORIES = 5; - - /** Smallest random string size. */ - private static final int MIN_STRING_LENGTH = 3; - - /** - * Keep the number of states small so that the example queries will find results even with - * a small batch of events. 
- */ - private static final List<String> US_STATES = Arrays.asList(("AZ,CA,ID,OR,WA,WY").split(",")); - - private static final List<String> US_CITIES = - Arrays.asList( - ("Phoenix,Los Angeles,San Francisco,Boise,Portland,Bend,Redmond,Seattle,Kent,Cheyenne") - .split(",")); - - private static final List<String> FIRST_NAMES = - Arrays.asList(("Peter,Paul,Luke,John,Saul,Vicky,Kate,Julie,Sarah,Deiter,Walter").split(",")); - - private static final List<String> LAST_NAMES = - Arrays.asList(("Shultz,Abrams,Spencer,White,Bartels,Walton,Smith,Jones,Noris").split(",")); - - /** - * Number of yet-to-be-created people and auction ids allowed. - */ - private static final int PERSON_ID_LEAD = 10; - private static final int AUCTION_ID_LEAD = 10; - - /** - * Fraction of people/auctions which may be 'hot' sellers/bidders/auctions are 1 - * over these values. - */ - private static final int HOT_AUCTION_RATIO = 100; - private static final int HOT_SELLER_RATIO = 100; - private static final int HOT_BIDDER_RATIO = 100; - - /** - * Just enough state to be able to restore a generator back to where it was checkpointed. - */ - public static class Checkpoint implements UnboundedSource.CheckpointMark { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - - /** Coder for this class. 
*/ - public static final Coder<Checkpoint> CODER_INSTANCE = - new AtomicCoder<Checkpoint>() { - @Override - public void encode( - Checkpoint value, - OutputStream outStream, - Coder.Context context) - throws CoderException, IOException { - LONG_CODER.encode(value.numEvents, outStream, Context.NESTED); - LONG_CODER.encode(value.wallclockBaseTime, outStream, Context.NESTED); - } - - @Override - public Checkpoint decode( - InputStream inStream, Coder.Context context) - throws CoderException, IOException { - long numEvents = LONG_CODER.decode(inStream, Context.NESTED); - long wallclockBaseTime = LONG_CODER.decode(inStream, Context.NESTED); - return new Checkpoint(numEvents, wallclockBaseTime); - } - }; - - private long numEvents; - private long wallclockBaseTime; - - private Checkpoint(long numEvents, long wallclockBaseTime) { - this.numEvents = numEvents; - this.wallclockBaseTime = wallclockBaseTime; - } - - public Generator toGenerator(GeneratorConfig config) { - return new Generator(config, numEvents, wallclockBaseTime); - } - - @Override - public void finalizeCheckpoint() throws IOException { - // Nothing to finalize. - } - - @Override - public String toString() { - return String.format("Generator.Checkpoint{numEvents:%d;wallclockBaseTime:%d}", - numEvents, wallclockBaseTime); - } - } - - /** - * The next event and its various timestamps. Ordered by increasing wallclock timestamp, then - * (arbitrary but stable) event hash order. - */ - public static class NextEvent implements Comparable<NextEvent> { - /** When, in wallclock time, should this event be emitted? */ - public final long wallclockTimestamp; - - /** When, in event time, should this event be considered to have occured? */ - public final long eventTimestamp; - - /** The event itself. */ - public final Event event; - - /** The minimum of this and all future event timestamps. 
*/ - public final long watermark; - - public NextEvent(long wallclockTimestamp, long eventTimestamp, Event event, long watermark) { - this.wallclockTimestamp = wallclockTimestamp; - this.eventTimestamp = eventTimestamp; - this.event = event; - this.watermark = watermark; - } - - /** - * Return a deep clone of next event with delay added to wallclock timestamp and - * event annotate as 'LATE'. - */ - public NextEvent withDelay(long delayMs) { - return new NextEvent( - wallclockTimestamp + delayMs, eventTimestamp, event.withAnnotation("LATE"), watermark); - } - - @Override - public int compareTo(NextEvent other) { - int i = Long.compare(wallclockTimestamp, other.wallclockTimestamp); - if (i != 0) { - return i; - } - return Integer.compare(event.hashCode(), other.event.hashCode()); - } - } - - /** - * Configuration to generate events against. Note that it may be replaced by a call to - * {@link #splitAtEventId}. - */ - private GeneratorConfig config; - - /** Number of events generated by this generator. */ - private long numEvents; - - /** - * Wallclock time at which we emitted the first event (ms since epoch). Initially -1. - */ - private long wallclockBaseTime; - - private Generator(GeneratorConfig config, long numEvents, long wallclockBaseTime) { - checkNotNull(config); - this.config = config; - this.numEvents = numEvents; - this.wallclockBaseTime = wallclockBaseTime; - } - - /** - * Create a fresh generator according to {@code config}. - */ - public Generator(GeneratorConfig config) { - this(config, 0, -1); - } - - /** - * Return a checkpoint for the current generator. - */ - public Checkpoint toCheckpoint() { - return new Checkpoint(numEvents, wallclockBaseTime); - } - - /** - * Return a deep clone of this generator. - */ - @Override - public Generator clone() { - return new Generator(config.clone(), numEvents, wallclockBaseTime); - } - - /** - * Return the current config for this generator. Note that configs may be replaced by {@link - * #splitAtEventId}. 
- */ - public GeneratorConfig getCurrentConfig() { - return config; - } - - /** - * Mutate this generator so that it will only generate events up to but not including - * {@code eventId}. Return a config to represent the events this generator will no longer yield. - * The generators will run in on a serial timeline. - */ - public GeneratorConfig splitAtEventId(long eventId) { - long newMaxEvents = eventId - (config.firstEventId + config.firstEventNumber); - GeneratorConfig remainConfig = config.cloneWith(config.firstEventId, - config.maxEvents - newMaxEvents, config.firstEventNumber + newMaxEvents); - config = config.cloneWith(config.firstEventId, newMaxEvents, config.firstEventNumber); - return remainConfig; - } - - /** - * Return the next 'event id'. Though events don't have ids we can simulate them to - * help with bookkeeping. - */ - public long getNextEventId() { - return config.firstEventId + config.nextAdjustedEventNumber(numEvents); - } - - /** - * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if - * due to generate a person. - */ - private long lastBase0PersonId() { - long eventId = getNextEventId(); - long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR; - long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR; - if (offset >= GeneratorConfig.PERSON_PROPORTION) { - // About to generate an auction or bid. - // Go back to the last person generated in this epoch. - offset = GeneratorConfig.PERSON_PROPORTION - 1; - } - // About to generate a person. - return epoch * GeneratorConfig.PERSON_PROPORTION + offset; - } - - /** - * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if - * due to generate an auction. 
- */ - private long lastBase0AuctionId() { - long eventId = getNextEventId(); - long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR; - long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR; - if (offset < GeneratorConfig.PERSON_PROPORTION) { - // About to generate a person. - // Go back to the last auction in the last epoch. - epoch--; - offset = GeneratorConfig.AUCTION_PROPORTION - 1; - } else if (offset >= GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) { - // About to generate a bid. - // Go back to the last auction generated in this epoch. - offset = GeneratorConfig.AUCTION_PROPORTION - 1; - } else { - // About to generate an auction. - offset -= GeneratorConfig.PERSON_PROPORTION; - } - return epoch * GeneratorConfig.AUCTION_PROPORTION + offset; - } - - /** return a random US state. */ - private static String nextUSState(Random random) { - return US_STATES.get(random.nextInt(US_STATES.size())); - } - - /** Return a random US city. */ - private static String nextUSCity(Random random) { - return US_CITIES.get(random.nextInt(US_CITIES.size())); - } - - /** Return a random person name. */ - private static String nextPersonName(Random random) { - return FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size())) + " " - + LAST_NAMES.get(random.nextInt(LAST_NAMES.size())); - } - - /** Return a random string of up to {@code maxLength}. */ - private static String nextString(Random random, int maxLength) { - int len = MIN_STRING_LENGTH + random.nextInt(maxLength - MIN_STRING_LENGTH); - StringBuilder sb = new StringBuilder(); - while (len-- > 0) { - if (random.nextInt(13) == 0) { - sb.append(' '); - } else { - sb.append((char) ('a' + random.nextInt(26))); - } - } - return sb.toString().trim(); - } - - /** Return a random string of exactly {@code length}. 
*/ - private static String nextExactString(Random random, int length) { - StringBuilder sb = new StringBuilder(); - while (length-- > 0) { - sb.append((char) ('a' + random.nextInt(26))); - } - return sb.toString(); - } - - /** Return a random email address. */ - private static String nextEmail(Random random) { - return nextString(random, 7) + "@" + nextString(random, 5) + ".com"; - } - - /** Return a random credit card number. */ - private static String nextCreditCard(Random random) { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < 4; i++) { - if (i > 0) { - sb.append(' '); - } - sb.append(String.format("%04d", random.nextInt(10000))); - } - return sb.toString(); - } - - /** Return a random price. */ - private static long nextPrice(Random random) { - return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0); - } - - /** Return a random time delay, in milliseconds, for length of auctions. */ - private long nextAuctionLengthMs(Random random, long timestamp) { - // What's our current event number? - long currentEventNumber = config.nextAdjustedEventNumber(numEvents); - // How many events till we've generated numInFlightAuctions? - long numEventsForAuctions = - (config.configuration.numInFlightAuctions * GeneratorConfig.PROPORTION_DENOMINATOR) - / GeneratorConfig.AUCTION_PROPORTION; - // When will the auction numInFlightAuctions beyond now be generated? - long futureAuction = - config.timestampAndInterEventDelayUsForEvent(currentEventNumber + numEventsForAuctions) - .getKey(); - // System.out.printf("*** auction will be for %dms (%d events ahead) ***\n", - // futureAuction - timestamp, numEventsForAuctions); - // Choose a length with average horizonMs. - long horizonMs = futureAuction - timestamp; - return 1L + nextLong(random, Math.max(horizonMs * 2, 1L)); - } - - /** - * Return a random {@code string} such that {@code currentSize + string.length()} is on average - * {@code averageSize}. 
- */ - private static String nextExtra(Random random, int currentSize, int desiredAverageSize) { - if (currentSize > desiredAverageSize) { - return ""; - } - desiredAverageSize -= currentSize; - int delta = (int) Math.round(desiredAverageSize * 0.2); - int minSize = desiredAverageSize - delta; - int desiredSize = minSize + (delta == 0 ? 0 : random.nextInt(2 * delta)); - return nextExactString(random, desiredSize); - } - - /** Return a random long from {@code [0, n)}. */ - private static long nextLong(Random random, long n) { - if (n < Integer.MAX_VALUE) { - return random.nextInt((int) n); - } else { - // TODO: Very skewed distribution! Bad! - return Math.abs(random.nextLong()) % n; - } - } - - /** - * Generate and return a random person with next available id. - */ - private Person nextPerson(Random random, long timestamp) { - long id = lastBase0PersonId() + GeneratorConfig.FIRST_PERSON_ID; - String name = nextPersonName(random); - String email = nextEmail(random); - String creditCard = nextCreditCard(random); - String city = nextUSCity(random); - String state = nextUSState(random); - int currentSize = - 8 + name.length() + email.length() + creditCard.length() + city.length() + state.length(); - String extra = nextExtra(random, currentSize, config.configuration.avgPersonByteSize); - return new Person(id, name, email, creditCard, city, state, timestamp, extra); - } - - /** - * Return a random person id (base 0). - */ - private long nextBase0PersonId(Random random) { - // Choose a random person from any of the 'active' people, plus a few 'leads'. - // By limiting to 'active' we ensure the density of bids or auctions per person - // does not decrease over time for long running jobs. - // By choosing a person id ahead of the last valid person id we will make - // newPerson and newAuction events appear to have been swapped in time. 
- long numPeople = lastBase0PersonId() + 1; - long activePeople = Math.min(numPeople, config.configuration.numActivePeople); - long n = nextLong(random, activePeople + PERSON_ID_LEAD); - return numPeople - activePeople + n; - } - - /** - * Return a random auction id (base 0). - */ - private long nextBase0AuctionId(Random random) { - // Choose a random auction for any of those which are likely to still be in flight, - // plus a few 'leads'. - // Note that ideally we'd track non-expired auctions exactly, but that state - // is difficult to split. - long minAuction = Math.max(lastBase0AuctionId() - config.configuration.numInFlightAuctions, 0); - long maxAuction = lastBase0AuctionId(); - return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD); - } - - /** - * Generate and return a random auction with next available id. - */ - private Auction nextAuction(Random random, long timestamp) { - long id = lastBase0AuctionId() + GeneratorConfig.FIRST_AUCTION_ID; - - long seller; - // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio. - if (random.nextInt(config.configuration.hotSellersRatio) > 0) { - // Choose the first person in the batch of last HOT_SELLER_RATIO people. 
- seller = (lastBase0PersonId() / HOT_SELLER_RATIO) * HOT_SELLER_RATIO; - } else { - seller = nextBase0PersonId(random); - } - seller += GeneratorConfig.FIRST_PERSON_ID; - - long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES); - long initialBid = nextPrice(random); - long dateTime = timestamp; - long expires = timestamp + nextAuctionLengthMs(random, timestamp); - String name = nextString(random, 20); - String desc = nextString(random, 100); - long reserve = initialBid + nextPrice(random); - int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8; - String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize); - return new Auction(id, name, desc, initialBid, reserve, dateTime, expires, seller, category, - extra); - } - - /** - * Generate and return a random bid with next available id. - */ - private Bid nextBid(Random random, long timestamp) { - long auction; - // Here P(bid will be for a hot auction) = 1 - 1/hotAuctionRatio. - if (random.nextInt(config.configuration.hotAuctionRatio) > 0) { - // Choose the first auction in the batch of last HOT_AUCTION_RATIO auctions. - auction = (lastBase0AuctionId() / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO; - } else { - auction = nextBase0AuctionId(random); - } - auction += GeneratorConfig.FIRST_AUCTION_ID; - - long bidder; - // Here P(bid will be by a hot bidder) = 1 - 1/hotBiddersRatio - if (random.nextInt(config.configuration.hotBiddersRatio) > 0) { - // Choose the second person (so hot bidders and hot sellers don't collide) in the batch of - // last HOT_BIDDER_RATIO people. 
- bidder = (lastBase0PersonId() / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1; - } else { - bidder = nextBase0PersonId(random); - } - bidder += GeneratorConfig.FIRST_PERSON_ID; - - long price = nextPrice(random); - int currentSize = 8 + 8 + 8 + 8; - String extra = nextExtra(random, currentSize, config.configuration.avgBidByteSize); - return new Bid(auction, bidder, price, timestamp, extra); - } - - @Override - public boolean hasNext() { - return numEvents < config.maxEvents; - } - - /** - * Return the next event. The outer timestamp is in wallclock time and corresponds to - * when the event should fire. The inner timestamp is in event-time and represents the - * time the event is purported to have taken place in the simulation. - */ - public NextEvent nextEvent() { - if (wallclockBaseTime < 0) { - wallclockBaseTime = System.currentTimeMillis(); - } - // When, in event time, we should generate the event. Monotonic. - long eventTimestamp = - config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents)).getKey(); - // When, in event time, the event should say it was generated. Depending on outOfOrderGroupSize - // may have local jitter. - long adjustedEventTimestamp = - config.timestampAndInterEventDelayUsForEvent(config.nextAdjustedEventNumber(numEvents)) - .getKey(); - // The minimum of this and all future adjusted event timestamps. Accounts for jitter in - // the event timestamp. - long watermark = - config.timestampAndInterEventDelayUsForEvent(config.nextEventNumberForWatermark(numEvents)) - .getKey(); - // When, in wallclock time, we should emit the event. - long wallclockTimestamp = wallclockBaseTime + (eventTimestamp - getCurrentConfig().baseTime); - - // Seed the random number generator with the next 'event id'. 
- Random random = new Random(getNextEventId()); - long rem = getNextEventId() % GeneratorConfig.PROPORTION_DENOMINATOR; - - Event event; - if (rem < GeneratorConfig.PERSON_PROPORTION) { - event = new Event(nextPerson(random, adjustedEventTimestamp)); - } else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) { - event = new Event(nextAuction(random, adjustedEventTimestamp)); - } else { - event = new Event(nextBid(random, adjustedEventTimestamp)); - } - - numEvents++; - return new NextEvent(wallclockTimestamp, adjustedEventTimestamp, event, watermark); - } - - @Override - public TimestampedValue<Event> next() { - NextEvent next = nextEvent(); - return TimestampedValue.of(next.event, new Instant(next.eventTimestamp)); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - /** - * Return how many microseconds till we emit the next event. - */ - public long currentInterEventDelayUs() { - return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents)) - .getValue(); - } - - /** - * Return an estimate of fraction of output consumed. - */ - public double getFractionConsumed() { - return (double) numEvents / config.maxEvents; - } - - @Override - public String toString() { - return String.format("Generator{config:%s; numEvents:%d; wallclockBaseTime:%d}", config, - numEvents, wallclockBaseTime); - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java deleted file mode 100644 index dceff4f..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -import org.apache.beam.sdk.values.KV; - -/** - * Parameters controlling how {@link Generator} synthesizes {@link Event} elements. - */ -class GeneratorConfig implements Serializable { - /** - * We start the ids at specific values to help ensure the queries find a match even on - * small synthesized dataset sizes. 
- */ - public static final long FIRST_AUCTION_ID = 1000L; - public static final long FIRST_PERSON_ID = 1000L; - public static final long FIRST_CATEGORY_ID = 10L; - - /** - * Proportions of people/auctions/bids to synthesize. - */ - public static final int PERSON_PROPORTION = 1; - public static final int AUCTION_PROPORTION = 3; - public static final int BID_PROPORTION = 46; - public static final int PROPORTION_DENOMINATOR = - PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION; - - /** - * Environment options. - */ - public final NexmarkConfiguration configuration; - - /** - * Delay between events, in microseconds. If the array has more than one entry then - * the rate is changed every {@link #stepLengthSec}, and wraps around. - */ - public final long[] interEventDelayUs; - - /** - * Delay before changing the current inter-event delay. - */ - public final long stepLengthSec; - - /** - * Time for first event (ms since epoch). - */ - public final long baseTime; - - /** - * Event id of first event to be generated. Event ids are unique over all generators, and - * are used as a seed to generate each event's data. - */ - public final long firstEventId; - - /** - * Maximum number of events to generate. - */ - public final long maxEvents; - - /** - * First event number. Generators running in parallel time may share the same event number, - * and the event number is used to determine the event timestamp. - */ - public final long firstEventNumber; - - /** - * True period of epoch in milliseconds. Derived from above. - * (Ie time to run through cycle for all interEventDelayUs entries). - */ - public final long epochPeriodMs; - - /** - * Number of events per epoch. Derived from above. - * (Ie number of events to run through cycle for all interEventDelayUs entries). 
- */ - public final long eventsPerEpoch; - - public GeneratorConfig( - NexmarkConfiguration configuration, long baseTime, long firstEventId, - long maxEventsOrZero, long firstEventNumber) { - this.configuration = configuration; - this.interEventDelayUs = configuration.rateShape.interEventDelayUs( - configuration.firstEventRate, configuration.nextEventRate, - configuration.rateUnit, configuration.numEventGenerators); - this.stepLengthSec = configuration.rateShape.stepLengthSec(configuration.ratePeriodSec); - this.baseTime = baseTime; - this.firstEventId = firstEventId; - if (maxEventsOrZero == 0) { - // Scale maximum down to avoid overflow in getEstimatedSizeBytes. - this.maxEvents = - Long.MAX_VALUE / (PROPORTION_DENOMINATOR - * Math.max( - Math.max(configuration.avgPersonByteSize, configuration.avgAuctionByteSize), - configuration.avgBidByteSize)); - } else { - this.maxEvents = maxEventsOrZero; - } - this.firstEventNumber = firstEventNumber; - - long eventsPerEpoch = 0; - long epochPeriodMs = 0; - if (interEventDelayUs.length > 1) { - for (int i = 0; i < interEventDelayUs.length; i++) { - long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i]; - eventsPerEpoch += numEventsForThisCycle; - epochPeriodMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L; - } - } - this.eventsPerEpoch = eventsPerEpoch; - this.epochPeriodMs = epochPeriodMs; - } - - /** - * Return a clone of this config. - */ - @Override - public GeneratorConfig clone() { - return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber); - } - - /** - * Return clone of this config except with given parameters. - */ - public GeneratorConfig cloneWith(long firstEventId, long maxEvents, long firstEventNumber) { - return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber); - } - - /** - * Split this config into {@code n} sub-configs with roughly equal number of - * possible events, but distinct value spaces. 
The generators will run on parallel timelines. - * This config should no longer be used. - */ - public List<GeneratorConfig> split(int n) { - List<GeneratorConfig> results = new ArrayList<>(); - if (n == 1) { - // No split required. - results.add(this); - } else { - long subMaxEvents = maxEvents / n; - long subFirstEventId = firstEventId; - for (int i = 0; i < n; i++) { - if (i == n - 1) { - // Don't loose any events to round-down. - subMaxEvents = maxEvents - subMaxEvents * (n - 1); - } - results.add(cloneWith(subFirstEventId, subMaxEvents, firstEventNumber)); - subFirstEventId += subMaxEvents; - } - } - return results; - } - - /** - * Return an estimate of the bytes needed by {@code numEvents}. - */ - public long estimatedBytesForEvents(long numEvents) { - long numPersons = - (numEvents * GeneratorConfig.PERSON_PROPORTION) / GeneratorConfig.PROPORTION_DENOMINATOR; - long numAuctions = (numEvents * AUCTION_PROPORTION) / PROPORTION_DENOMINATOR; - long numBids = (numEvents * BID_PROPORTION) / PROPORTION_DENOMINATOR; - return numPersons * configuration.avgPersonByteSize - + numAuctions * configuration.avgAuctionByteSize - + numBids * configuration.avgBidByteSize; - } - - /** - * Return an estimate of the byte-size of all events a generator for this config would yield. - */ - public long getEstimatedSizeBytes() { - return estimatedBytesForEvents(maxEvents); - } - - /** - * Return the first 'event id' which could be generated from this config. Though events don't - * have ids we can simulate them to help bookkeeping. - */ - public long getStartEventId() { - return firstEventId + firstEventNumber; - } - - /** - * Return one past the last 'event id' which could be generated from this config. - */ - public long getStopEventId() { - return firstEventId + firstEventNumber + maxEvents; - } - - /** - * Return the next event number for a generator which has so far emitted {@code numEvents}. 
- */ - public long nextEventNumber(long numEvents) { - return firstEventNumber + numEvents; - } - - /** - * Return the next event number for a generator which has so far emitted {@code numEvents}, - * but adjusted to account for {@code outOfOrderGroupSize}. - */ - public long nextAdjustedEventNumber(long numEvents) { - long n = configuration.outOfOrderGroupSize; - long eventNumber = nextEventNumber(numEvents); - long base = (eventNumber / n) * n; - long offset = (eventNumber * 953) % n; - return base + offset; - } - - /** - * Return the event number who's event time will be a suitable watermark for - * a generator which has so far emitted {@code numEvents}. - */ - public long nextEventNumberForWatermark(long numEvents) { - long n = configuration.outOfOrderGroupSize; - long eventNumber = nextEventNumber(numEvents); - return (eventNumber / n) * n; - } - - /** - * What timestamp should the event with {@code eventNumber} have for this generator? And - * what inter-event delay (in microseconds) is current? 
- */ - public KV<Long, Long> timestampAndInterEventDelayUsForEvent(long eventNumber) { - if (interEventDelayUs.length == 1) { - long timestamp = baseTime + (eventNumber * interEventDelayUs[0]) / 1000L; - return KV.of(timestamp, interEventDelayUs[0]); - } - - long epoch = eventNumber / eventsPerEpoch; - long n = eventNumber % eventsPerEpoch; - long offsetInEpochMs = 0; - for (int i = 0; i < interEventDelayUs.length; i++) { - long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i]; - if (n < numEventsForThisCycle) { - long offsetInCycleUs = n * interEventDelayUs[i]; - long timestamp = - baseTime + epoch * epochPeriodMs + offsetInEpochMs + (offsetInCycleUs / 1000L); - return KV.of(timestamp, interEventDelayUs[i]); - } - n -= numEventsForThisCycle; - offsetInEpochMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L; - } - throw new RuntimeException("internal eventsPerEpoch incorrect"); // can't reach - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("GeneratorConfig"); - sb.append("{configuration:"); - sb.append(configuration.toString()); - sb.append(";interEventDelayUs=["); - for (int i = 0; i < interEventDelayUs.length; i++) { - if (i > 0) { - sb.append(","); - } - sb.append(interEventDelayUs[i]); - } - sb.append("]"); - sb.append(";stepLengthSec:"); - sb.append(stepLengthSec); - sb.append(";baseTime:"); - sb.append(baseTime); - sb.append(";firstEventId:"); - sb.append(firstEventId); - sb.append(";maxEvents:"); - sb.append(maxEvents); - sb.append(";firstEventNumber:"); - sb.append(firstEventNumber); - sb.append(";epochPeriodMs:"); - sb.append(epochPeriodMs); - sb.append(";eventsPerEpoch:"); - sb.append(eventsPerEpoch); - sb.append("}"); - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java 
---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java deleted file mode 100644 index 21fa3f4..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; - -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarLongCoder; - -/** - * Result type of {@link Query8}. 
- */ -public class IdNameReserve implements KnownSize, Serializable { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); - - public static final Coder<IdNameReserve> CODER = new AtomicCoder<IdNameReserve>() { - @Override - public void encode(IdNameReserve value, OutputStream outStream, - Coder.Context context) - throws CoderException, IOException { - LONG_CODER.encode(value.id, outStream, Context.NESTED); - STRING_CODER.encode(value.name, outStream, Context.NESTED); - LONG_CODER.encode(value.reserve, outStream, Context.NESTED); - } - - @Override - public IdNameReserve decode( - InputStream inStream, Coder.Context context) - throws CoderException, IOException { - long id = LONG_CODER.decode(inStream, Context.NESTED); - String name = STRING_CODER.decode(inStream, Context.NESTED); - long reserve = LONG_CODER.decode(inStream, Context.NESTED); - return new IdNameReserve(id, name, reserve); - } - }; - - @JsonProperty - public final long id; - - @JsonProperty - public final String name; - - /** Reserve price in cents. */ - @JsonProperty - public final long reserve; - - // For Avro only. 
- @SuppressWarnings("unused") - private IdNameReserve() { - id = 0; - name = null; - reserve = 0; - } - - public IdNameReserve(long id, String name, long reserve) { - this.id = id; - this.name = name; - this.reserve = reserve; - } - - @Override - public long sizeInBytes() { - return 8 + name.length() + 1 + 8; - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java deleted file mode 100644 index 2093c48..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark; - -/** - * Interface for elements which can quickly estimate their encoded byte size. - */ -public interface KnownSize { - long sizeInBytes(); -} - http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java index 02660bf..6370e41 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java @@ -19,6 +19,7 @@ package org.apache.beam.integration.nexmark; import java.io.Serializable; +import org.apache.beam.integration.nexmark.model.KnownSize; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Max; http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java deleted file mode 100644 index fe4687b..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; - -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarLongCoder; - -/** - * Result of {@link Query3}. 
- */ -public class NameCityStateId implements KnownSize, Serializable { - private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); - - public static final Coder<NameCityStateId> CODER = new AtomicCoder<NameCityStateId>() { - @Override - public void encode(NameCityStateId value, OutputStream outStream, - Coder.Context context) - throws CoderException, IOException { - STRING_CODER.encode(value.name, outStream, Context.NESTED); - STRING_CODER.encode(value.city, outStream, Context.NESTED); - STRING_CODER.encode(value.state, outStream, Context.NESTED); - LONG_CODER.encode(value.id, outStream, Context.NESTED); - } - - @Override - public NameCityStateId decode( - InputStream inStream, Coder.Context context) - throws CoderException, IOException { - String name = STRING_CODER.decode(inStream, Context.NESTED); - String city = STRING_CODER.decode(inStream, Context.NESTED); - String state = STRING_CODER.decode(inStream, Context.NESTED); - long id = LONG_CODER.decode(inStream, Context.NESTED); - return new NameCityStateId(name, city, state, id); - } - }; - - @JsonProperty - public final String name; - - @JsonProperty - public final String city; - - @JsonProperty - public final String state; - - @JsonProperty - public final long id; - - // For Avro only. 
- @SuppressWarnings("unused") - private NameCityStateId() { - name = null; - city = null; - state = null; - id = 0; - } - - public NameCityStateId(String name, String city, String state, long id) { - this.name = name; - this.city = city; - this.state = state; - this.id = id; - } - - @Override - public long sizeInBytes() { - return name.length() + 1 + city.length() + 1 + state.length() + 1 + 8; - } - - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexDriver.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexDriver.java deleted file mode 100644 index 4c2721e..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexDriver.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.runners.apex.ApexRunner; -import org.apache.beam.sdk.options.PipelineOptionsFactory; - -/** - * Run NexMark queries using the Apex runner. - */ -public class NexmarkApexDriver extends NexmarkDriver<NexmarkApexDriver.NexmarkApexOptions> { - /** - * Command line flags. - */ - public interface NexmarkApexOptions extends Options, ApexPipelineOptions { - } - - /** - * Entry point. - */ - public static void main(String[] args) { - // Gather command line args, baseline, configurations, etc. - NexmarkApexOptions options = PipelineOptionsFactory.fromArgs(args) - .withValidation() - .as(NexmarkApexOptions.class); - options.setRunner(ApexRunner.class); - NexmarkApexRunner runner = new NexmarkApexRunner(options); - new NexmarkApexDriver().runAll(options, runner); - } -} - - http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java deleted file mode 100644 index 3b8993a..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import javax.annotation.Nullable; - -/** - * Run a query using the Apex runner. - */ -public class NexmarkApexRunner extends NexmarkRunner<NexmarkApexDriver.NexmarkApexOptions> { - @Override - protected boolean isStreaming() { - return options.isStreaming(); - } - - @Override - protected int coresPerWorker() { - return 4; - } - - @Override - protected int maxNumWorkers() { - return 5; - } - - @Override - protected void invokeBuilderForPublishOnlyPipeline( - PipelineBuilder builder) { - builder.build(options); - } - - @Override - protected void waitForPublisherPreload() { - throw new UnsupportedOperationException(); - } - - @Override - @Nullable - protected NexmarkPerf monitor(NexmarkQuery query) { - return null; - } - - public NexmarkApexRunner(NexmarkApexDriver.NexmarkApexOptions options) { - super(options); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java index 0943664..e2890ed 100644 --- 
a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java @@ -29,7 +29,7 @@ import java.util.Objects; * programmatically. We only capture properties which may influence the resulting * pipeline performance, as captured by {@link NexmarkPerf}. */ -class NexmarkConfiguration implements Serializable { +public class NexmarkConfiguration implements Serializable { public static final NexmarkConfiguration DEFAULT = new NexmarkConfiguration(); /** If {@literal true}, include additional debugging and monitoring stats. */ @@ -228,7 +228,7 @@ class NexmarkConfiguration implements Serializable { /** * Replace any properties of this configuration which have been supplied by the command line. */ - public void overrideFromOptions(Options options) { + public void overrideFromOptions(NexmarkOptions options) { if (options.getDebug() != null) { debug = options.getDebug(); } @@ -511,8 +511,6 @@ class NexmarkConfiguration implements Serializable { /** * Parse an object from {@code string}. - * - * @throws IOException */ public static NexmarkConfiguration fromString(String string) { try { http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectDriver.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectDriver.java deleted file mode 100644 index 24fcc01..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectDriver.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import org.apache.beam.runners.direct.DirectOptions; -import org.apache.beam.runners.direct.DirectRunner; -import org.apache.beam.sdk.options.PipelineOptionsFactory; - -/** - * An implementation of the 'NEXMark queries' using the Direct Runner. - */ -class NexmarkDirectDriver extends NexmarkDriver<NexmarkDirectDriver.NexmarkDirectOptions> { - /** - * Command line flags. - */ - public interface NexmarkDirectOptions extends Options, DirectOptions { - } - - /** - * Entry point. 
- */ - public static void main(String[] args) { - NexmarkDirectOptions options = - PipelineOptionsFactory.fromArgs(args) - .withValidation() - .as(NexmarkDirectOptions.class); - options.setRunner(DirectRunner.class); - NexmarkDirectRunner runner = new NexmarkDirectRunner(options); - new NexmarkDirectDriver().runAll(options, runner); - } -} - http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java deleted file mode 100644 index 0119bbc..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -/** - * Run a single query using the Direct Runner. 
- */ -class NexmarkDirectRunner extends NexmarkRunner<NexmarkDirectDriver.NexmarkDirectOptions> { - public NexmarkDirectRunner(NexmarkDirectDriver.NexmarkDirectOptions options) { - super(options); - } - - @Override - protected boolean isStreaming() { - return options.isStreaming(); - } - - @Override - protected int coresPerWorker() { - return 4; - } - - @Override - protected int maxNumWorkers() { - return 1; - } - - @Override - protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) { - throw new UnsupportedOperationException( - "Cannot use --pubSubMode=COMBINED with DirectRunner"); - } - - /** - * Monitor the progress of the publisher job. Return when it has been generating events for - * at least {@code configuration.preloadSeconds}. - */ - @Override - protected void waitForPublisherPreload() { - throw new UnsupportedOperationException( - "Cannot use --pubSubMode=COMBINED with DirectRunner"); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java index e6a7b0b..4714124 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java @@ -28,6 +28,9 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; +import org.apache.beam.integration.nexmark.model.Auction; +import org.apache.beam.integration.nexmark.model.Bid; +import org.apache.beam.integration.nexmark.model.Person; import org.joda.time.Duration; import org.joda.time.Instant; @@ -48,7 +51,7 @@ import org.joda.time.Instant; * <p>See <a 
href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/"> * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a> */ -public class NexmarkDriver<OptionT extends Options> { +public class NexmarkDriver<OptionT extends NexmarkOptions> { /** * Entry point. http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java deleted file mode 100644 index 61a7d29..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import org.apache.beam.runners.flink.FlinkPipelineOptions; -import org.apache.beam.runners.flink.FlinkRunner; -import org.apache.beam.sdk.options.PipelineOptionsFactory; - -/** - * Run NexMark queries using the Flink runner. 
- */ -public class NexmarkFlinkDriver extends NexmarkDriver<NexmarkFlinkDriver.NexmarkFlinkOptions> { - /** - * Command line flags. - */ - public interface NexmarkFlinkOptions extends Options, FlinkPipelineOptions { - } - - /** - * Entry point. - */ - public static void main(String[] args) { - // Gather command line args, baseline, configurations, etc. - NexmarkFlinkOptions options = PipelineOptionsFactory.fromArgs(args) - .withValidation() - .as(NexmarkFlinkOptions.class); - options.setRunner(FlinkRunner.class); - NexmarkFlinkRunner runner = new NexmarkFlinkRunner(options); - new NexmarkFlinkDriver().runAll(options, runner); - } -} - - http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java deleted file mode 100644 index 95ab1ad..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -/** - * Run a query using the Flink runner. - */ -public class NexmarkFlinkRunner extends NexmarkRunner<NexmarkFlinkDriver.NexmarkFlinkOptions> { - @Override - protected boolean isStreaming() { - return options.isStreaming(); - } - - @Override - protected int coresPerWorker() { - return 4; - } - - @Override - protected int maxNumWorkers() { - return 5; - } - - @Override - protected void invokeBuilderForPublishOnlyPipeline( - PipelineBuilder builder) { - builder.build(options); - } - - @Override - protected void waitForPublisherPreload() { - throw new UnsupportedOperationException(); - } - - public NexmarkFlinkRunner(NexmarkFlinkDriver.NexmarkFlinkOptions options) { - super(options); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java deleted file mode 100644 index 50c2a7c..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import org.apache.beam.runners.dataflow.DataflowRunner; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; - -/** - * An implementation of the 'NEXMark queries' for Google Dataflow. - * These are multiple queries over a three table schema representing an online auction system: - * <ul> - * <li>{@link Person} represents a person submitting an item for auction and/or making a bid - * on an auction. - * <li>{@link Auction} represents an item under auction. - * <li>{@link Bid} represents a bid for an item under auction. - * </ul> - * The queries exercise many aspects of streaming dataflow. - * - * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not - * particularly sensible. - * - * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/"> - * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a> - */ -class NexmarkGoogleDriver extends NexmarkDriver<NexmarkGoogleDriver.NexmarkGoogleOptions> { - /** - * Command line flags. - */ - public interface NexmarkGoogleOptions extends Options, DataflowPipelineOptions { - - } - - /** - * Entry point. - */ - public static void main(String[] args) { - // Gather command line args, baseline, configurations, etc. 
- NexmarkGoogleOptions options = PipelineOptionsFactory.fromArgs(args) - .withValidation() - .as(NexmarkGoogleOptions.class); - options.setRunner(DataflowRunner.class); - NexmarkGoogleRunner runner = new NexmarkGoogleRunner(options); - new NexmarkGoogleDriver().runAll(options, runner); - } -} - http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java deleted file mode 100644 index f4bfb1e..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-package org.apache.beam.integration.nexmark;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.sdk.PipelineResult;
-import org.joda.time.Duration;
-
-/**
- * Run a single Nexmark query using a given configuration on Google Dataflow.
- */
-class NexmarkGoogleRunner extends NexmarkRunner<NexmarkGoogleDriver.NexmarkGoogleOptions> {
-
-  public NexmarkGoogleRunner(NexmarkGoogleDriver.NexmarkGoogleOptions options) {
-    super(options);
-  }
-
-  @Override
-  protected boolean isStreaming() {
-    return options.isStreaming();
-  }
-
-  /**
-   * Return the integer in the third dash-separated segment of the worker machine type,
-   * or 1 if the machine type is unset, does not have exactly three segments, or that
-   * segment is not an integer.
-   */
-  @Override
-  protected int coresPerWorker() {
-    String machineType = options.getWorkerMachineType();
-    if (machineType == null || machineType.isEmpty()) {
-      return 1;
-    }
-    String[] split = machineType.split("-");
-    if (split.length != 3) {
-      return 1;
-    }
-    try {
-      return Integer.parseInt(split[2]);
-    } catch (NumberFormatException ex) {
-      return 1;
-    }
-  }
-
-  /** Return the larger of the configured initial and maximum worker counts. */
-  @Override
-  protected int maxNumWorkers() {
-    return Math.max(options.getNumWorkers(), options.getMaxNumWorkers());
-  }
-
-  @Override
-  protected String getJobId(PipelineResult job) {
-    return ((DataflowPipelineJob) job).getJobId();
-  }
-
-  /**
-   * Build the publish-only pipeline with temporarily adjusted options: job and app names are
-   * prefixed with {@code "p-"} and worker counts are capped at the number of workers needed
-   * to host {@code configuration.numEventGenerators} generators. All adjusted options are
-   * restored to their original values afterwards, even if the builder throws.
-   */
-  @Override
-  protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) {
-    String jobName = options.getJobName();
-    String appName = options.getAppName();
-    // Capture the original worker counts up front so the finally block can restore them.
-    int savedMaxNumWorkers = options.getMaxNumWorkers();
-    int savedNumWorkers = options.getNumWorkers();
-    options.setJobName("p-" + jobName);
-    options.setAppName("p-" + appName);
-    int coresPerWorker = coresPerWorker();
-    // Ceiling division: one generator per core.
-    int eventGeneratorWorkers = (configuration.numEventGenerators + coresPerWorker - 1)
-                                / coresPerWorker;
-    options.setMaxNumWorkers(Math.min(savedMaxNumWorkers, eventGeneratorWorkers));
-    options.setNumWorkers(Math.min(savedNumWorkers, eventGeneratorWorkers));
-    publisherMonitor = new Monitor<Event>(queryName, "publisher");
-    try {
-      builder.build(options);
-    } finally {
-      options.setJobName(jobName);
-      options.setAppName(appName);
-      options.setMaxNumWorkers(savedMaxNumWorkers);
-      options.setNumWorkers(savedNumWorkers);
-    }
-  }
-
-  /**
-   * Monitor the progress of the publisher job. Return when it has been generating events for
-   * at least {@code configuration.preloadSeconds}, or as soon as the job reaches a state other
-   * than {@code UNKNOWN} or {@code RUNNING}. Returns immediately if job monitoring is
-   * disabled, the publisher is not a {@link DataflowPipelineJob}, or no preload is configured.
-   *
-   * @throws RuntimeException if the waiting thread is interrupted
-   */
-  @Override
-  protected void waitForPublisherPreload() {
-    checkNotNull(publisherMonitor);
-    checkNotNull(publisherResult);
-    if (!options.getMonitorJobs()) {
-      return;
-    }
-    if (!(publisherResult instanceof DataflowPipelineJob)) {
-      return;
-    }
-    if (configuration.preloadSeconds <= 0) {
-      return;
-    }
-
-    NexmarkUtils.console("waiting for publisher to pre-load");
-
-    DataflowPipelineJob job = (DataflowPipelineJob) publisherResult;
-
-    long numEvents = 0;
-    // Both stay negative until the first event has been observed.
-    long startMsSinceEpoch = -1;
-    long endMsSinceEpoch = -1;
-    while (true) {
-      PipelineResult.State state = job.getState();
-      switch (state) {
-        case UNKNOWN:
-          // Keep waiting.
-          NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
-          break;
-        case STOPPED:
-        case DONE:
-        case CANCELLED:
-        case FAILED:
-        case UPDATED:
-          NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
-          return;
-        case RUNNING:
-          numEvents = getLong(job, publisherMonitor.getElementCounter());
-          if (startMsSinceEpoch < 0 && numEvents > 0) {
-            // The preload clock starts when the first event is seen.
-            startMsSinceEpoch = System.currentTimeMillis();
-            endMsSinceEpoch = startMsSinceEpoch
-                + Duration.standardSeconds(configuration.preloadSeconds).getMillis();
-          }
-          if (endMsSinceEpoch < 0) {
-            NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
-          } else {
-            long remainMs = endMsSinceEpoch - System.currentTimeMillis();
-            if (remainMs > 0) {
-              NexmarkUtils.console("%s publisher (%d events, waiting for %ds)", state, numEvents,
-                  remainMs / 1000);
-            } else {
-              NexmarkUtils.console("publisher preloaded %d events", numEvents);
-              return;
-            }
-          }
-          break;
-      }
-
-      try {
-        Thread.sleep(PERF_DELAY.getMillis());
-      } catch (InterruptedException e) {
-        // Re-assert the interrupt so callers further up the stack can observe it.
-        Thread.currentThread().interrupt();
-        throw new RuntimeException("Interrupted: publisher still running.", e);
-      }
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
new file mode 100644
index 0000000..1be974f
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PubsubOptions;
+
+/**
+ * Command line flags.
+ */
+public interface NexmarkOptions extends PubsubOptions {
+  @Description("Which suite to run. 
Default is to use command line arguments for one job.")
+  @Default.Enum("DEFAULT")
+  NexmarkSuite getSuite();
+
+  void setSuite(NexmarkSuite suite);
+
+  @Description("If true, and using the DataflowRunner, monitor the jobs as they run.")
+  @Default.Boolean(false)
+  boolean getMonitorJobs();
+
+  void setMonitorJobs(boolean monitorJobs);
+
+  @Description("Where the events come from.")
+  @Nullable
+  NexmarkUtils.SourceType getSourceType();
+
+  void setSourceType(NexmarkUtils.SourceType sourceType);
+
+  @Description("Prefix for input files if using avro input")
+  @Nullable
+  String getInputPath();
+
+  void setInputPath(String inputPath);
+
+  @Description("Where results go.")
+  @Nullable
+  NexmarkUtils.SinkType getSinkType();
+
+  void setSinkType(NexmarkUtils.SinkType sinkType);
+
+  @Description("Which mode to run in when source is PUBSUB.")
+  @Nullable
+  NexmarkUtils.PubSubMode getPubSubMode();
+
+  void setPubSubMode(NexmarkUtils.PubSubMode pubSubMode);
+
+  @Description("Which query to run.")
+  @Nullable
+  Integer getQuery();
+
+  void setQuery(Integer query);
+
+  @Description("Prefix for output files if using text output for results or running Query 10.")
+  @Nullable
+  String getOutputPath();
+
+  void setOutputPath(String outputPath);
+
+  @Description("Base name of pubsub topic to publish to in streaming mode.")
+  @Nullable
+  @Default.String("nexmark")
+  String getPubsubTopic();
+
+  void setPubsubTopic(String pubsubTopic);
+
+  @Description("Base name of pubsub subscription to read from in streaming mode.")
+  @Nullable
+  @Default.String("nexmark")
+  String getPubsubSubscription();
+
+  void setPubsubSubscription(String pubsubSubscription);
+
+  @Description("Base name of BigQuery table name if using BigQuery output.")
+  @Nullable
+  @Default.String("nexmark")
+  String getBigQueryTable();
+
+  void setBigQueryTable(String bigQueryTable);
+
+  @Description("Approximate number of events to generate. "
+      + "Zero for effectively unlimited in streaming mode.")
+  @Nullable
+  Long getNumEvents();
+
+  void setNumEvents(Long numEvents);
+
+  @Description("Time in seconds to preload the subscription with data, at the initial input rate "
+      + "of the pipeline.")
+  @Nullable
+  Integer getPreloadSeconds();
+
+  void setPreloadSeconds(Integer preloadSeconds);
+
+  @Description("Number of unbounded sources to create events.")
+  @Nullable
+  Integer getNumEventGenerators();
+
+  void setNumEventGenerators(Integer numEventGenerators);
+
+  @Description("Shape of event rate curve.")
+  @Nullable
+  NexmarkUtils.RateShape getRateShape();
+
+  void setRateShape(NexmarkUtils.RateShape rateShape);
+
+  @Description("Initial overall event rate (in --rateUnit).")
+  @Nullable
+  Integer getFirstEventRate();
+
+  void setFirstEventRate(Integer firstEventRate);
+
+  @Description("Next overall event rate (in --rateUnit).")
+  @Nullable
+  Integer getNextEventRate();
+
+  void setNextEventRate(Integer nextEventRate);
+
+  @Description("Unit for rates.")
+  @Nullable
+  NexmarkUtils.RateUnit getRateUnit();
+
+  void setRateUnit(NexmarkUtils.RateUnit rateUnit);
+
+  @Description("Overall period of rate shape, in seconds.")
+  @Nullable
+  Integer getRatePeriodSec();
+
+  void setRatePeriodSec(Integer ratePeriodSec);
+
+  @Description("If true, relay events in real time in streaming mode.")
+  @Nullable
+  Boolean getIsRateLimited();
+
+  void setIsRateLimited(Boolean isRateLimited);
+
+  @Description("If true, use wallclock time as event time. Otherwise, use a deterministic"
+      + " time in the past so that multiple runs will see exactly the same event streams"
+      + " and should thus have exactly the same results.")
+  @Nullable
+  Boolean getUseWallclockEventTime();
+
+  void setUseWallclockEventTime(Boolean useWallclockEventTime);
+
+  // Primitive boolean flags cannot be null, so they declare an explicit default, not @Nullable.
+  @Description("Assert pipeline results match model results.")
+  @Default.Boolean(false)
+  boolean getAssertCorrectness();
+
+  void setAssertCorrectness(boolean assertCorrectness);
+
+  @Description("Log all input events.")
+  @Default.Boolean(false)
+  boolean getLogEvents();
+
+  void setLogEvents(boolean logEvents);
+
+  @Description("Log all query results.")
+  @Default.Boolean(false)
+  boolean getLogResults();
+
+  void setLogResults(boolean logResults);
+
+  @Description("Average size in bytes for a person record.")
+  @Nullable
+  Integer getAvgPersonByteSize();
+
+  void setAvgPersonByteSize(Integer avgPersonByteSize);
+
+  @Description("Average size in bytes for an auction record.")
+  @Nullable
+  Integer getAvgAuctionByteSize();
+
+  void setAvgAuctionByteSize(Integer avgAuctionByteSize);
+
+  @Description("Average size in bytes for a bid record.")
+  @Nullable
+  Integer getAvgBidByteSize();
+
+  void setAvgBidByteSize(Integer avgBidByteSize);
+
+  @Description("Ratio of bids for 'hot' auctions above the background.")
+  @Nullable
+  Integer getHotAuctionRatio();
+
+  void setHotAuctionRatio(Integer hotAuctionRatio);
+
+  @Description("Ratio of auctions for 'hot' sellers above the background.")
+  @Nullable
+  Integer getHotSellersRatio();
+
+  void setHotSellersRatio(Integer hotSellersRatio);
+
+  @Description("Ratio of auctions for 'hot' bidders above the background.")
+  @Nullable
+  Integer getHotBiddersRatio();
+
+  void setHotBiddersRatio(Integer hotBiddersRatio);
+
+  @Description("Window size in seconds.")
+  @Nullable
+  Long getWindowSizeSec();
+
+  void setWindowSizeSec(Long windowSizeSec);
+
+  @Description("Window period in seconds.")
+  @Nullable
+  Long getWindowPeriodSec();
+
+  void setWindowPeriodSec(Long windowPeriodSec);
+
+  @Description("If in streaming mode, the holdback for watermark in seconds.")
+  @Nullable
+  Long getWatermarkHoldbackSec();
+
+  void setWatermarkHoldbackSec(Long watermarkHoldbackSec);
+
+  @Description("Roughly how many auctions should be in flight for each generator.")
+  @Nullable
+  Integer getNumInFlightAuctions();
+
+  void setNumInFlightAuctions(Integer numInFlightAuctions);
+
+
+  @Description("Maximum number of people to consider as active for placing auctions or bids.")
+  @Nullable
+  Integer getNumActivePeople();
+
+  void setNumActivePeople(Integer numActivePeople);
+
+  @Description("Filename of perf data to append to.")
+  @Nullable
+  String getPerfFilename();
+
+  void setPerfFilename(String perfFilename);
+
+  @Description("Filename of baseline perf data to read from.")
+  @Nullable
+  String getBaselineFilename();
+
+  void setBaselineFilename(String baselineFilename);
+
+  @Description("Filename of summary perf data to append to.")
+  @Nullable
+  String getSummaryFilename();
+
+  void setSummaryFilename(String summaryFilename);
+
+  @Description("Filename for javascript capturing all perf data and any baselines.")
+  @Nullable
+  String getJavascriptFilename();
+
+  void setJavascriptFilename(String javascriptFilename);
+
+  @Description("If true, don't run the actual query. Instead, calculate the distribution "
+      + "of number of query results per (event time) minute according to the query model.")
+  @Default.Boolean(false)
+  boolean getJustModelResultRate();
+
+  void setJustModelResultRate(boolean justModelResultRate);
+
+  @Description("Coder strategy to use.")
+  @Nullable
+  NexmarkUtils.CoderStrategy getCoderStrategy();
+
+  void setCoderStrategy(NexmarkUtils.CoderStrategy coderStrategy);
+
+  @Description("Delay, in milliseconds, for each event. We will peg one core for this "
+      + "number of milliseconds to simulate CPU-bound computation.")
+  @Nullable
+  Long getCpuDelayMs();
+
+  void setCpuDelayMs(Long cpuDelayMs);
+
+  @Description("Extra data, in bytes, to save to persistent state for each event. "
+      + "This will force I/O all the way to durable storage to simulate an "
+      + "I/O-bound computation.")
+  @Nullable
+  Long getDiskBusyBytes();
+
+  void setDiskBusyBytes(Long diskBusyBytes);
+
+  @Description("Skip factor for query 2. We select bids for every auctionSkip'th auction")
+  @Nullable
+  Integer getAuctionSkip();
+
+  void setAuctionSkip(Integer auctionSkip);
+
+  @Description("Fanout for queries 4 (groups by category id) and 7 (finds a global maximum).")
+  @Nullable
+  Integer getFanout();
+
+  void setFanout(Integer fanout);
+
+  @Description("Length of occasional delay to impose on events (in seconds).")
+  @Nullable
+  Long getOccasionalDelaySec();
+
+  void setOccasionalDelaySec(Long occasionalDelaySec);
+
+  @Description("Probability that an event will be delayed by occasionalDelaySec.")
+  @Nullable
+  Double getProbDelayedEvent();
+
+  void setProbDelayedEvent(Double probDelayedEvent);
+
+  @Description("Maximum size of each log file (in events). For Query10 only.")
+  @Nullable
+  Integer getMaxLogEvents();
+
+  void setMaxLogEvents(Integer maxLogEvents);
+
+  @Description("How to derive names of resources.")
+  @Default.Enum("QUERY_AND_SALT")
+  NexmarkUtils.ResourceNameMode getResourceNameMode();
+
+  void setResourceNameMode(NexmarkUtils.ResourceNameMode mode);
+
+  @Description("If true, manage the creation and cleanup of topics, subscriptions and gcs files.")
+  @Default.Boolean(true)
+  boolean getManageResources();
+
+  void setManageResources(boolean manageResources);
+
+  @Description("If true, use pub/sub publish time instead of event time.")
+  @Nullable
+  Boolean getUsePubsubPublishTime();
+
+  void setUsePubsubPublishTime(Boolean usePubsubPublishTime);
+
+  @Description("Number of events in out-of-order groups. 1 implies no out-of-order events. "
+      + "1000 implies every 1000 events per generator are emitted in pseudo-random order.")
+  @Nullable
+  Long getOutOfOrderGroupSize();
+
+  void setOutOfOrderGroupSize(Long outOfOrderGroupSize);
+
+  @Description("If false, do not add the Monitor and Snoop transforms.")
+  @Nullable
+  Boolean getDebug();
+
+  void setDebug(Boolean value);
+
+  @Description("If set, cancel running pipelines after this long")
+  @Nullable
+  Long getRunningTimeMinutes();
+
+  void setRunningTimeMinutes(Long value);
+
+  @Description("If set and --monitorJobs is true, check that the system watermark is never more "
+      + "than this far behind real time")
+  @Nullable
+  Long getMaxSystemLagSeconds();
+
+  void setMaxSystemLagSeconds(Long value);
+
+  @Description("If set and --monitorJobs is true, check that the data watermark is never more "
+      + "than this far behind real time")
+  @Nullable
+  Long getMaxDataLagSeconds();
+
+  void setMaxDataLagSeconds(Long value);
+
+  @Description("Only start validating watermarks after this many seconds")
+  @Nullable
+  Long getWatermarkValidationDelaySeconds();
+
+  void setWatermarkValidationDelaySeconds(Long value);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java
index 37b6213..e7f59c8 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkPerf.java
@@ -27,7 +27,7 @@ import javax.annotation.Nullable;
 /**
  * Summary of performance for a particular run of a configuration. 
*/ -class NexmarkPerf { +public class NexmarkPerf { /** * A sample of the number of events and number of results (if known) generated at * a particular time. @@ -177,8 +177,6 @@ class NexmarkPerf { /** * Parse a {@link NexmarkPerf} object from JSON {@code string}. - * - * @throws IOException */ public static NexmarkPerf fromString(String string) { try { http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java index 5ef4191..c268a3b 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java @@ -18,7 +18,11 @@ package org.apache.beam.integration.nexmark; import javax.annotation.Nullable; - +import org.apache.beam.integration.nexmark.model.Auction; +import org.apache.beam.integration.nexmark.model.Bid; +import org.apache.beam.integration.nexmark.model.Event; +import org.apache.beam.integration.nexmark.model.KnownSize; +import org.apache.beam.integration.nexmark.model.Person; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Filter; @@ -29,7 +33,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; - import org.joda.time.Instant; /** http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java ---------------------------------------------------------------------- 
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java index f265e0d..b2b1826 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java @@ -17,11 +17,6 @@ */ package org.apache.beam.integration.nexmark; -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.hasItems; -import org.hamcrest.collection.IsIterableContainingInAnyOrder; - import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -30,10 +25,11 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import org.apache.beam.integration.nexmark.model.KnownSize; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.TimestampedValue; -import org.hamcrest.core.IsCollectionContaining; + import org.hamcrest.core.IsEqual; import org.joda.time.Duration; import org.joda.time.Instant; @@ -107,15 +103,18 @@ public abstract class NexmarkQueryModel implements Serializable { /** Return assertion to use on results of pipeline for this query. 
*/ public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() { final Collection<String> expectedStrings = toCollection(simulator().results()); - final String[] expectedStringsArray = expectedStrings.toArray(new String[expectedStrings.size()]); + final String[] expectedStringsArray = + expectedStrings.toArray(new String[expectedStrings.size()]); return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() { @Override public Void apply(Iterable<TimestampedValue<KnownSize>> actual) { - Collection<String> actualStrings = toCollection(relevantResults(actual).iterator()); - Assert.assertThat("wrong pipeline output", actualStrings, IsEqual.equalTo(expectedStrings)); + Collection<String> actualStrings = toCollection(relevantResults(actual).iterator()); + Assert.assertThat("wrong pipeline output", actualStrings, + IsEqual.equalTo(expectedStrings)); //compare without order -// Assert.assertThat("wrong pipeline output", actualStrings, IsIterableContainingInAnyOrder.containsInAnyOrder(expectedStringsArray)); +// Assert.assertThat("wrong pipeline output", actualStrings, +// IsIterableContainingInAnyOrder.containsInAnyOrder(expectedStringsArray)); return null; } };