http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java index 12b16f1..71364ba 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQuery; import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.model.Auction; import org.apache.beam.integration.nexmark.model.Event;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java index e4b72d2..6b98e2a 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java @@ -26,9 +26,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import org.apache.beam.integration.nexmark.AbstractSimulator; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQueryModel; import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.model.Auction; import org.apache.beam.integration.nexmark.model.Event; http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java index 61991c8..9c0fe6d 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java @@ -19,9 +19,7 @@ package org.apache.beam.integration.nexmark.queries; import org.apache.beam.integration.nexmark.Monitor; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import 
org.apache.beam.integration.nexmark.NexmarkQuery; import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.WinningBids; import org.apache.beam.integration.nexmark.model.Auction; import org.apache.beam.integration.nexmark.model.AuctionBid; import org.apache.beam.integration.nexmark.model.Bid; http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java index 9405ac8..634a58e 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java @@ -24,11 +24,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; -import org.apache.beam.integration.nexmark.AbstractSimulator; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQueryModel; import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.WinningBidsSimulator; import org.apache.beam.integration.nexmark.model.Auction; import org.apache.beam.integration.nexmark.model.AuctionBid; import org.apache.beam.integration.nexmark.model.Bid; http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java 
b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java index 34b7b50..18ce578 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQuery; import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.model.AuctionCount; import org.apache.beam.integration.nexmark.model.Bid; http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java index 6bf65dc..24d9a00 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java @@ -24,9 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; -import org.apache.beam.integration.nexmark.AbstractSimulator; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQueryModel; import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.model.AuctionCount; import org.apache.beam.integration.nexmark.model.Bid; 
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java index 2a5ab702..65789ab 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java @@ -22,9 +22,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQuery; import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.WinningBids; import org.apache.beam.integration.nexmark.model.Auction; import org.apache.beam.integration.nexmark.model.AuctionBid; import org.apache.beam.integration.nexmark.model.Bid; http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java index 4325337..0691714 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java @@ -22,11 +22,8 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.TreeMap; -import 
org.apache.beam.integration.nexmark.AbstractSimulator; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQueryModel; import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.WinningBidsSimulator; import org.apache.beam.integration.nexmark.model.Auction; import org.apache.beam.integration.nexmark.model.AuctionBid; import org.apache.beam.integration.nexmark.model.Bid; http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java index f3d1ba4..2a94ca9 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java @@ -18,7 +18,6 @@ package org.apache.beam.integration.nexmark.queries; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQuery; import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.model.Bid; import org.apache.beam.integration.nexmark.model.Event; http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java index 0a80e59..5c039f9 100644 --- 
a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java @@ -23,9 +23,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; -import org.apache.beam.integration.nexmark.AbstractSimulator; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQueryModel; import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.model.Bid; import org.apache.beam.integration.nexmark.model.Event; http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java index e7daccd..603841b 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java @@ -18,7 +18,6 @@ package org.apache.beam.integration.nexmark.queries; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQuery; import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.model.Auction; import org.apache.beam.integration.nexmark.model.Event; http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java ---------------------------------------------------------------------- diff --git 
a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java index 1161994..8c76bc6 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java @@ -24,9 +24,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import org.apache.beam.integration.nexmark.AbstractSimulator; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQueryModel; import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.model.Auction; import org.apache.beam.integration.nexmark.model.Event; http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java index aed827b..6dd189d 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java @@ -18,9 +18,7 @@ package org.apache.beam.integration.nexmark.queries; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQuery; import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.integration.nexmark.WinningBids; import org.apache.beam.integration.nexmark.model.AuctionBid; import 
org.apache.beam.integration.nexmark.model.Event; import org.apache.beam.integration.nexmark.model.KnownSize; http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java index b88d60a..d117e2d 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java @@ -21,10 +21,7 @@ import java.io.Serializable; import java.util.Collection; import java.util.Iterator; -import org.apache.beam.integration.nexmark.AbstractSimulator; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQueryModel; -import org.apache.beam.integration.nexmark.WinningBidsSimulator; import org.apache.beam.sdk.values.TimestampedValue; /** http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java new file mode 100644 index 0000000..11a4d38 --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.integration.nexmark.queries; + +import static com.google.common.base.Preconditions.checkState; + +import com.fasterxml.jackson.annotation.JsonCreator; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.apache.beam.integration.nexmark.NexmarkConfiguration; +import org.apache.beam.integration.nexmark.NexmarkUtils; +import org.apache.beam.integration.nexmark.model.Auction; +import org.apache.beam.integration.nexmark.model.AuctionBid; +import org.apache.beam.integration.nexmark.model.Bid; +import org.apache.beam.integration.nexmark.model.Event; +import org.apache.beam.integration.nexmark.sources.GeneratorConfig; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.transforms.DoFn; +import 
org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Instant; + +/** + * A transform to find the winning bid for each closed auction. In pseudo CQL syntax: + * + * <pre>{@code + * SELECT Rstream(A.*, B.auction, B.bidder, MAX(B.price), B.dateTime) + * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED] + * WHERE A.id = B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME + * GROUP BY A.id + * }</pre> + * + * <p>We will also check that the winning bid is above the auction reserve. Note that + * we ignore the auction opening bid value since it has no impact on which bid eventually wins, + * if any. + * + * <p>Our implementation will use a custom windowing function in order to bring bids and + * auctions together without requiring global state. + */ +public class WinningBids extends PTransform<PCollection<Event>, PCollection<AuctionBid>> { + /** Windows for open auctions and bids. */ + private static class AuctionOrBidWindow extends IntervalWindow implements Serializable { + /** Id of auction this window is for. */ + public final long auction; + + /** + * True if this window represents an actual auction, and thus has a start/end + * time matching that of the auction. False if this window represents a bid, and + * thus has a finite, provisional start/end time chosen by {@link #forBid}. + */ + public final boolean isAuctionWindow; + + /** For avro only.
 */ + private AuctionOrBidWindow() { + super(TIMESTAMP_MIN_VALUE, TIMESTAMP_MAX_VALUE); + auction = 0; + isAuctionWindow = false; + } + + private AuctionOrBidWindow( + Instant start, Instant end, long auctionId, boolean isAuctionWindow) { + super(start, end); + this.auction = auctionId; + this.isAuctionWindow = isAuctionWindow; + } + + /** Return an auction window for {@code auction}. */ + public static AuctionOrBidWindow forAuction(Instant timestamp, Auction auction) { + AuctionOrBidWindow result = + new AuctionOrBidWindow(timestamp, new Instant(auction.expires), auction.id, true); + return result; + } + + /** + * Return a bid window for {@code bid}. It should later be merged into + * the corresponding auction window. However, it is possible this bid is for an already + * expired auction, or for an auction which the system has not yet seen. So we + * give the bid a bit of wiggle room in its interval. + */ + public static AuctionOrBidWindow forBid( + long expectedAuctionDurationMs, Instant timestamp, Bid bid) { + // At this point we don't know which auctions are still valid, and the bid may + // be for an auction which won't start until some unknown time in the future + // (due to Generator.AUCTION_ID_LEAD in Generator.nextBid). + // A real system would atomically reconcile bids and auctions by a separate mechanism. + // If we give bids an unbounded window it is possible a bid for an auction which + // has already expired would cause the system watermark to stall, since that window + // would never be retired. + // Instead, we will just give the bid a finite window which expires at + // the upper bound of auctions assuming the auction starts at the same time as the bid, + // and assuming the system is running at its lowest event rate (as per interEventDelayUs). + AuctionOrBidWindow result = new AuctionOrBidWindow( + timestamp, timestamp.plus(expectedAuctionDurationMs * 2), bid.auction, false); + return result; + } + + /** Is this an auction window?
 */ + public boolean isAuctionWindow() { + return isAuctionWindow; + } + + @Override + public String toString() { + return String.format("AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}", + start(), end(), auction, isAuctionWindow); + } + + /** + * Windows are equal only when they share the same interval, auction id and kind. The + * inherited {@link IntervalWindow} equality looks at the interval alone, which would + * conflate bid windows for different auctions whose bids arrived at the same timestamp + * (and so received identical intervals from {@link #forBid}). + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof AuctionOrBidWindow) || !super.equals(o)) { + return false; + } + AuctionOrBidWindow that = (AuctionOrBidWindow) o; + return auction == that.auction && isAuctionWindow == that.isAuctionWindow; + } + + /** Hash consistent with {@link #equals}: folds auction id and window kind into the interval hash. */ + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (int) (auction ^ (auction >>> 32)); + result = 31 * result + (isAuctionWindow ? 1 : 0); + return result; + } + } + + /** + * Encodes an {@link AuctionOrBidWindow} as an {@link IntervalWindow}, followed by its auction + * id (as a var-long) and its {@code isAuctionWindow} flag (as a var-int 0 or 1). + */ + private static class AuctionOrBidWindowCoder extends CustomCoder<AuctionOrBidWindow> { + private static final AuctionOrBidWindowCoder INSTANCE = new AuctionOrBidWindowCoder(); + private static final Coder<IntervalWindow> SUPER_CODER = IntervalWindow.getCoder(); + private static final Coder<Long> ID_CODER = VarLongCoder.of(); + private static final Coder<Integer> INT_CODER = VarIntCoder.of(); + + @JsonCreator + public static AuctionOrBidWindowCoder of() { + return INSTANCE; + } + + @Override + public void encode(AuctionOrBidWindow window, OutputStream outStream, Coder.Context context) + throws IOException, CoderException { + SUPER_CODER.encode(window, outStream, Coder.Context.NESTED); + ID_CODER.encode(window.auction, outStream, Coder.Context.NESTED); + INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream, Coder.Context.NESTED); + } + + @Override + public AuctionOrBidWindow decode(InputStream inStream, Coder.Context context) + throws IOException, CoderException { + IntervalWindow superWindow = SUPER_CODER.decode(inStream, Coder.Context.NESTED); + long auction = ID_CODER.decode(inStream, Coder.Context.NESTED); + boolean isAuctionWindow = INT_CODER.decode(inStream, Coder.Context.NESTED) != 0; + return new AuctionOrBidWindow( + superWindow.start(), superWindow.end(), auction, isAuctionWindow); + } + + @Override public void verifyDeterministic() throws NonDeterministicException {} + } + + /** Assigns auctions and bids to windows, and merges bid windows into their auction's window.
 */ + private static class AuctionOrBidWindowFn extends WindowFn<Event, AuctionOrBidWindow> { + /** Expected duration of auctions in ms. */ + private final long expectedAuctionDurationMs; + + public AuctionOrBidWindowFn(long expectedAuctionDurationMs) { + this.expectedAuctionDurationMs = expectedAuctionDurationMs; + } + + @Override + public Collection<AuctionOrBidWindow> assignWindows(AssignContext c) { + Event event = c.element(); + if (event.newAuction != null) { + // Assign auctions to an auction window which expires at the auction's close. + return Arrays.asList(AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction)); + } else if (event.bid != null) { + // Assign bids to a temporary bid window which will later be merged into the appropriate + // auction window. + return Arrays.asList( + AuctionOrBidWindow.forBid(expectedAuctionDurationMs, c.timestamp(), event.bid)); + } else { + // Don't assign people to any window. They will thus be dropped. + return Arrays.asList(); + } + } + + @Override + public void mergeWindows(MergeContext c) throws Exception { + // Split and index the auction and bid windows by auction id. + Map<Long, AuctionOrBidWindow> idToTrueAuctionWindow = new TreeMap<>(); + Map<Long, List<AuctionOrBidWindow>> idToBidAuctionWindows = new TreeMap<>(); + for (AuctionOrBidWindow window : c.windows()) { + if (window.isAuctionWindow()) { + idToTrueAuctionWindow.put(window.auction, window); + } else { + List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(window.auction); + if (bidWindows == null) { + bidWindows = new ArrayList<>(); + idToBidAuctionWindows.put(window.auction, bidWindows); + } + bidWindows.add(window); + } + } + + // Merge all 'bid' windows into their corresponding 'auction' window, provided the + // auction has not expired.
 + for (Map.Entry<Long, AuctionOrBidWindow> entry : idToTrueAuctionWindow.entrySet()) { + AuctionOrBidWindow auctionWindow = entry.getValue(); + List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(entry.getKey()); + if (bidWindows != null) { + List<AuctionOrBidWindow> toBeMerged = new ArrayList<>(); + for (AuctionOrBidWindow bidWindow : bidWindows) { + if (bidWindow.start().isBefore(auctionWindow.end())) { + toBeMerged.add(bidWindow); + } + // else: This bid window will remain until its expire time, at which point it + // will expire without ever contributing to an output. + } + if (!toBeMerged.isEmpty()) { + toBeMerged.add(auctionWindow); + c.merge(toBeMerged, auctionWindow); + } + } + } + } + + @Override + public boolean isCompatible(WindowFn<?, ?> other) { + return other instanceof AuctionOrBidWindowFn; + } + + @Override + public Coder<AuctionOrBidWindow> windowCoder() { + return AuctionOrBidWindowCoder.of(); + } + + @Override + public WindowMappingFn<AuctionOrBidWindow> getDefaultWindowMappingFn() { + throw new UnsupportedOperationException("AuctionWindowFn not supported for side inputs"); + } + + /** + * Below we will GBK auctions and bids on their auction ids. Then we will reduce those + * per id to emit {@code (auction, winning bid)} pairs for auctions which have expired with at + * least one valid bid. We would like those output pairs to have a timestamp of the auction's + * expiry (since that's the earliest we know for sure we have the correct winner). We would + * also like to make sure that winning results are available to following stages at the + * auction's expiry. + * + * <p>Each result of the GBK will have a timestamp of the min of the result of this object's + * getOutputTime over all records which end up in one of its iterables. Thus we get the + * desired behavior if we ignore each record's timestamp and always return the auction window's + * 'maxTimestamp', which will correspond to the auction's expiry.
 + * + * <p>In contrast, if this object's getOutputTime were to return 'inputTimestamp' + * (the usual implementation), then each GBK record will take as its timestamp the minimum of + * the timestamps of all bids and auctions within it, which will always be the auction's + * timestamp. An auction which expires well into the future would thus hold up the watermark + * of the GBK results until that auction expired. That in turn would hold up all winning pairs. + */ + @Override + public Instant getOutputTime( + Instant inputTimestamp, AuctionOrBidWindow window) { + return window.maxTimestamp(); + } + } + + private final AuctionOrBidWindowFn auctionOrBidWindowFn; + + /** + * Creates the transform. Bid windows are sized from an expected auction duration derived from + * the slowest inter-event delay in {@code configuration}, scaled by the share of auction events + * amongst all events and by the number of in-flight auctions, then rounded up to whole ms. + */ + public WinningBids(String name, NexmarkConfiguration configuration) { + super(name); + // What's the expected auction time (when the system is running at the lowest event rate). + long[] interEventDelayUs = configuration.rateShape.interEventDelayUs( + configuration.firstEventRate, configuration.nextEventRate, + configuration.rateUnit, configuration.numEventGenerators); + long longestDelayUs = 0; + for (long delayUs : interEventDelayUs) { + longestDelayUs = Math.max(longestDelayUs, delayUs); + } + // Adjust for proportion of auction events amongst all events. + longestDelayUs = + (longestDelayUs * GeneratorConfig.PROPORTION_DENOMINATOR) + / GeneratorConfig.AUCTION_PROPORTION; + // Adjust for number of in-flight auctions. + longestDelayUs = longestDelayUs * configuration.numInFlightAuctions; + long expectedAuctionDurationMs = (longestDelayUs + 999) / 1000; + NexmarkUtils.console("Expected auction duration is %d ms", expectedAuctionDurationMs); + auctionOrBidWindowFn = new AuctionOrBidWindowFn(expectedAuctionDurationMs); + } + + @Override + public PCollection<AuctionBid> expand(PCollection<Event> events) { + // Window auctions and bids into custom auction windows. New people events will be discarded.
 + // This will allow us to bring bids and auctions together irrespective of how long + // each auction is open for. + events = events.apply("Window", Window.into(auctionOrBidWindowFn)); + + // Key auctions by their id. + PCollection<KV<Long, Auction>> auctionsById = + events.apply(NexmarkQuery.JUST_NEW_AUCTIONS) + .apply("AuctionById:", NexmarkQuery.AUCTION_BY_ID); + + // Key bids by their auction id. + PCollection<KV<Long, Bid>> bidsByAuctionId = + events.apply(NexmarkQuery.JUST_BIDS).apply("BidByAuction", NexmarkQuery.BID_BY_AUCTION); + + // Find the highest price valid bid for each closed auction. + return + // Join auctions and bids. + KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, auctionsById) + .and(NexmarkQuery.BID_TAG, bidsByAuctionId) + .apply(CoGroupByKey.<Long>create()) + // Filter and select. + .apply(name + ".Join", + ParDo.of(new DoFn<KV<Long, CoGbkResult>, AuctionBid>() { + private final Counter noAuctionCounter = Metrics.counter(name, "noAuction"); + private final Counter underReserveCounter = Metrics.counter(name, "underReserve"); + private final Counter noValidBidsCounter = Metrics.counter(name, "noValidBids"); + + @ProcessElement + public void processElement(ProcessContext c) { + Auction auction = + c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null); + if (auction == null) { + // We have bids without a matching auction. Give up. + noAuctionCounter.inc(); + return; + } + // Find the current winning bid for auction. + // The earliest bid with the maximum price above the reserve wins. + Bid bestBid = null; + for (Bid bid : c.element().getValue().getAll(NexmarkQuery.BID_TAG)) { + // Bids too late for their auction will have been + // filtered out by the window merge function. + checkState(bid.dateTime < auction.expires); + if (bid.price < auction.reserve) { + // Bid price is below auction reserve.
 + underReserveCounter.inc(); + continue; + } + + if (bestBid == null + || Bid.PRICE_THEN_DESCENDING_TIME.compare(bid, bestBid) > 0) { + bestBid = bid; + } + } + if (bestBid == null) { + // We don't have any valid bids for auction. + noValidBidsCounter.inc(); + return; + } + c.output(new AuctionBid(auction, bestBid)); + } + } + )); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java new file mode 100644 index 0000000..7d74f8f --- /dev/null +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.integration.nexmark.queries; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import javax.annotation.Nullable; + +import org.apache.beam.integration.nexmark.NexmarkConfiguration; +import org.apache.beam.integration.nexmark.NexmarkUtils; +import org.apache.beam.integration.nexmark.model.Auction; +import org.apache.beam.integration.nexmark.model.AuctionBid; +import org.apache.beam.integration.nexmark.model.Bid; +import org.apache.beam.integration.nexmark.model.Event; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Instant; + +/** + * A simulator of the {@code WinningBids} query. + */ +public class WinningBidsSimulator extends AbstractSimulator<Event, AuctionBid> { + /** Auctions currently still open, indexed by auction id. */ + private final Map<Long, Auction> openAuctions; + + /** The ids of auctions known to be closed. */ + private final Set<Long> closedAuctions; + + /** Current best valid bids for open auctions, indexed by auction id. */ + private final Map<Long, Bid> bestBids; + + /** Bids for auctions we havn't seen yet. */ + private final List<Bid> bidsWithoutAuctions; + + /** + * Timestamp of last new auction or bid event (ms since epoch). + */ + private long lastTimestamp; + + public WinningBidsSimulator(NexmarkConfiguration configuration) { + super(NexmarkUtils.standardEventIterator(configuration)); + openAuctions = new TreeMap<>(); + closedAuctions = new TreeSet<>(); + bestBids = new TreeMap<>(); + bidsWithoutAuctions = new ArrayList<>(); + lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); + } + + /** + * Try to account for {@code bid} in state. Return true if bid has now been + * accounted for by {@code bestBids}. 
+ */ + private boolean captureBestBid(Bid bid, boolean shouldLog) { + if (closedAuctions.contains(bid.auction)) { + // Ignore bids for known, closed auctions. + if (shouldLog) { + NexmarkUtils.info("closed auction: %s", bid); + } + return true; + } + Auction auction = openAuctions.get(bid.auction); + if (auction == null) { + // We don't have an auction for this bid yet, so can't determine if it is + // winning or not. + if (shouldLog) { + NexmarkUtils.info("pending auction: %s", bid); + } + return false; + } + if (bid.price < auction.reserve) { + // Bid price is too low. + if (shouldLog) { + NexmarkUtils.info("below reserve: %s", bid); + } + return true; + } + Bid existingBid = bestBids.get(bid.auction); + if (existingBid == null || Bid.PRICE_THEN_DESCENDING_TIME.compare(existingBid, bid) < 0) { + // We've found a (new) best bid for a known auction. + bestBids.put(bid.auction, bid); + if (shouldLog) { + NexmarkUtils.info("new winning bid: %s", bid); + } + } else { + if (shouldLog) { + NexmarkUtils.info("ignoring low bid: %s", bid); + } + } + return true; + } + + /** + * Try to match bids without auctions to auctions. + */ + private void flushBidsWithoutAuctions() { + Iterator<Bid> itr = bidsWithoutAuctions.iterator(); + while (itr.hasNext()) { + Bid bid = itr.next(); + if (captureBestBid(bid, false)) { + NexmarkUtils.info("bid now accounted for: %s", bid); + itr.remove(); + } + } + } + + /** + * Return the next winning bid for an expired auction relative to {@code timestamp}. + * Return null if no more winning bids, in which case all expired auctions will + * have been removed from our state. Retire auctions in order of expire time. 
+ */ + @Nullable + private TimestampedValue<AuctionBid> nextWinningBid(long timestamp) { + Map<Long, List<Long>> toBeRetired = new TreeMap<>(); + for (Map.Entry<Long, Auction> entry : openAuctions.entrySet()) { + if (entry.getValue().expires <= timestamp) { + List<Long> idsAtTime = toBeRetired.get(entry.getValue().expires); + if (idsAtTime == null) { + idsAtTime = new ArrayList<>(); + toBeRetired.put(entry.getValue().expires, idsAtTime); + } + idsAtTime.add(entry.getKey()); + } + } + for (Map.Entry<Long, List<Long>> entry : toBeRetired.entrySet()) { + for (long id : entry.getValue()) { + Auction auction = openAuctions.get(id); + NexmarkUtils.info("retiring auction: %s", auction); + openAuctions.remove(id); + Bid bestBid = bestBids.get(id); + if (bestBid != null) { + TimestampedValue<AuctionBid> result = + TimestampedValue.of(new AuctionBid(auction, bestBid), new Instant(auction.expires)); + NexmarkUtils.info("winning: %s", result); + return result; + } + } + } + return null; + } + + @Override + protected void run() { + if (lastTimestamp > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) { + // We may have finally seen the auction a bid was intended for. + flushBidsWithoutAuctions(); + TimestampedValue<AuctionBid> result = nextWinningBid(lastTimestamp); + if (result != null) { + addResult(result); + return; + } + } + + TimestampedValue<Event> timestampedEvent = nextInput(); + if (timestampedEvent == null) { + // No more events. Flush any still open auctions. + TimestampedValue<AuctionBid> result = + nextWinningBid(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); + if (result == null) { + // We are done. + allDone(); + return; + } + addResult(result); + //TODO test fails because offset of some hundreds of ms beween expect and actual + return; + } + + Event event = timestampedEvent.getValue(); + if (event.newPerson != null) { + // Ignore new person events. 
+ return; + } + + lastTimestamp = timestampedEvent.getTimestamp().getMillis(); + if (event.newAuction != null) { + // Add this new open auction to our state. + openAuctions.put(event.newAuction.id, event.newAuction); + } else { + if (!captureBestBid(event.bid, true)) { + // We don't know what to do with this bid yet. + NexmarkUtils.info("bid not yet accounted for: %s", event.bid); + bidsWithoutAuctions.add(event.bid); + } + } + // Keep looking for winning bids. + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java index 284aa7e..b005d65 100644 --- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java +++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java @@ -18,8 +18,6 @@ package org.apache.beam.integration.nexmark.queries; import org.apache.beam.integration.nexmark.NexmarkConfiguration; -import org.apache.beam.integration.nexmark.NexmarkQuery; -import org.apache.beam.integration.nexmark.NexmarkQueryModel; import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.model.KnownSize; import org.apache.beam.sdk.PipelineResult;