Migrate to Beam 2.1.0-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3d5c3d00 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3d5c3d00 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3d5c3d00 Branch: refs/heads/master Commit: 3d5c3d009b441a8085189f9d4ed1926a4042f816 Parents: 69953a0 Author: Etienne Chauchot <echauc...@gmail.com> Authored: Tue May 9 15:25:54 2017 +0200 Committer: Ismaël MejÃa <ieme...@gmail.com> Committed: Wed Aug 23 19:07:28 2017 +0200 ---------------------------------------------------------------------- integration/java/nexmark/pom.xml | 2 +- .../integration/nexmark/NexmarkLauncher.java | 4 +- .../beam/integration/nexmark/NexmarkUtils.java | 43 +++++++++---------- .../beam/integration/nexmark/model/Auction.java | 45 ++++++++++---------- .../integration/nexmark/model/AuctionBid.java | 13 +++--- .../integration/nexmark/model/AuctionCount.java | 14 +++--- .../integration/nexmark/model/AuctionPrice.java | 13 +++--- .../beam/integration/nexmark/model/Bid.java | 25 ++++++----- .../nexmark/model/BidsPerSession.java | 13 +++--- .../nexmark/model/CategoryPrice.java | 18 ++++---- .../beam/integration/nexmark/model/Done.java | 10 ++--- .../beam/integration/nexmark/model/Event.java | 24 +++++------ .../nexmark/model/IdNameReserve.java | 17 ++++---- .../nexmark/model/NameCityStateId.java | 22 +++++----- .../beam/integration/nexmark/model/Person.java | 38 ++++++++--------- .../integration/nexmark/model/SellerPrice.java | 13 +++--- .../nexmark/queries/WinningBids.java | 16 +++---- .../integration/nexmark/sources/Generator.java | 19 +++------ integration/java/pom.xml | 2 +- integration/pom.xml | 2 +- 20 files changed, 163 insertions(+), 190 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/pom.xml ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml index 35fe0f3..86b88bd 100644 --- a/integration/java/nexmark/pom.xml +++ b/integration/java/nexmark/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-integration-java-parent</artifactId> - <version>0.7.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java index ea4ff58..db53191 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java @@ -739,7 +739,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> { NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription); PubsubIO.Read<PubsubMessage> io = - PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(shortSubscription) + PubsubIO.readMessagesWithAttributes().fromSubscription(shortSubscription) .withIdAttribute(NexmarkUtils.PUBSUB_ID); if (!configuration.usePubsubPublishTime) { io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP); @@ -784,7 +784,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> { NexmarkUtils.console("Writing events to Pubsub %s", shortTopic); PubsubIO.Write<PubsubMessage> io = - PubsubIO.writePubsubMessages().to(shortTopic) + PubsubIO.writeMessages().to(shortTopic) .withIdAttribute(NexmarkUtils.PUBSUB_ID); if (!configuration.usePubsubPublishTime) { io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP); http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java index ea851af..7707429 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java @@ -351,25 +351,25 @@ public class NexmarkUtils { CoderRegistry registry = p.getCoderRegistry(); switch (coderStrategy) { case HAND: - registry.registerCoder(Auction.class, Auction.CODER); - registry.registerCoder(AuctionBid.class, AuctionBid.CODER); - registry.registerCoder(AuctionCount.class, AuctionCount.CODER); - registry.registerCoder(AuctionPrice.class, AuctionPrice.CODER); - registry.registerCoder(Bid.class, Bid.CODER); - registry.registerCoder(CategoryPrice.class, CategoryPrice.CODER); - registry.registerCoder(Event.class, Event.CODER); - registry.registerCoder(IdNameReserve.class, IdNameReserve.CODER); - registry.registerCoder(NameCityStateId.class, NameCityStateId.CODER); - registry.registerCoder(Person.class, Person.CODER); - registry.registerCoder(SellerPrice.class, SellerPrice.CODER); - registry.registerCoder(Done.class, Done.CODER); - registry.registerCoder(BidsPerSession.class, BidsPerSession.CODER); + registry.registerCoderForClass(Auction.class, Auction.CODER); + registry.registerCoderForClass(AuctionBid.class, AuctionBid.CODER); + registry.registerCoderForClass(AuctionCount.class, AuctionCount.CODER); + registry.registerCoderForClass(AuctionPrice.class, AuctionPrice.CODER); + registry.registerCoderForClass(Bid.class, Bid.CODER); + registry.registerCoderForClass(CategoryPrice.class, CategoryPrice.CODER); + registry.registerCoderForClass(Event.class, Event.CODER); + registry.registerCoderForClass(IdNameReserve.class, IdNameReserve.CODER); + registry.registerCoderForClass(NameCityStateId.class, NameCityStateId.CODER); + registry.registerCoderForClass(Person.class, Person.CODER); + registry.registerCoderForClass(SellerPrice.class, SellerPrice.CODER); + registry.registerCoderForClass(Done.class, Done.CODER); + registry.registerCoderForClass(BidsPerSession.class, BidsPerSession.CODER); break; case AVRO: - registry.setFallbackCoderProvider(AvroCoder.PROVIDER); + registry.registerCoderProvider(AvroCoder.getCoderProvider()); break; case JAVA: - registry.setFallbackCoderProvider(SerializableCoder.PROVIDER); + registry.registerCoderProvider(SerializableCoder.getCoderProvider()); break; } } @@ -621,22 +621,17 @@ public class NexmarkUtils { } @Override - public void encode(KnownSize value, OutputStream outStream, Context context) + public void encode(KnownSize value, OutputStream outStream) throws CoderException, IOException { @SuppressWarnings("unchecked") T typedValue = (T) value; - trueCoder.encode(typedValue, outStream, context); + trueCoder.encode(typedValue, outStream); } @Override - public KnownSize decode(InputStream inStream, Context context) + public KnownSize decode(InputStream inStream) throws CoderException, IOException { - return trueCoder.decode(inStream, context); - } - - @Override - public List<? extends Coder<?>> getComponents() { - return ImmutableList.of(trueCoder); + return trueCoder.decode(inStream); } } http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java index 5c018dc..9f5d7c0 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java @@ -39,35 +39,34 @@ public class Auction implements KnownSize, Serializable { public static final Coder<Auction> CODER = new CustomCoder<Auction>() { @Override - public void encode(Auction value, OutputStream outStream, - Coder.Context context) + public void encode(Auction value, OutputStream outStream) throws CoderException, IOException { - LONG_CODER.encode(value.id, outStream, Context.NESTED); - STRING_CODER.encode(value.itemName, outStream, Context.NESTED); - STRING_CODER.encode(value.description, outStream, Context.NESTED); - LONG_CODER.encode(value.initialBid, outStream, Context.NESTED); - LONG_CODER.encode(value.reserve, outStream, Context.NESTED); - LONG_CODER.encode(value.dateTime, outStream, Context.NESTED); - LONG_CODER.encode(value.expires, outStream, Context.NESTED); - LONG_CODER.encode(value.seller, outStream, Context.NESTED); - LONG_CODER.encode(value.category, outStream, Context.NESTED); - STRING_CODER.encode(value.extra, outStream, Context.NESTED); + LONG_CODER.encode(value.id, outStream); + STRING_CODER.encode(value.itemName, outStream); + STRING_CODER.encode(value.description, outStream); + LONG_CODER.encode(value.initialBid, outStream); + LONG_CODER.encode(value.reserve, outStream); + LONG_CODER.encode(value.dateTime, outStream); + LONG_CODER.encode(value.expires, outStream); + LONG_CODER.encode(value.seller, outStream); + LONG_CODER.encode(value.category, outStream); + STRING_CODER.encode(value.extra, outStream); } @Override public Auction decode( - InputStream inStream, Coder.Context context) + InputStream inStream) throws CoderException, IOException { - long id = LONG_CODER.decode(inStream, Context.NESTED); - String itemName = STRING_CODER.decode(inStream, Context.NESTED); - String description = STRING_CODER.decode(inStream, Context.NESTED); - long initialBid = LONG_CODER.decode(inStream, Context.NESTED); - long reserve = LONG_CODER.decode(inStream, Context.NESTED); - long dateTime = LONG_CODER.decode(inStream, Context.NESTED); - long expires = LONG_CODER.decode(inStream, Context.NESTED); - long seller = LONG_CODER.decode(inStream, Context.NESTED); - long category = LONG_CODER.decode(inStream, Context.NESTED); - String extra = STRING_CODER.decode(inStream, Context.NESTED); + long id = LONG_CODER.decode(inStream); + String itemName = STRING_CODER.decode(inStream); + String description = STRING_CODER.decode(inStream); + long initialBid = LONG_CODER.decode(inStream); + long reserve = LONG_CODER.decode(inStream); + long dateTime = LONG_CODER.decode(inStream); + long expires = LONG_CODER.decode(inStream); + long seller = LONG_CODER.decode(inStream); + long category = LONG_CODER.decode(inStream); + String extra = STRING_CODER.decode(inStream); return new Auction( id, itemName, description, initialBid, reserve, dateTime, expires, seller, category, extra); http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java index b1d9ec2..b9d79db 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java @@ -34,19 +34,18 @@ import org.apache.beam.sdk.coders.CustomCoder; public class AuctionBid implements KnownSize, Serializable { public static final Coder<AuctionBid> CODER = new CustomCoder<AuctionBid>() { @Override - public void encode(AuctionBid value, OutputStream outStream, - Coder.Context context) + public void encode(AuctionBid value, OutputStream outStream) throws CoderException, IOException { - Auction.CODER.encode(value.auction, outStream, Context.NESTED); - Bid.CODER.encode(value.bid, outStream, Context.NESTED); + Auction.CODER.encode(value.auction, outStream); + Bid.CODER.encode(value.bid, outStream); } @Override public AuctionBid decode( - InputStream inStream, Coder.Context context) + InputStream inStream) throws CoderException, IOException { - Auction auction = Auction.CODER.decode(inStream, Context.NESTED); - Bid bid = Bid.CODER.decode(inStream, Context.NESTED); + Auction auction = Auction.CODER.decode(inStream); + Bid bid = Bid.CODER.decode(inStream); return new AuctionBid(auction, bid); } }; http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java index c83a455..0e643ff 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java @@ -37,19 +37,17 @@ public class AuctionCount implements KnownSize, Serializable { public static final Coder<AuctionCount> CODER = new CustomCoder<AuctionCount>() { @Override - public void encode(AuctionCount value, OutputStream outStream, - Coder.Context context) + public void encode(AuctionCount value, OutputStream outStream) throws CoderException, IOException { - LONG_CODER.encode(value.auction, outStream, Context.NESTED); - LONG_CODER.encode(value.count, outStream, Context.NESTED); + LONG_CODER.encode(value.auction, outStream); + LONG_CODER.encode(value.count, outStream); } @Override - public AuctionCount decode( - InputStream inStream, Coder.Context context) + public AuctionCount decode(InputStream inStream) throws CoderException, IOException { - long auction = LONG_CODER.decode(inStream, Context.NESTED); - long count = LONG_CODER.decode(inStream, Context.NESTED); + long auction = LONG_CODER.decode(inStream); + long count = LONG_CODER.decode(inStream); return new AuctionCount(auction, count); } }; http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java index 43d0b27..7d51a21 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java @@ -37,19 +37,18 @@ public class AuctionPrice implements KnownSize, Serializable { public static final Coder<AuctionPrice> CODER = new CustomCoder<AuctionPrice>() { @Override - public void encode(AuctionPrice value, OutputStream outStream, - Coder.Context context) + public void encode(AuctionPrice value, OutputStream outStream) throws CoderException, IOException { - LONG_CODER.encode(value.auction, outStream, Context.NESTED); - LONG_CODER.encode(value.price, outStream, Context.NESTED); + LONG_CODER.encode(value.auction, outStream); + LONG_CODER.encode(value.price, outStream); } @Override public AuctionPrice decode( - InputStream inStream, Coder.Context context) + InputStream inStream) throws CoderException, IOException { - long auction = LONG_CODER.decode(inStream, Context.NESTED); - long price = LONG_CODER.decode(inStream, Context.NESTED); + long auction = LONG_CODER.decode(inStream); + long price = LONG_CODER.decode(inStream); return new AuctionPrice(auction, price); } }; http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java index faeb928..4fa9ea0 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java @@ -40,25 +40,24 @@ public class Bid implements KnownSize, Serializable { public static final Coder<Bid> CODER = new CustomCoder<Bid>() { @Override - public void encode(Bid value, OutputStream outStream, - Coder.Context context) + public void encode(Bid value, OutputStream outStream) throws CoderException, IOException { - LONG_CODER.encode(value.auction, outStream, Context.NESTED); - LONG_CODER.encode(value.bidder, outStream, Context.NESTED); - LONG_CODER.encode(value.price, outStream, Context.NESTED); - LONG_CODER.encode(value.dateTime, outStream, Context.NESTED); - STRING_CODER.encode(value.extra, outStream, Context.NESTED); + LONG_CODER.encode(value.auction, outStream); + LONG_CODER.encode(value.bidder, outStream); + LONG_CODER.encode(value.price, outStream); + LONG_CODER.encode(value.dateTime, outStream); + STRING_CODER.encode(value.extra, outStream); } @Override public Bid decode( - InputStream inStream, Coder.Context context) + InputStream inStream) throws CoderException, IOException { - long auction = LONG_CODER.decode(inStream, Context.NESTED); - long bidder = LONG_CODER.decode(inStream, Context.NESTED); - long price = LONG_CODER.decode(inStream, Context.NESTED); - long dateTime = LONG_CODER.decode(inStream, Context.NESTED); - String extra = STRING_CODER.decode(inStream, Context.NESTED); + long auction = LONG_CODER.decode(inStream); + long bidder = LONG_CODER.decode(inStream); + long price = LONG_CODER.decode(inStream); + long dateTime = LONG_CODER.decode(inStream); + String extra = STRING_CODER.decode(inStream); return new Bid(auction, bidder, price, dateTime, extra); } http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java index 6dddf34..3211456 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java @@ -37,19 +37,18 @@ public class BidsPerSession implements KnownSize, Serializable { public static final Coder<BidsPerSession> CODER = new CustomCoder<BidsPerSession>() { @Override - public void encode(BidsPerSession value, OutputStream outStream, - Coder.Context context) + public void encode(BidsPerSession value, OutputStream outStream) throws CoderException, IOException { - LONG_CODER.encode(value.personId, outStream, Context.NESTED); - LONG_CODER.encode(value.bidsPerSession, outStream, Context.NESTED); + LONG_CODER.encode(value.personId, outStream); + LONG_CODER.encode(value.bidsPerSession, outStream); } @Override public BidsPerSession decode( - InputStream inStream, Coder.Context context) + InputStream inStream) throws CoderException, IOException { - long personId = LONG_CODER.decode(inStream, Context.NESTED); - long bidsPerSession = LONG_CODER.decode(inStream, Context.NESTED); + long personId = LONG_CODER.decode(inStream); + long bidsPerSession = LONG_CODER.decode(inStream); return new BidsPerSession(personId, bidsPerSession); } @Override public void verifyDeterministic() throws NonDeterministicException {} http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java index ccb2bc7..2678198 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java @@ -39,21 +39,19 @@ public class CategoryPrice implements KnownSize, Serializable { public static final Coder<CategoryPrice> CODER = new CustomCoder<CategoryPrice>() { @Override - public void encode(CategoryPrice value, OutputStream outStream, - Coder.Context context) + public void encode(CategoryPrice value, OutputStream outStream) throws CoderException, IOException { - LONG_CODER.encode(value.category, outStream, Context.NESTED); - LONG_CODER.encode(value.price, outStream, Context.NESTED); - INT_CODER.encode(value.isLast ? 1 : 0, outStream, Context.NESTED); + LONG_CODER.encode(value.category, outStream); + LONG_CODER.encode(value.price, outStream); + INT_CODER.encode(value.isLast ? 1 : 0, outStream); } @Override - public CategoryPrice decode( - InputStream inStream, Coder.Context context) + public CategoryPrice decode(InputStream inStream) throws CoderException, IOException { - long category = LONG_CODER.decode(inStream, Context.NESTED); - long price = LONG_CODER.decode(inStream, Context.NESTED); - boolean isLast = INT_CODER.decode(inStream, context) != 0; + long category = LONG_CODER.decode(inStream); + long price = LONG_CODER.decode(inStream); + boolean isLast = INT_CODER.decode(inStream) != 0; return new CategoryPrice(category, price, isLast); } @Override public void verifyDeterministic() throws NonDeterministicException {} http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java index 0c14e8f..b0a88d4 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java @@ -37,17 +37,15 @@ public class Done implements KnownSize, Serializable { public static final Coder<Done> CODER = new CustomCoder<Done>() { @Override - public void encode(Done value, OutputStream outStream, - Coder.Context context) + public void encode(Done value, OutputStream outStream) throws CoderException, IOException { - STRING_CODER.encode(value.message, outStream, Context.NESTED); + STRING_CODER.encode(value.message, outStream); } @Override - public Done decode( - InputStream inStream, Coder.Context context) + public Done decode(InputStream inStream) throws CoderException, IOException { - String message = STRING_CODER.decode(inStream, Context.NESTED); + String message = STRING_CODER.decode(inStream); return new Done(message); } @Override public void verifyDeterministic() throws NonDeterministicException {} http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java index 1f1f096..d813833 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java @@ -36,17 +36,17 @@ public class Event implements KnownSize, Serializable { public static final Coder<Event> CODER = new CustomCoder<Event>() { @Override - public void encode(Event value, OutputStream outStream, Coder.Context context) + public void encode(Event value, OutputStream outStream) throws CoderException, IOException { if (value.newPerson != null) { - INT_CODER.encode(0, outStream, Context.NESTED); - Person.CODER.encode(value.newPerson, outStream, Context.NESTED); + INT_CODER.encode(0, outStream); + Person.CODER.encode(value.newPerson, outStream); } else if (value.newAuction != null) { - INT_CODER.encode(1, outStream, Context.NESTED); - Auction.CODER.encode(value.newAuction, outStream, Context.NESTED); + INT_CODER.encode(1, outStream); + Auction.CODER.encode(value.newAuction, outStream); } else if (value.bid != null) { - INT_CODER.encode(2, outStream, Context.NESTED); - Bid.CODER.encode(value.bid, outStream, Context.NESTED); + INT_CODER.encode(2, outStream); + Bid.CODER.encode(value.bid, outStream); } else { throw new RuntimeException("invalid event"); } @@ -54,17 +54,17 @@ public class Event implements KnownSize, Serializable { @Override public Event decode( - InputStream inStream, Coder.Context context) + InputStream inStream) throws CoderException, IOException { - int tag = INT_CODER.decode(inStream, context); + int tag = INT_CODER.decode(inStream); if (tag == 0) { - Person person = Person.CODER.decode(inStream, Context.NESTED); + Person person = Person.CODER.decode(inStream); return new Event(person); } else if (tag == 1) { - Auction auction = Auction.CODER.decode(inStream, Context.NESTED); + Auction auction = Auction.CODER.decode(inStream); return new Event(auction); } else if (tag == 2) { - Bid bid = Bid.CODER.decode(inStream, Context.NESTED); + Bid bid = Bid.CODER.decode(inStream); return new Event(bid); } else { throw new RuntimeException("invalid event encoding"); http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java index 17b8c4a..8cade4e 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java @@ -39,21 +39,20 @@ public class IdNameReserve implements KnownSize, Serializable { public static final Coder<IdNameReserve> CODER = new CustomCoder<IdNameReserve>() { @Override - public void encode(IdNameReserve value, OutputStream outStream, - Coder.Context context) + public void encode(IdNameReserve value, OutputStream outStream) throws CoderException, IOException { - LONG_CODER.encode(value.id, outStream, Context.NESTED); - STRING_CODER.encode(value.name, outStream, Context.NESTED); - LONG_CODER.encode(value.reserve, outStream, Context.NESTED); + LONG_CODER.encode(value.id, outStream); + STRING_CODER.encode(value.name, outStream); + LONG_CODER.encode(value.reserve, outStream); } @Override public IdNameReserve decode( - InputStream inStream, Coder.Context context) + InputStream inStream) throws CoderException, IOException { - long id = LONG_CODER.decode(inStream, Context.NESTED); - String name = STRING_CODER.decode(inStream, Context.NESTED); - long reserve = LONG_CODER.decode(inStream, Context.NESTED); + long id = LONG_CODER.decode(inStream); + String name = STRING_CODER.decode(inStream); + long reserve = LONG_CODER.decode(inStream); return new IdNameReserve(id, name, reserve); } @Override public void verifyDeterministic() throws NonDeterministicException {} http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java index 28f25cd..37bd3c6 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java @@ -39,23 +39,21 @@ public class NameCityStateId implements KnownSize, Serializable { public static final Coder<NameCityStateId> CODER = new CustomCoder<NameCityStateId>() { @Override - public void encode(NameCityStateId value, OutputStream outStream, - Coder.Context context) + public void encode(NameCityStateId value, OutputStream outStream) throws CoderException, IOException { - STRING_CODER.encode(value.name, outStream, Context.NESTED); - STRING_CODER.encode(value.city, outStream, Context.NESTED); - STRING_CODER.encode(value.state, outStream, Context.NESTED); - LONG_CODER.encode(value.id, outStream, Context.NESTED); + STRING_CODER.encode(value.name, outStream); + STRING_CODER.encode(value.city, outStream); + STRING_CODER.encode(value.state, outStream); + LONG_CODER.encode(value.id, outStream); } @Override - public NameCityStateId decode( - InputStream inStream, Coder.Context context) + public NameCityStateId decode(InputStream inStream) throws CoderException, IOException { - String name = STRING_CODER.decode(inStream, Context.NESTED); - String city = STRING_CODER.decode(inStream, Context.NESTED); - String state = STRING_CODER.decode(inStream, Context.NESTED); - long id = LONG_CODER.decode(inStream, Context.NESTED); + String name = STRING_CODER.decode(inStream); + String city = STRING_CODER.decode(inStream); + String state = STRING_CODER.decode(inStream); + long id = LONG_CODER.decode(inStream); return new NameCityStateId(name, city, state, id); } @Override public void verifyDeterministic() throws NonDeterministicException {} http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java index c690fd4..bde587d 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java @@ -38,31 +38,29 @@ public class Person implements KnownSize, Serializable { private static final Coder<String> STRING_CODER = StringUtf8Coder.of(); public static final Coder<Person> CODER = new CustomCoder<Person>() { @Override - public void encode(Person value, OutputStream outStream, - Coder.Context context) + public void encode(Person value, OutputStream outStream) throws CoderException, IOException { - LONG_CODER.encode(value.id, outStream, Context.NESTED); - STRING_CODER.encode(value.name, outStream, Context.NESTED); - STRING_CODER.encode(value.emailAddress, outStream, Context.NESTED); - STRING_CODER.encode(value.creditCard, outStream, Context.NESTED); - STRING_CODER.encode(value.city, outStream, Context.NESTED); - STRING_CODER.encode(value.state, outStream, Context.NESTED); - LONG_CODER.encode(value.dateTime, outStream, Context.NESTED); - STRING_CODER.encode(value.extra, outStream, Context.NESTED); + LONG_CODER.encode(value.id, outStream); + STRING_CODER.encode(value.name, outStream); + STRING_CODER.encode(value.emailAddress, outStream); + STRING_CODER.encode(value.creditCard, outStream); + STRING_CODER.encode(value.city, outStream); + STRING_CODER.encode(value.state, outStream); + LONG_CODER.encode(value.dateTime, outStream); + STRING_CODER.encode(value.extra, outStream); } @Override - public Person decode( - InputStream inStream, Coder.Context context) + public Person decode(InputStream inStream) throws CoderException, IOException { - long id = LONG_CODER.decode(inStream, Context.NESTED); - String name = STRING_CODER.decode(inStream, Context.NESTED); - String emailAddress = STRING_CODER.decode(inStream, Context.NESTED); - String creditCard = STRING_CODER.decode(inStream, Context.NESTED); - String city = STRING_CODER.decode(inStream, Context.NESTED); - String state = STRING_CODER.decode(inStream, Context.NESTED); - long dateTime = LONG_CODER.decode(inStream, Context.NESTED); - String extra = STRING_CODER.decode(inStream, Context.NESTED); + long id = LONG_CODER.decode(inStream); + String name = STRING_CODER.decode(inStream); + String emailAddress = STRING_CODER.decode(inStream); + String creditCard = STRING_CODER.decode(inStream); + String city = STRING_CODER.decode(inStream); + String state = STRING_CODER.decode(inStream); + long dateTime = LONG_CODER.decode(inStream); + String extra = STRING_CODER.decode(inStream); return new Person(id, name, emailAddress, creditCard, city, state, dateTime, extra); } @Override public void verifyDeterministic() throws NonDeterministicException {} http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java index 52ff540..61537f6 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java @@ -37,19 +37,18 @@ public class SellerPrice implements KnownSize, Serializable { public static final Coder<SellerPrice> CODER = new CustomCoder<SellerPrice>() { @Override - public void encode(SellerPrice value, OutputStream outStream, - Coder.Context context) + public void encode(SellerPrice value, OutputStream outStream) throws CoderException, IOException { - LONG_CODER.encode(value.seller, outStream, Context.NESTED); - LONG_CODER.encode(value.price, outStream, Context.NESTED); + LONG_CODER.encode(value.seller, outStream); + LONG_CODER.encode(value.price, outStream); } @Override public SellerPrice decode( - InputStream inStream, Coder.Context context) + InputStream inStream) throws CoderException, IOException { - long seller = LONG_CODER.decode(inStream, Context.NESTED); - long price = LONG_CODER.decode(inStream, Context.NESTED); + long seller = LONG_CODER.decode(inStream); + long price = LONG_CODER.decode(inStream); return new SellerPrice(seller, price); } @Override public void verifyDeterministic() throws NonDeterministicException {} http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java index 52891a7..bd6c2ed 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java @@ -156,19 +156,19 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct } @Override - public void encode(AuctionOrBidWindow window, OutputStream outStream, Coder.Context context) + public void encode(AuctionOrBidWindow window, OutputStream outStream) throws IOException, CoderException { - SUPER_CODER.encode(window, outStream, Coder.Context.NESTED); - ID_CODER.encode(window.auction, outStream, Coder.Context.NESTED); - INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream, Coder.Context.NESTED); + SUPER_CODER.encode(window, outStream); + ID_CODER.encode(window.auction, outStream); + INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream); } @Override - public AuctionOrBidWindow decode(InputStream inStream, Coder.Context context) + public AuctionOrBidWindow decode(InputStream inStream) throws IOException, CoderException { - IntervalWindow superWindow = SUPER_CODER.decode(inStream, Coder.Context.NESTED); - long auction = ID_CODER.decode(inStream, Coder.Context.NESTED); - boolean isAuctionWindow = INT_CODER.decode(inStream, Context.NESTED) != 0; + IntervalWindow superWindow = SUPER_CODER.decode(inStream); + long auction = ID_CODER.decode(inStream); + boolean isAuctionWindow = INT_CODER.decode(inStream) != 0; return new AuctionOrBidWindow( superWindow.start(), superWindow.end(), auction, isAuctionWindow); } http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java index 2a2732b..4f548cd 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java @@ -102,22 +102,17 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl /** Coder for this class. */ public static final Coder<Checkpoint> CODER_INSTANCE = new CustomCoder<Checkpoint>() { - @Override - public void encode( - Checkpoint value, - OutputStream outStream, - Coder.Context context) - throws CoderException, IOException { - LONG_CODER.encode(value.numEvents, outStream, Context.NESTED); - LONG_CODER.encode(value.wallclockBaseTime, outStream, Context.NESTED); + @Override public void encode(Checkpoint value, OutputStream outStream) + throws CoderException, IOException { + LONG_CODER.encode(value.numEvents, outStream); + LONG_CODER.encode(value.wallclockBaseTime, outStream); } @Override - public Checkpoint decode( - InputStream inStream, Coder.Context context) + public Checkpoint decode(InputStream inStream) throws CoderException, IOException { - long numEvents = LONG_CODER.decode(inStream, Context.NESTED); - long wallclockBaseTime = LONG_CODER.decode(inStream, Context.NESTED); + long numEvents = LONG_CODER.decode(inStream); + long wallclockBaseTime = LONG_CODER.decode(inStream); return new Checkpoint(numEvents, wallclockBaseTime); } @Override public void verifyDeterministic() throws NonDeterministicException {} http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/pom.xml ---------------------------------------------------------------------- diff --git a/integration/java/pom.xml b/integration/java/pom.xml index dcad4c3..b0c3853 100644 --- a/integration/java/pom.xml +++ b/integration/java/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-integration-parent</artifactId> - <version>0.7.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/pom.xml ---------------------------------------------------------------------- diff --git a/integration/pom.xml b/integration/pom.xml index 31f293e..4254819 100644 --- a/integration/pom.xml +++ b/integration/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-parent</artifactId> - <version>0.7.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent>