Fix compile after Coders and Pubsub refactor
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8b96949b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8b96949b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8b96949b Branch: refs/heads/master Commit: 8b96949b934be1df7102aeb24ef4b23d9dd28812 Parents: b438fa7 Author: Etienne Chauchot <echauc...@gmail.com> Authored: Fri Apr 28 10:29:38 2017 +0200 Committer: Ismaël Mejía <ieme...@gmail.com> Committed: Wed Aug 23 19:07:28 2017 +0200 ---------------------------------------------------------------------- .../integration/nexmark/NexmarkOptions.java | 3 +-- .../beam/integration/nexmark/WinningBids.java | 23 +++++++++++--------- .../integration/nexmark/io/PubsubClient.java | 2 +- .../integration/nexmark/io/PubsubHelper.java | 2 +- .../nexmark/io/PubsubJsonClient.java | 2 +- .../nexmark/io/PubsubTestClient.java | 2 +- .../beam/integration/nexmark/model/Auction.java | 6 ++--- .../integration/nexmark/model/AuctionBid.java | 6 ++--- .../integration/nexmark/model/AuctionCount.java | 6 ++--- .../integration/nexmark/model/AuctionPrice.java | 6 ++--- .../beam/integration/nexmark/model/Bid.java | 8 +++---- .../nexmark/model/BidsPerSession.java | 7 +++--- .../nexmark/model/CategoryPrice.java | 7 +++--- .../beam/integration/nexmark/model/Done.java | 7 +++--- .../beam/integration/nexmark/model/Event.java | 6 ++--- .../nexmark/model/IdNameReserve.java | 7 +++--- .../nexmark/model/NameCityStateId.java | 7 +++--- .../beam/integration/nexmark/model/Person.java | 7 +++--- .../integration/nexmark/model/SellerPrice.java | 7 +++--- .../integration/nexmark/sources/Generator.java | 6 ++--- 20 files changed, 57 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java 
---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java index 5d093ae..e1c1af2 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java @@ -18,10 +18,9 @@ package org.apache.beam.integration.nexmark; import javax.annotation.Nullable; - +import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PubsubOptions; /** * Command line flags. http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java index f2566b8..3815b9d 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java @@ -35,9 +35,9 @@ import org.apache.beam.integration.nexmark.model.AuctionBid; import org.apache.beam.integration.nexmark.model.Bid; import org.apache.beam.integration.nexmark.model.Event; import org.apache.beam.integration.nexmark.sources.GeneratorConfig; -import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.VarIntCoder; import 
org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.metrics.Counter; @@ -145,7 +145,7 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct /** * Encodes an {@link AuctionOrBidWindow} as an {@link IntervalWindow} and an auction id long. */ - private static class AuctionOrBidWindowCoder extends AtomicCoder<AuctionOrBidWindow> { + private static class AuctionOrBidWindowCoder extends CustomCoder<AuctionOrBidWindow> { private static final AuctionOrBidWindowCoder INSTANCE = new AuctionOrBidWindowCoder(); private static final Coder<IntervalWindow> SUPER_CODER = IntervalWindow.getCoder(); private static final Coder<Long> ID_CODER = VarLongCoder.of(); @@ -157,22 +157,25 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct } @Override - public void encode(AuctionOrBidWindow window, OutputStream outStream, Context context) + public void encode(AuctionOrBidWindow window, OutputStream outStream, Coder.Context context) throws IOException, CoderException { - SUPER_CODER.encode(window, outStream, Context.NESTED); - ID_CODER.encode(window.auction, outStream, Context.NESTED); - INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream, Context.NESTED); + SUPER_CODER.encode(window, outStream, Coder.Context.NESTED); + ID_CODER.encode(window.auction, outStream, Coder.Context.NESTED); + INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream, Coder.Context.NESTED); } @Override - public AuctionOrBidWindow decode(InputStream inStream, Context context) + public AuctionOrBidWindow decode(InputStream inStream, Coder.Context context) throws IOException, CoderException { - IntervalWindow superWindow = SUPER_CODER.decode(inStream, Context.NESTED); - long auction = ID_CODER.decode(inStream, Context.NESTED); - boolean isAuctionWindow = INT_CODER.decode(inStream, Context.NESTED) == 0 ? 
false : true; + IntervalWindow superWindow = SUPER_CODER.decode(inStream, Coder.Context.NESTED); + long auction = ID_CODER.decode(inStream, Coder.Context.NESTED); + boolean isAuctionWindow = + INT_CODER.decode(inStream, Coder.Context.NESTED) == 0 ? false : true; return new AuctionOrBidWindow( superWindow.start(), superWindow.end(), auction, isAuctionWindow); } + + @Override public void verifyDeterministic() throws NonDeterministicException {} } /** Assign events to auction windows and merges them intelligently. */ http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java index 687aa35..931fe6e 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java @@ -32,7 +32,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; -import org.apache.beam.sdk.options.PubsubOptions; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions; /** * An (abstract) helper class for talking to Pubsub via an underlying transport. 
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java index 15401b7..bcc5b1c 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java @@ -23,7 +23,7 @@ import java.util.Collection; import java.util.List; import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.options.PubsubOptions; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions; /** * Helper for working with pubsub. http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java index b778a09..afddbd8 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java @@ -47,7 +47,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import javax.annotation.Nullable; -import org.apache.beam.sdk.options.PubsubOptions; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; import org.apache.beam.sdk.util.Transport; 
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java index 125a8d6..69ba2b0 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java @@ -33,7 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import javax.annotation.Nullable; -import org.apache.beam.sdk.options.PubsubOptions; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions; /** * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java index ac30568..4b1a848 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java @@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; - import 
org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; @@ -39,7 +37,7 @@ public class Auction implements KnownSize, Serializable { private static final Coder<Long> LONG_CODER = VarLongCoder.of(); private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); - public static final Coder<Auction> CODER = new AtomicCoder<Auction>() { + public static final Coder<Auction> CODER = new CustomCoder<Auction>() { @Override public void encode(Auction value, OutputStream outStream, Coder.Context context) http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java index c014257..7f6b7c9 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java @@ -19,23 +19,21 @@ package org.apache.beam.integration.nexmark.model; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; - import org.apache.beam.integration.nexmark.NexmarkUtils; import org.apache.beam.integration.nexmark.WinningBids; -import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import 
org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; /** * Result of {@link WinningBids} transform. */ public class AuctionBid implements KnownSize, Serializable { - public static final Coder<AuctionBid> CODER = new AtomicCoder<AuctionBid>() { + public static final Coder<AuctionBid> CODER = new CustomCoder<AuctionBid>() { @Override public void encode(AuctionBid value, OutputStream outStream, Coder.Context context) http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java index aa16629..e6d3450 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java @@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; - import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.VarLongCoder; /** @@ -37,7 +35,7 @@ import org.apache.beam.sdk.coders.VarLongCoder; public class AuctionCount implements KnownSize, Serializable { private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - public static final Coder<AuctionCount> CODER = new 
AtomicCoder<AuctionCount>() { + public static final Coder<AuctionCount> CODER = new CustomCoder<AuctionCount>() { @Override public void encode(AuctionCount value, OutputStream outStream, Coder.Context context) http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java index f365cc8..cb971e2 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java @@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; - import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.VarLongCoder; /** @@ -37,7 +35,7 @@ import org.apache.beam.sdk.coders.VarLongCoder; public class AuctionPrice implements KnownSize, Serializable { private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - public static final Coder<AuctionPrice> CODER = new AtomicCoder<AuctionPrice>() { + public static final Coder<AuctionPrice> CODER = new CustomCoder<AuctionPrice>() { @Override public void encode(AuctionPrice value, OutputStream outStream, Coder.Context context) 
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java index 59a33c1..faeb928 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java @@ -19,17 +19,15 @@ package org.apache.beam.integration.nexmark.model; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; import java.util.Comparator; - import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; @@ -40,7 +38,7 @@ public class Bid implements KnownSize, Serializable { private static final Coder<Long> LONG_CODER = VarLongCoder.of(); private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); - public static final Coder<Bid> CODER = new AtomicCoder<Bid>() { + public static final Coder<Bid> CODER = new CustomCoder<Bid>() { @Override public void encode(Bid value, OutputStream outStream, Coder.Context context) @@ -63,6 +61,8 @@ public class Bid implements KnownSize, Serializable { String extra = STRING_CODER.decode(inStream, Context.NESTED); return new Bid(auction, bidder, price, dateTime, extra); } + + @Override public void verifyDeterministic() throws 
NonDeterministicException {} }; /** http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java index 7c4dfae..26b6a41 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java @@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; - import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.VarLongCoder; /** @@ -37,7 +35,7 @@ import org.apache.beam.sdk.coders.VarLongCoder; public class BidsPerSession implements KnownSize, Serializable { private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - public static final Coder<BidsPerSession> CODER = new AtomicCoder<BidsPerSession>() { + public static final Coder<BidsPerSession> CODER = new CustomCoder<BidsPerSession>() { @Override public void encode(BidsPerSession value, OutputStream outStream, Coder.Context context) @@ -54,6 +52,7 @@ public class BidsPerSession implements KnownSize, Serializable { long bidsPerSession = LONG_CODER.decode(inStream, Context.NESTED); return new BidsPerSession(personId, bidsPerSession); } 
+ @Override public void verifyDeterministic() throws NonDeterministicException {} }; @JsonProperty http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java index 6512cc1..ccb2bc7 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java @@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; - import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; @@ -39,7 +37,7 @@ public class CategoryPrice implements KnownSize, Serializable { private static final Coder<Long> LONG_CODER = VarLongCoder.of(); private static final Coder<Integer> INT_CODER = VarIntCoder.of(); - public static final Coder<CategoryPrice> CODER = new AtomicCoder<CategoryPrice>() { + public static final Coder<CategoryPrice> CODER = new CustomCoder<CategoryPrice>() { @Override public void encode(CategoryPrice value, OutputStream outStream, Coder.Context context) @@ -58,6 +56,7 @@ public class CategoryPrice implements KnownSize, Serializable { boolean 
isLast = INT_CODER.decode(inStream, context) != 0; return new CategoryPrice(category, price, isLast); } + @Override public void verifyDeterministic() throws NonDeterministicException {} }; @JsonProperty http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java index 6009463..42999cd 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java @@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; - import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; /** @@ -37,7 +35,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; public class Done implements KnownSize, Serializable { private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); - public static final Coder<Done> CODER = new AtomicCoder<Done>() { + public static final Coder<Done> CODER = new CustomCoder<Done>() { @Override public void encode(Done value, OutputStream outStream, Coder.Context context) @@ -52,6 +50,7 @@ public class Done implements KnownSize, Serializable { String message = STRING_CODER.decode(inStream, 
Context.NESTED); return new Done(message); } + @Override public void verifyDeterministic() throws NonDeterministicException {} }; @JsonProperty http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java index 8a278bf..e2130c9 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java @@ -22,10 +22,9 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; import javax.annotation.Nullable; - -import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.VarIntCoder; /** @@ -35,7 +34,7 @@ import org.apache.beam.sdk.coders.VarIntCoder; public class Event implements KnownSize, Serializable { private static final Coder<Integer> INT_CODER = VarIntCoder.of(); - public static final Coder<Event> CODER = new AtomicCoder<Event>() { + public static final Coder<Event> CODER = new CustomCoder<Event>() { @Override public void encode(Event value, OutputStream outStream, Coder.Context context) throws CoderException, IOException { @@ -71,6 +70,7 @@ public class Event implements KnownSize, Serializable { throw new RuntimeException("invalid event encoding"); } } + @Override public void verifyDeterministic() throws NonDeterministicException {} }; @Nullable 
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java index 5d22651..cf1e571 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java @@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; - import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; @@ -39,7 +37,7 @@ public class IdNameReserve implements KnownSize, Serializable { private static final Coder<Long> LONG_CODER = VarLongCoder.of(); private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); - public static final Coder<IdNameReserve> CODER = new AtomicCoder<IdNameReserve>() { + public static final Coder<IdNameReserve> CODER = new CustomCoder<IdNameReserve>() { @Override public void encode(IdNameReserve value, OutputStream outStream, Coder.Context context) @@ -58,6 +56,7 @@ public class IdNameReserve implements KnownSize, Serializable { long reserve = LONG_CODER.decode(inStream, Context.NESTED); return new IdNameReserve(id, name, 
reserve); } + @Override public void verifyDeterministic() throws NonDeterministicException {} }; @JsonProperty http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java index ac22879..86d1738 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java @@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; - import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; @@ -39,7 +37,7 @@ public class NameCityStateId implements KnownSize, Serializable { private static final Coder<Long> LONG_CODER = VarLongCoder.of(); private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); - public static final Coder<NameCityStateId> CODER = new AtomicCoder<NameCityStateId>() { + public static final Coder<NameCityStateId> CODER = new CustomCoder<NameCityStateId>() { @Override public void encode(NameCityStateId value, OutputStream outStream, Coder.Context context) @@ -60,6 +58,7 @@ public class NameCityStateId 
implements KnownSize, Serializable { long id = LONG_CODER.decode(inStream, Context.NESTED); return new NameCityStateId(name, city, state, id); } + @Override public void verifyDeterministic() throws NonDeterministicException {} }; @JsonProperty http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java index 85c7183..906df94 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java @@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; - import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; @@ -38,7 +36,7 @@ import org.apache.beam.sdk.coders.VarLongCoder; public class Person implements KnownSize, Serializable { private static final Coder<Long> LONG_CODER = VarLongCoder.of(); private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); - public static final Coder<Person> CODER = new AtomicCoder<Person>() { + public static final Coder<Person> CODER = new CustomCoder<Person>() { @Override public void encode(Person value, OutputStream 
outStream, Coder.Context context) @@ -67,6 +65,7 @@ public class Person implements KnownSize, Serializable { String extra = STRING_CODER.decode(inStream, Context.NESTED); return new Person(id, name, emailAddress, creditCard, city, state, dateTime, extra); } + @Override public void verifyDeterministic() throws NonDeterministicException {} }; /** Id of person. */ http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java index b7c2b14..68f2697 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java @@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; - import org.apache.beam.integration.nexmark.NexmarkUtils; -import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.VarLongCoder; /** @@ -37,7 +35,7 @@ import org.apache.beam.sdk.coders.VarLongCoder; public class SellerPrice implements KnownSize, Serializable { private static final Coder<Long> LONG_CODER = VarLongCoder.of(); - public static final Coder<SellerPrice> CODER = new AtomicCoder<SellerPrice>() { + public static final Coder<SellerPrice> CODER = new CustomCoder<SellerPrice>() { 
@Override public void encode(SellerPrice value, OutputStream outStream, Coder.Context context) @@ -54,6 +52,7 @@ public class SellerPrice implements KnownSize, Serializable { long price = LONG_CODER.decode(inStream, Context.NESTED); return new SellerPrice(seller, price); } + @Override public void verifyDeterministic() throws NonDeterministicException {} }; @JsonProperty http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java index cffc7a5..012d4e6 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java @@ -27,14 +27,13 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Random; - import org.apache.beam.integration.nexmark.model.Auction; import org.apache.beam.integration.nexmark.model.Bid; import org.apache.beam.integration.nexmark.model.Event; import org.apache.beam.integration.nexmark.model.Person; -import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.values.TimestampedValue; @@ -102,7 +101,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl /** Coder for this class. 
*/ public static final Coder<Checkpoint> CODER_INSTANCE = - new AtomicCoder<Checkpoint>() { + new CustomCoder<Checkpoint>() { @Override public void encode( Checkpoint value, @@ -121,6 +120,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl long wallclockBaseTime = LONG_CODER.decode(inStream, Context.NESTED); return new Checkpoint(numEvents, wallclockBaseTime); } + @Override public void verifyDeterministic() throws NonDeterministicException {} }; private long numEvents;