[ 
https://issues.apache.org/jira/browse/BEAM-5817?focusedWorklogId=163212&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-163212
 ]

ASF GitHub Bot logged work on BEAM-5817:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Nov/18 21:55
            Start Date: 06/Nov/18 21:55
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #6934: [BEAM-5817] Unify Nexmark SQL and non-SQL paths
URL: https://github.com/apache/beam/pull/6934
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the diff
is reproduced below for the sake of provenance:

diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index 7a60e4d235d..cbc1de0d270 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -54,6 +54,7 @@
 import org.apache.beam.sdk.nexmark.queries.BoundedSideInputJoinModel;
 import org.apache.beam.sdk.nexmark.queries.NexmarkQuery;
 import org.apache.beam.sdk.nexmark.queries.NexmarkQueryModel;
+import org.apache.beam.sdk.nexmark.queries.NexmarkQueryUtil;
 import org.apache.beam.sdk.nexmark.queries.Query0;
 import org.apache.beam.sdk.nexmark.queries.Query0Model;
 import org.apache.beam.sdk.nexmark.queries.Query1;
@@ -77,7 +78,6 @@
 import org.apache.beam.sdk.nexmark.queries.Query8Model;
 import org.apache.beam.sdk.nexmark.queries.Query9;
 import org.apache.beam.sdk.nexmark.queries.Query9Model;
-import org.apache.beam.sdk.nexmark.queries.sql.NexmarkSqlQuery;
 import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery0;
 import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery1;
 import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery2;
@@ -724,7 +724,7 @@ public void processElement(ProcessContext c) throws 
IOException {
     NexmarkUtils.console("Reading events from Avro files at %s", filename);
     return p.apply(
             queryName + ".ReadAvroEvents", 
AvroIO.read(Event.class).from(filename + "*.avro"))
-        .apply("OutputWithTimestamp", NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA);
+        .apply("OutputWithTimestamp", 
NexmarkQueryUtil.EVENT_TIMESTAMP_FROM_DATA);
   }
 
   /** Send {@code events} to Pubsub. */
@@ -792,17 +792,17 @@ private void sinkEventsToAvro(PCollection<Event> source) {
         queryName + ".WriteAvroEvents",
         AvroIO.write(Event.class).to(filename + "/event").withSuffix(".avro"));
     source
-        .apply(NexmarkQuery.JUST_BIDS)
+        .apply(NexmarkQueryUtil.JUST_BIDS)
         .apply(
             queryName + ".WriteAvroBids",
             AvroIO.write(Bid.class).to(filename + "/bid").withSuffix(".avro"));
     source
-        .apply(NexmarkQuery.JUST_NEW_AUCTIONS)
+        .apply(NexmarkQueryUtil.JUST_NEW_AUCTIONS)
         .apply(
             queryName + ".WriteAvroAuctions",
             AvroIO.write(Auction.class).to(filename + 
"/auction").withSuffix(".avro"));
     source
-        .apply(NexmarkQuery.JUST_NEW_PERSONS)
+        .apply(NexmarkQueryUtil.JUST_NEW_PERSONS)
         .apply(
             queryName + ".WriteAvroPeople",
             AvroIO.write(Person.class).to(filename + 
"/person").withSuffix(".avro"));
@@ -1087,7 +1087,7 @@ public NexmarkPerf run(NexmarkConfiguration 
runConfiguration) throws IOException
         return null;
       }
 
-      NexmarkQuery query = getNexmarkQuery();
+      NexmarkQuery<? extends KnownSize> query = getNexmarkQuery();
       if (query == null) {
         NexmarkUtils.console("skipping since configuration is not 
implemented");
         return null;
@@ -1112,8 +1112,8 @@ public NexmarkPerf run(NexmarkConfiguration 
runConfiguration) throws IOException
       // Generate events.
       PCollection<Event> source = createSource(p, now);
 
-      if (query.needsSideInput()) {
-        query.setSideInput(NexmarkUtils.prepareSideInput(p, configuration));
+      if (query.getTransform().needsSideInput()) {
+        query.getTransform().setSideInput(NexmarkUtils.prepareSideInput(p, 
configuration));
       }
 
       if (options.getLogEvents()) {
@@ -1136,12 +1136,13 @@ public NexmarkPerf run(NexmarkConfiguration 
runConfiguration) throws IOException
           if (options.getOutputPath() != null && 
!options.getOutputPath().isEmpty()) {
             path = logsDir(now.getMillis());
           }
-          ((Query10) query).setOutputPath(path);
-          ((Query10) query).setMaxNumWorkers(maxNumWorkers());
+          ((Query10) query.getTransform()).setOutputPath(path);
+          ((Query10) query.getTransform()).setMaxNumWorkers(maxNumWorkers());
         }
 
         // Apply query.
-        PCollection<TimestampedValue<KnownSize>> results = source.apply(query);
+        PCollection<TimestampedValue<KnownSize>> results =
+            (PCollection<TimestampedValue<KnownSize>>) source.apply(query);
 
         if (options.getAssertCorrectness()) {
           if (model == null) {
@@ -1180,7 +1181,7 @@ private NexmarkQueryModel getNexmarkQueryModel() {
     return models.get(configuration.query);
   }
 
-  private NexmarkQuery getNexmarkQuery() {
+  private NexmarkQuery<?> getNexmarkQuery() {
     Map<NexmarkQueryName, NexmarkQuery> queries = createQueries();
     return queries.get(configuration.query);
   }
@@ -1215,41 +1216,61 @@ private NexmarkQuery getNexmarkQuery() {
 
   private Map<NexmarkQueryName, NexmarkQuery> createSqlQueries() {
     return ImmutableMap.<NexmarkQueryName, NexmarkQuery>builder()
-        .put(NexmarkQueryName.PASSTHROUGH, new NexmarkSqlQuery(configuration, 
new SqlQuery0()))
-        .put(
-            NexmarkQueryName.CURRENCY_CONVERSION,
-            new NexmarkSqlQuery(configuration, new SqlQuery1()))
+        .put(NexmarkQueryName.PASSTHROUGH, new NexmarkQuery(configuration, new 
SqlQuery0()))
+        .put(NexmarkQueryName.CURRENCY_CONVERSION, new 
NexmarkQuery(configuration, new SqlQuery1()))
         .put(
             NexmarkQueryName.SELECTION,
-            new NexmarkSqlQuery(configuration, new 
SqlQuery2(configuration.auctionSkip)))
+            new NexmarkQuery(configuration, new 
SqlQuery2(configuration.auctionSkip)))
         .put(
             NexmarkQueryName.LOCAL_ITEM_SUGGESTION,
-            new NexmarkSqlQuery(configuration, new SqlQuery3(configuration)))
+            new NexmarkQuery(configuration, new SqlQuery3(configuration)))
         .put(
             NexmarkQueryName.AVERAGE_PRICE_FOR_CATEGORY,
-            new NexmarkSqlQuery(configuration, new SqlQuery5(configuration)))
+            new NexmarkQuery(configuration, new SqlQuery5(configuration)))
         .put(
             NexmarkQueryName.HOT_ITEMS,
-            new NexmarkSqlQuery(configuration, new SqlQuery7(configuration)))
+            new NexmarkQuery(configuration, new SqlQuery7(configuration)))
         .build();
   }
 
   private Map<NexmarkQueryName, NexmarkQuery> createJavaQueries() {
     return ImmutableMap.<NexmarkQueryName, NexmarkQuery>builder()
-        .put(NexmarkQueryName.PASSTHROUGH, new Query0(configuration))
-        .put(NexmarkQueryName.CURRENCY_CONVERSION, new Query1(configuration))
-        .put(NexmarkQueryName.SELECTION, new Query2(configuration))
-        .put(NexmarkQueryName.LOCAL_ITEM_SUGGESTION, new Query3(configuration))
-        .put(NexmarkQueryName.AVERAGE_PRICE_FOR_CATEGORY, new 
Query4(configuration))
-        .put(NexmarkQueryName.HOT_ITEMS, new Query5(configuration))
-        .put(NexmarkQueryName.AVERAGE_SELLING_PRICE_BY_SELLER, new 
Query6(configuration))
-        .put(NexmarkQueryName.HIGHEST_BID, new Query7(configuration))
-        .put(NexmarkQueryName.MONITOR_NEW_USERS, new Query8(configuration))
-        .put(NexmarkQueryName.WINNING_BIDS, new Query9(configuration))
-        .put(NexmarkQueryName.LOG_TO_SHARDED_FILES, new Query10(configuration))
-        .put(NexmarkQueryName.USER_SESSIONS, new Query11(configuration))
-        .put(NexmarkQueryName.PROCESSING_TIME_WINDOWS, new 
Query12(configuration))
-        .put(NexmarkQueryName.BOUNDED_SIDE_INPUT_JOIN, new 
BoundedSideInputJoin(configuration))
+        .put(NexmarkQueryName.PASSTHROUGH, new NexmarkQuery(configuration, new 
Query0()))
+        .put(
+            NexmarkQueryName.CURRENCY_CONVERSION,
+            new NexmarkQuery(configuration, new Query1(configuration)))
+        .put(NexmarkQueryName.SELECTION, new NexmarkQuery(configuration, new 
Query2(configuration)))
+        .put(
+            NexmarkQueryName.LOCAL_ITEM_SUGGESTION,
+            new NexmarkQuery(configuration, new Query3(configuration)))
+        .put(
+            NexmarkQueryName.AVERAGE_PRICE_FOR_CATEGORY,
+            new NexmarkQuery(configuration, new Query4(configuration)))
+        .put(NexmarkQueryName.HOT_ITEMS, new NexmarkQuery(configuration, new 
Query5(configuration)))
+        .put(
+            NexmarkQueryName.AVERAGE_SELLING_PRICE_BY_SELLER,
+            new NexmarkQuery(configuration, new Query6(configuration)))
+        .put(
+            NexmarkQueryName.HIGHEST_BID,
+            new NexmarkQuery(configuration, new Query7(configuration)))
+        .put(
+            NexmarkQueryName.MONITOR_NEW_USERS,
+            new NexmarkQuery(configuration, new Query8(configuration)))
+        .put(
+            NexmarkQueryName.WINNING_BIDS,
+            new NexmarkQuery(configuration, new Query9(configuration)))
+        .put(
+            NexmarkQueryName.LOG_TO_SHARDED_FILES,
+            new NexmarkQuery(configuration, new Query10(configuration)))
+        .put(
+            NexmarkQueryName.USER_SESSIONS,
+            new NexmarkQuery(configuration, new Query11(configuration)))
+        .put(
+            NexmarkQueryName.PROCESSING_TIME_WINDOWS,
+            new NexmarkQuery(configuration, new Query12(configuration)))
+        .put(
+            NexmarkQueryName.BOUNDED_SIDE_INPUT_JOIN,
+            new NexmarkQuery(configuration, new 
BoundedSideInputJoin(configuration)))
         .build();
   }
 
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AbstractSimulator.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AbstractSimulator.java
index 7a74968560c..3cc92cecd1b 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AbstractSimulator.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AbstractSimulator.java
@@ -22,6 +22,7 @@
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
@@ -33,7 +34,7 @@
  * @param <InputT> Type of input elements.
  * @param <OutputT> Type of output elements.
  */
-public abstract class AbstractSimulator<InputT, OutputT> {
+public abstract class AbstractSimulator<InputT, OutputT extends KnownSize> {
   /** Window size for action bucket sampling. */
   private static final Duration WINDOW_SIZE = Duration.standardMinutes(1);
 
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoin.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoin.java
index cb9b9ab199d..c5f0f58472d 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoin.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoin.java
@@ -21,10 +21,8 @@
 
 import java.util.Map;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
-import org.apache.beam.sdk.nexmark.NexmarkUtils;
 import org.apache.beam.sdk.nexmark.model.Bid;
 import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -40,9 +38,12 @@
  * WHERE bid.id = sideInput.id
  * </pre>
  */
-public class BoundedSideInputJoin extends NexmarkQuery {
+public class BoundedSideInputJoin extends NexmarkQueryTransform<Bid> {
+  private final NexmarkConfiguration configuration;
+
   public BoundedSideInputJoin(NexmarkConfiguration configuration) {
-    super(configuration, "JoinToFiles");
+    super("JoinToFiles");
+    this.configuration = configuration;
   }
 
   @Override
@@ -50,7 +51,8 @@ public boolean needsSideInput() {
     return true;
   }
 
-  private PCollection<Bid> applyTyped(PCollection<Event> events) {
+  @Override
+  public PCollection<Bid> expand(PCollection<Event> events) {
 
     checkState(getSideInput() != null, "Configuration error: side input is 
null");
 
@@ -58,7 +60,7 @@ public boolean needsSideInput() {
 
     return events
         // Only want the bid events; easier to fake some side input data
-        .apply(JUST_BIDS)
+        .apply(NexmarkQueryUtil.JUST_BIDS)
 
         // Map the conversion function over all bids.
         .apply(
@@ -80,9 +82,4 @@ public void processElement(ProcessContext c) {
                     })
                 .withSideInputs(sideInputMap));
   }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinModel.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinModel.java
index 822f343244b..e1555ee1a0e 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinModel.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinModel.java
@@ -26,7 +26,7 @@
 import org.apache.beam.sdk.values.TimestampedValue;
 
 /** A direct implementation of {@link BoundedSideInputJoin}. */
-public class BoundedSideInputJoinModel extends NexmarkQueryModel {
+public class BoundedSideInputJoinModel extends NexmarkQueryModel<Bid> {
 
   /** Simulator for query 0. */
   private static class Simulator extends AbstractSimulator<Event, Bid> {
@@ -70,12 +70,12 @@ public BoundedSideInputJoinModel(NexmarkConfiguration 
configuration) {
   }
 
   @Override
-  public AbstractSimulator<?, ?> simulator() {
+  public AbstractSimulator<?, Bid> simulator() {
     return new Simulator(configuration);
   }
 
   @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> 
itr) {
+  protected Collection<String> toCollection(Iterator<TimestampedValue<Bid>> 
itr) {
     return toValueTimestamp(itr);
   }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
index b47a0f12d35..669c435da74 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
@@ -17,193 +17,34 @@
  */
 package org.apache.beam.sdk.nexmark.queries;
 
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.nexmark.Monitor;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
 import org.apache.beam.sdk.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.nexmark.model.Auction;
-import org.apache.beam.sdk.nexmark.model.Bid;
 import org.apache.beam.sdk.nexmark.model.Event;
 import org.apache.beam.sdk.nexmark.model.KnownSize;
-import org.apache.beam.sdk.nexmark.model.Person;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
 
-/**
- * Base class for the eight 'NEXMark' queries. Supplies some fragments common 
to multiple queries.
- */
-public abstract class NexmarkQuery
-    extends PTransform<PCollection<Event>, 
PCollection<TimestampedValue<KnownSize>>> {
-  public static final TupleTag<Auction> AUCTION_TAG = new 
TupleTag<>("auctions");
-  public static final TupleTag<Bid> BID_TAG = new TupleTag<>("bids");
-  static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person");
-
-  /** Predicate to detect a new person event. */
-  private static final SerializableFunction<Event, Boolean> IS_NEW_PERSON =
-      event -> event.newPerson != null;
-
-  /** DoFn to convert a new person event to a person. */
-  private static final DoFn<Event, Person> AS_PERSON =
-      new DoFn<Event, Person>() {
-        @ProcessElement
-        public void processElement(ProcessContext c) {
-          c.output(c.element().newPerson);
-        }
-      };
-
-  /** Predicate to detect a new auction event. */
-  private static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION =
-      event -> event.newAuction != null;
-
-  /** DoFn to convert a new auction event to an auction. */
-  private static final DoFn<Event, Auction> AS_AUCTION =
-      new DoFn<Event, Auction>() {
-        @ProcessElement
-        public void processElement(ProcessContext c) {
-          c.output(c.element().newAuction);
-        }
-      };
-
-  /** Predicate to detect a new bid event. */
-  public static final SerializableFunction<Event, Boolean> IS_BID = event -> 
event.bid != null;
-
-  /** DoFn to convert a bid event to a bid. */
-  private static final DoFn<Event, Bid> AS_BID =
-      new DoFn<Event, Bid>() {
-        @ProcessElement
-        public void processElement(ProcessContext c) {
-          c.output(c.element().bid);
-        }
-      };
-
-  /** Transform to key each person by their id. */
-  static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID =
-      ParDo.of(
-          new DoFn<Person, KV<Long, Person>>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) {
-              c.output(KV.of(c.element().id, c.element()));
-            }
-          });
-
-  /** Transform to key each auction by its id. */
-  static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID =
-      ParDo.of(
-          new DoFn<Auction, KV<Long, Auction>>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) {
-              c.output(KV.of(c.element().id, c.element()));
-            }
-          });
-
-  /** Transform to key each auction by its seller id. */
-  static final ParDo.SingleOutput<Auction, KV<Long, Auction>> 
AUCTION_BY_SELLER =
-      ParDo.of(
-          new DoFn<Auction, KV<Long, Auction>>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) {
-              c.output(KV.of(c.element().seller, c.element()));
-            }
-          });
-
-  /** Transform to key each bid by it's auction id. */
-  static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION =
-      ParDo.of(
-          new DoFn<Bid, KV<Long, Bid>>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) {
-              c.output(KV.of(c.element().auction, c.element()));
-            }
-          });
-
-  /** Transform to project the auction id from each bid. */
-  static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION =
-      ParDo.of(
-          new DoFn<Bid, Long>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) {
-              c.output(c.element().auction);
-            }
-          });
-
-  /** Transform to project the price from each bid. */
-  static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE =
-      ParDo.of(
-          new DoFn<Bid, Long>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) {
-              c.output(c.element().price);
-            }
-          });
-
-  /** Transform to emit each event with the timestamp embedded within it. */
-  public static final ParDo.SingleOutput<Event, Event> 
EVENT_TIMESTAMP_FROM_DATA =
-      ParDo.of(
-          new DoFn<Event, Event>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) {
-              Event e = c.element();
-              if (e.bid != null) {
-                c.outputWithTimestamp(e, new Instant(e.bid.dateTime));
-              } else if (e.newPerson != null) {
-                c.outputWithTimestamp(e, new Instant(e.newPerson.dateTime));
-              } else if (e.newAuction != null) {
-                c.outputWithTimestamp(e, new Instant(e.newAuction.dateTime));
-              }
-            }
-          });
-
-  /** Transform to filter for just the new auction events. */
-  public static final PTransform<PCollection<Event>, PCollection<Auction>> 
JUST_NEW_AUCTIONS =
-      new PTransform<PCollection<Event>, 
PCollection<Auction>>("justNewAuctions") {
-        @Override
-        public PCollection<Auction> expand(PCollection<Event> input) {
-          return input
-              .apply("IsNewAuction", Filter.by(IS_NEW_AUCTION))
-              .apply("AsAuction", ParDo.of(AS_AUCTION));
-        }
-      };
-
-  /** Transform to filter for just the new person events. */
-  public static final PTransform<PCollection<Event>, PCollection<Person>> 
JUST_NEW_PERSONS =
-      new PTransform<PCollection<Event>, 
PCollection<Person>>("justNewPersons") {
-        @Override
-        public PCollection<Person> expand(PCollection<Event> input) {
-          return input
-              .apply("IsNewPerson", Filter.by(IS_NEW_PERSON))
-              .apply("AsPerson", ParDo.of(AS_PERSON));
-        }
-      };
-
-  /** Transform to filter for just the bid events. */
-  public static final PTransform<PCollection<Event>, PCollection<Bid>> 
JUST_BIDS =
-      new PTransform<PCollection<Event>, PCollection<Bid>>("justBids") {
-        @Override
-        public PCollection<Bid> expand(PCollection<Event> input) {
-          return input.apply("IsBid", Filter.by(IS_BID)).apply("AsBid", 
ParDo.of(AS_BID));
-        }
-      };
+/** Wrapper for 'NEXmark' query transforms that adds monitoring and snooping. 
*/
+public final class NexmarkQuery<T extends KnownSize>
+    extends PTransform<PCollection<Event>, PCollection<? extends 
TimestampedValue<T>>> {
 
   final NexmarkConfiguration configuration;
   public final Monitor<Event> eventMonitor;
-  public final Monitor<KnownSize> resultMonitor;
+  public final Monitor<T> resultMonitor;
   private final Monitor<Event> endOfStreamMonitor;
   private final Counter fatalCounter;
+  private final NexmarkQueryTransform<T> transform;
   private transient PCollection<KV<Long, String>> sideInput = null;
 
-  protected NexmarkQuery(NexmarkConfiguration configuration, String name) {
-    super(name);
+  public NexmarkQuery(NexmarkConfiguration configuration, 
NexmarkQueryTransform<T> transform) {
+    super(transform.getName());
     this.configuration = configuration;
+    this.transform = transform;
     if (configuration.debug) {
       eventMonitor = new Monitor<>(name + ".Events", "event");
       resultMonitor = new Monitor<>(name + ".Results", "result");
@@ -217,31 +58,12 @@ protected NexmarkQuery(NexmarkConfiguration configuration, 
String name) {
     }
   }
 
-  /** Implement the actual query. All we know about the result is it has a 
known encoded size. */
-  protected abstract PCollection<KnownSize> applyPrim(PCollection<Event> 
events);
-
-  /** Whether this query expects a side input to be populated. Defaults to 
{@code false}. */
-  public boolean needsSideInput() {
-    return false;
-  }
-
-  /**
-   * Set the side input for the query.
-   *
-   * <p>Note that due to the nature of side inputs, this instance of the query 
is now fixed and can
-   * only be safely applied in the pipeline where the side input was created.
-   */
-  public void setSideInput(PCollection<KV<Long, String>> sideInput) {
-    this.sideInput = sideInput;
-  }
-
-  /** Get the side input, if any. */
-  public @Nullable PCollection<KV<Long, String>> getSideInput() {
-    return sideInput;
+  public NexmarkQueryTransform<T> getTransform() {
+    return transform;
   }
 
   @Override
-  public PCollection<TimestampedValue<KnownSize>> expand(PCollection<Event> 
events) {
+  public PCollection<TimestampedValue<T>> expand(PCollection<Event> events) {
 
     if (configuration.debug) {
       events =
@@ -264,7 +86,7 @@ public void setSideInput(PCollection<KV<Long, String>> 
sideInput) {
     }
 
     // Run the query.
-    PCollection<KnownSize> queryResults = applyPrim(events);
+    PCollection<T> queryResults = events.apply(transform);
 
     if (configuration.debug) {
       // Monitor results as they go by.
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
index 581f760d582..9b7af4e7458 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
@@ -36,7 +36,7 @@
  * Base class for models of the eight NEXMark queries. Provides an assertion 
function which can be
  * applied against the actual query results to check their consistency with 
the model.
  */
-public abstract class NexmarkQueryModel implements Serializable {
+public abstract class NexmarkQueryModel<T extends KnownSize> implements 
Serializable {
   public final NexmarkConfiguration configuration;
 
   NexmarkQueryModel(NexmarkConfiguration configuration) {
@@ -74,11 +74,10 @@ static Instant windowStart(Duration size, Duration period, 
Instant timestamp) {
   }
 
   /** Return simulator for query. */
-  public abstract AbstractSimulator<?, ?> simulator();
+  public abstract AbstractSimulator<?, T> simulator();
 
   /** Return sub-sequence of results which are significant for model. */
-  Iterable<TimestampedValue<KnownSize>> relevantResults(
-      Iterable<TimestampedValue<KnownSize>> results) {
+  Iterable<TimestampedValue<T>> relevantResults(Iterable<TimestampedValue<T>> 
results) {
     return results;
   }
 
@@ -86,17 +85,17 @@ static Instant windowStart(Duration size, Duration period, 
Instant timestamp) {
    * Convert iterator of elements to collection of strings to use when testing 
coherence of model
    * against actual query results.
    */
-  protected abstract <T> Collection<String> 
toCollection(Iterator<TimestampedValue<T>> itr);
+  protected abstract Collection<String> 
toCollection(Iterator<TimestampedValue<T>> itr);
 
   /** Return assertion to use on results of pipeline for this query. */
-  public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> 
assertionFor() {
+  public SerializableFunction<Iterable<TimestampedValue<T>>, Void> 
assertionFor() {
     final Collection<String> expectedStrings = 
toCollection(simulator().results());
     Assert.assertFalse(expectedStrings.isEmpty());
 
-    return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, 
Void>() {
+    return new SerializableFunction<Iterable<TimestampedValue<T>>, Void>() {
       @Override
       @Nullable
-      public Void apply(Iterable<TimestampedValue<KnownSize>> actual) {
+      public Void apply(Iterable<TimestampedValue<T>> actual) {
         Collection<String> actualStrings = 
toCollection(relevantResults(actual).iterator());
         Assert.assertThat("wrong pipeline output", actualStrings, 
IsEqual.equalTo(expectedStrings));
         return null;
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryTransform.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryTransform.java
new file mode 100644
index 00000000000..7f5be475c40
--- /dev/null
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryTransform.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Base class for 'NEXMark' query transforms.
+ *
+ * <p>A query transform maps an event stream to some known size element stream.
+ *
+ * <p>A query transform may request a faked side input to join with ids.
+ */
+public abstract class NexmarkQueryTransform<T extends KnownSize>
+    extends PTransform<PCollection<Event>, PCollection<T>> {
+
+  private transient PCollection<KV<Long, String>> sideInput = null;
+
+  protected NexmarkQueryTransform(String name) {
+    super(name);
+  }
+
+  /** Whether this query expects a side input to be populated. Defaults to 
{@code false}. */
+  public boolean needsSideInput() {
+    return false;
+  }
+
+  /**
+   * Set the side input for the query.
+   *
+   * <p>Note that due to the nature of side inputs, this instance of the query 
is now fixed and can
+   * only be safely applied in the pipeline where the side input was created.
+   */
+  public void setSideInput(PCollection<KV<Long, String>> sideInput) {
+    this.sideInput = sideInput;
+  }
+
+  /** Get the side input, if any. */
+  public @Nullable PCollection<KV<Long, String>> getSideInput() {
+    return sideInput;
+  }
+}
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryUtil.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryUtil.java
new file mode 100644
index 00000000000..d700de4018f
--- /dev/null
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryUtil.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.Person;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/** Utilities for working with NEXMark data streams. */
+public class NexmarkQueryUtil {
+  // Do not instantiate
+  private NexmarkQueryUtil() {}
+
+  public static final TupleTag<Auction> AUCTION_TAG = new 
TupleTag<>("auctions");
+  public static final TupleTag<Bid> BID_TAG = new TupleTag<>("bids");
+  public static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person");
+
+  /** Predicate to detect a new person event. */
+  public static final SerializableFunction<Event, Boolean> IS_NEW_PERSON =
+      event -> event.newPerson != null;
+
+  /** DoFn to convert a new person event to a person. */
+  public static final DoFn<Event, Person> AS_PERSON =
+      new DoFn<Event, Person>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          c.output(c.element().newPerson);
+        }
+      };
+
+  /** Predicate to detect a new auction event. */
+  public static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION =
+      event -> event.newAuction != null;
+
+  /** DoFn to convert a new auction event to an auction. */
+  public static final DoFn<Event, Auction> AS_AUCTION =
+      new DoFn<Event, Auction>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          c.output(c.element().newAuction);
+        }
+      };
+
+  /** Predicate to detect a bid event. */
+  public static final SerializableFunction<Event, Boolean> IS_BID = event -> 
event.bid != null;
+
+  /** DoFn to convert a bid event to a bid. */
+  public static final DoFn<Event, Bid> AS_BID =
+      new DoFn<Event, Bid>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          c.output(c.element().bid);
+        }
+      };
+
+  /** Transform to key each person by their id. */
+  public static final ParDo.SingleOutput<Person, KV<Long, Person>> 
PERSON_BY_ID =
+      ParDo.of(
+          new DoFn<Person, KV<Long, Person>>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) {
+              c.output(KV.of(c.element().id, c.element()));
+            }
+          });
+
+  /** Transform to key each auction by its id. */
+  public static final ParDo.SingleOutput<Auction, KV<Long, Auction>> 
AUCTION_BY_ID =
+      ParDo.of(
+          new DoFn<Auction, KV<Long, Auction>>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) {
+              c.output(KV.of(c.element().id, c.element()));
+            }
+          });
+
+  /** Transform to key each auction by its seller id. */
+  public static final ParDo.SingleOutput<Auction, KV<Long, Auction>> 
AUCTION_BY_SELLER =
+      ParDo.of(
+          new DoFn<Auction, KV<Long, Auction>>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) {
+              c.output(KV.of(c.element().seller, c.element()));
+            }
+          });
+
+  /** Transform to key each bid by its auction id. */
+  public static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION =
+      ParDo.of(
+          new DoFn<Bid, KV<Long, Bid>>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) {
+              c.output(KV.of(c.element().auction, c.element()));
+            }
+          });
+
+  /** Transform to project the auction id from each bid. */
+  public static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION =
+      ParDo.of(
+          new DoFn<Bid, Long>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) {
+              c.output(c.element().auction);
+            }
+          });
+
+  /** Transform to project the price from each bid. */
+  public static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE =
+      ParDo.of(
+          new DoFn<Bid, Long>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) {
+              c.output(c.element().price);
+            }
+          });
+
+  /** Transform to emit each event with the timestamp embedded within it. */
+  public static final ParDo.SingleOutput<Event, Event> 
EVENT_TIMESTAMP_FROM_DATA =
+      ParDo.of(
+          new DoFn<Event, Event>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) {
+              Event e = c.element();
+              if (e.bid != null) {
+                c.outputWithTimestamp(e, new Instant(e.bid.dateTime));
+              } else if (e.newPerson != null) {
+                c.outputWithTimestamp(e, new Instant(e.newPerson.dateTime));
+              } else if (e.newAuction != null) {
+                c.outputWithTimestamp(e, new Instant(e.newAuction.dateTime));
+              }
+            }
+          });
+
+  /** Transform to filter for just the new auction events. */
+  public static final PTransform<PCollection<Event>, PCollection<Auction>> 
JUST_NEW_AUCTIONS =
+      new PTransform<PCollection<Event>, 
PCollection<Auction>>("justNewAuctions") {
+        @Override
+        public PCollection<Auction> expand(PCollection<Event> input) {
+          return input
+              .apply("IsNewAuction", Filter.by(IS_NEW_AUCTION))
+              .apply("AsAuction", ParDo.of(AS_AUCTION));
+        }
+      };
+
+  /** Transform to filter for just the new person events. */
+  public static final PTransform<PCollection<Event>, PCollection<Person>> 
JUST_NEW_PERSONS =
+      new PTransform<PCollection<Event>, 
PCollection<Person>>("justNewPersons") {
+        @Override
+        public PCollection<Person> expand(PCollection<Event> input) {
+          return input
+              .apply("IsNewPerson", Filter.by(IS_NEW_PERSON))
+              .apply("AsPerson", ParDo.of(AS_PERSON));
+        }
+      };
+
+  /** Transform to filter for just the bid events. */
+  public static final PTransform<PCollection<Event>, PCollection<Bid>> 
JUST_BIDS =
+      new PTransform<PCollection<Event>, PCollection<Bid>>("justBids") {
+        @Override
+        public PCollection<Bid> expand(PCollection<Event> input) {
+          return input.apply("IsBid", Filter.by(IS_BID)).apply("AsBid", 
ParDo.of(AS_BID));
+        }
+      };
+}
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0.java
index 4dfcbfb3358..b4d3db28f08 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0.java
@@ -24,10 +24,7 @@
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
-import org.apache.beam.sdk.nexmark.NexmarkUtils;
 import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
@@ -36,12 +33,13 @@
  * Query 0: Pass events through unchanged. However, force them to do a round trip through
  * serialization so that we measure the impact of the choice of coders.
  */
-public class Query0 extends NexmarkQuery {
-  public Query0(NexmarkConfiguration configuration) {
-    super(configuration, "Query0");
+public class Query0 extends NexmarkQueryTransform<Event> {
+  public Query0() {
+    super("Query0");
   }
 
-  private PCollection<Event> applyTyped(PCollection<Event> events) {
+  @Override
+  public PCollection<Event> expand(PCollection<Event> events) {
     final Coder<Event> coder = events.getCoder();
     return events
         // Force round trip through coder.
@@ -63,9 +61,4 @@ public void processElement(ProcessContext c) throws 
CoderException, IOException
               }
             }));
   }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java
index 1f4251b1b74..94c56f5b81b 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java
@@ -25,7 +25,7 @@
 import org.apache.beam.sdk.values.TimestampedValue;
 
 /** A direct implementation of {@link Query0}. */
-public class Query0Model extends NexmarkQueryModel {
+public class Query0Model extends NexmarkQueryModel<Event> {
   /** Simulator for query 0. */
   private static class Simulator extends AbstractSimulator<Event, Event> {
     public Simulator(NexmarkConfiguration configuration) {
@@ -48,12 +48,12 @@ public Query0Model(NexmarkConfiguration configuration) {
   }
 
   @Override
-  public AbstractSimulator<?, ?> simulator() {
+  public AbstractSimulator<?, Event> simulator() {
     return new Simulator(configuration);
   }
 
   @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> 
itr) {
+  protected Collection<String> toCollection(Iterator<TimestampedValue<Event>> 
itr) {
     return toValueTimestamp(itr);
   }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1.java
index ea752cbc509..e9b8c1004c0 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1.java
@@ -18,10 +18,8 @@
 package org.apache.beam.sdk.nexmark.queries;
 
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
-import org.apache.beam.sdk.nexmark.NexmarkUtils;
 import org.apache.beam.sdk.nexmark.model.Bid;
 import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
@@ -37,15 +35,16 @@
  * <p>To make things more interesting, allow the 'currency conversion' to be arbitrarily slowed
  * down.
  */
-public class Query1 extends NexmarkQuery {
+public class Query1 extends NexmarkQueryTransform<Bid> {
   public Query1(NexmarkConfiguration configuration) {
-    super(configuration, "Query1");
+    super("Query1");
   }
 
-  private PCollection<Bid> applyTyped(PCollection<Event> events) {
+  @Override
+  public PCollection<Bid> expand(PCollection<Event> events) {
     return events
         // Only want the bid events.
-        .apply(JUST_BIDS)
+        .apply(NexmarkQueryUtil.JUST_BIDS)
 
         // Map the conversion function over all bids.
         .apply(
@@ -65,9 +64,4 @@ public void processElement(ProcessContext c) {
                   }
                 }));
   }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java
index 24d21afc3c9..a4ded8d77ce 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java
@@ -30,10 +30,8 @@
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
-import org.apache.beam.sdk.nexmark.NexmarkUtils;
 import org.apache.beam.sdk.nexmark.model.Done;
 import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -60,10 +58,11 @@
  *
  * <p>Every windowSizeSec, save all events from the last period into 2*maxWorkers log files.
  */
-public class Query10 extends NexmarkQuery {
+public class Query10 extends NexmarkQueryTransform<Done> {
   private static final Logger LOG = LoggerFactory.getLogger(Query10.class);
   private static final int NUM_SHARDS_PER_WORKER = 5;
   private static final Duration LATE_BATCHING_PERIOD = 
Duration.standardSeconds(10);
+  private final NexmarkConfiguration configuration;
 
   /** Capture everything we need to know about the records in a single output file. */
   private static class OutputFile implements Serializable {
@@ -104,7 +103,8 @@ public String toString() {
   private int maxNumWorkers;
 
   public Query10(NexmarkConfiguration configuration) {
-    super(configuration, "Query10");
+    super("Query10");
+    this.configuration = configuration;
   }
 
   public void setOutputPath(@Nullable String outputPath) {
@@ -169,7 +169,8 @@ private String indexPathFor(BoundedWindow window) {
     return String.format("%s/INDEX-%s", outputPath, window.maxTimestamp());
   }
 
-  private PCollection<Done> applyTyped(PCollection<Event> events) {
+  @Override
+  public PCollection<Done> expand(PCollection<Event> events) {
     final int numLogShards = maxNumWorkers * NUM_SHARDS_PER_WORKER;
 
     return events
@@ -368,9 +369,4 @@ public void processElement(ProcessContext c, BoundedWindow 
window)
                   }
                 }));
   }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query11.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query11.java
index e04c34a9947..999955dd21a 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query11.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query11.java
@@ -18,11 +18,9 @@
 package org.apache.beam.sdk.nexmark.queries;
 
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
-import org.apache.beam.sdk.nexmark.NexmarkUtils;
 import org.apache.beam.sdk.nexmark.model.Bid;
 import org.apache.beam.sdk.nexmark.model.BidsPerSession;
 import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -40,15 +38,19 @@
  * <p>Group bids by the same user into sessions with {@code windowSizeSec} max gap. However limit
  * the session to at most {@code maxLogEvents}. Emit the number of bids per session.
  */
-public class Query11 extends NexmarkQuery {
+public class Query11 extends NexmarkQueryTransform<BidsPerSession> {
+  private final NexmarkConfiguration configuration;
+
   public Query11(NexmarkConfiguration configuration) {
-    super(configuration, "Query11");
+    super("Query11");
+    this.configuration = configuration;
   }
 
-  private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
+  @Override
+  public PCollection<BidsPerSession> expand(PCollection<Event> events) {
     PCollection<Long> bidders =
         events
-            .apply(JUST_BIDS)
+            .apply(NexmarkQueryUtil.JUST_BIDS)
             .apply(
                 name + ".Rekey",
                 ParDo.of(
@@ -83,9 +85,4 @@ public void processElement(ProcessContext c) {
                   }
                 }));
   }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query12.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query12.java
index 89656fe6134..64bdb72d63f 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query12.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query12.java
@@ -18,11 +18,9 @@
 package org.apache.beam.sdk.nexmark.queries;
 
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
-import org.apache.beam.sdk.nexmark.NexmarkUtils;
 import org.apache.beam.sdk.nexmark.model.Bid;
 import org.apache.beam.sdk.nexmark.model.BidsPerSession;
 import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -40,14 +38,18 @@
  * <p>Group bids by the same user into processing time windows of windowSize. Emit the count of bids
  * per window.
  */
-public class Query12 extends NexmarkQuery {
+public class Query12 extends NexmarkQueryTransform<BidsPerSession> {
+  private final NexmarkConfiguration configuration;
+
   public Query12(NexmarkConfiguration configuration) {
-    super(configuration, "Query12");
+    super("Query12");
+    this.configuration = configuration;
   }
 
-  private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
+  @Override
+  public PCollection<BidsPerSession> expand(PCollection<Event> events) {
     return events
-        .apply(JUST_BIDS)
+        .apply(NexmarkQueryUtil.JUST_BIDS)
         .apply(
             ParDo.of(
                 new DoFn<Bid, Long>() {
@@ -75,9 +77,4 @@ public void processElement(ProcessContext c) {
                   }
                 }));
   }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java
index 9097fa289cc..91deccf7613 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java
@@ -27,7 +27,7 @@
 import org.apache.beam.sdk.values.TimestampedValue;
 
 /** A direct implementation of {@link Query1}. */
-public class Query1Model extends NexmarkQueryModel implements Serializable {
+public class Query1Model extends NexmarkQueryModel<Bid> implements 
Serializable {
   /** Simulator for query 1. */
   private static class Simulator extends AbstractSimulator<Event, Bid> {
     public Simulator(NexmarkConfiguration configuration) {
@@ -60,12 +60,12 @@ public Query1Model(NexmarkConfiguration configuration) {
   }
 
   @Override
-  public AbstractSimulator<?, ?> simulator() {
+  public AbstractSimulator<?, Bid> simulator() {
     return new Simulator(configuration);
   }
 
   @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> 
itr) {
+  protected Collection<String> toCollection(Iterator<TimestampedValue<Bid>> 
itr) {
     return toValueTimestamp(itr);
   }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2.java
index 44726265752..1064c0d42f0 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2.java
@@ -18,11 +18,9 @@
 package org.apache.beam.sdk.nexmark.queries;
 
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
-import org.apache.beam.sdk.nexmark.NexmarkUtils;
 import org.apache.beam.sdk.nexmark.model.AuctionPrice;
 import org.apache.beam.sdk.nexmark.model.Bid;
 import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -41,18 +39,22 @@
  * size. To make it more interesting we instead choose bids for every {@code auctionSkip}'th
  * auction.
  */
-public class Query2 extends NexmarkQuery {
+public class Query2 extends NexmarkQueryTransform<AuctionPrice> {
+  private final int auctionSkip;
+
   public Query2(NexmarkConfiguration configuration) {
-    super(configuration, "Query2");
+    super("Query2");
+    this.auctionSkip = configuration.auctionSkip;
   }
 
-  private PCollection<AuctionPrice> applyTyped(PCollection<Event> events) {
+  @Override
+  public PCollection<AuctionPrice> expand(PCollection<Event> events) {
     return events
         // Only want the bid events.
-        .apply(JUST_BIDS)
+        .apply(NexmarkQueryUtil.JUST_BIDS)
 
         // Select just the bids for the auctions we care about.
-        .apply(Filter.by(bid -> bid.auction % configuration.auctionSkip == 0))
+        .apply(Filter.by(bid -> bid.auction % this.auctionSkip == 0))
 
         // Project just auction id and price.
         .apply(
@@ -66,9 +68,4 @@ public void processElement(ProcessContext c) {
                   }
                 }));
   }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java
index 0a328b92381..695be144a47 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java
@@ -28,7 +28,7 @@
 import org.apache.beam.sdk.values.TimestampedValue;
 
 /** A direct implementation of {@link Query2}. */
-public class Query2Model extends NexmarkQueryModel implements Serializable {
+public class Query2Model extends NexmarkQueryModel<AuctionPrice> implements 
Serializable {
   /** Simulator for query 2. */
   private class Simulator extends AbstractSimulator<Event, AuctionPrice> {
     public Simulator(NexmarkConfiguration configuration) {
@@ -64,12 +64,12 @@ public Query2Model(NexmarkConfiguration configuration) {
   }
 
   @Override
-  public AbstractSimulator<?, ?> simulator() {
+  public AbstractSimulator<?, AuctionPrice> simulator() {
     return new Simulator(configuration);
   }
 
   @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> 
itr) {
+  protected Collection<String> 
toCollection(Iterator<TimestampedValue<AuctionPrice>> itr) {
     return toValueTimestamp(itr);
   }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
index e250a0fd564..f353087b446 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
@@ -23,10 +23,8 @@
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
-import org.apache.beam.sdk.nexmark.NexmarkUtils;
 import org.apache.beam.sdk.nexmark.model.Auction;
 import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.nexmark.model.NameCityStateId;
 import org.apache.beam.sdk.nexmark.model.Person;
 import org.apache.beam.sdk.state.StateSpec;
@@ -70,17 +68,18 @@
  *
  * <p>A real system would use an external system to maintain the id-to-person association.
  */
-public class Query3 extends NexmarkQuery {
+public class Query3 extends NexmarkQueryTransform<NameCityStateId> {
 
   private static final Logger LOG = LoggerFactory.getLogger(Query3.class);
   private final JoinDoFn joinDoFn;
 
   public Query3(NexmarkConfiguration configuration) {
-    super(configuration, "Query3");
+    super("Query3");
     joinDoFn = new JoinDoFn(name, configuration.maxAuctionsWaitingTime);
   }
 
-  private PCollection<NameCityStateId> applyTyped(PCollection<Event> events) {
+  @Override
+  public PCollection<NameCityStateId> expand(PCollection<Event> events) {
     int numEventsInPane = 30;
 
     PCollection<Event> eventsWindowed =
@@ -92,18 +91,18 @@ public Query3(NexmarkConfiguration configuration) {
     PCollection<KV<Long, Auction>> auctionsBySellerId =
         eventsWindowed
             // Only want the new auction events.
-            .apply(JUST_NEW_AUCTIONS)
+            .apply(NexmarkQueryUtil.JUST_NEW_AUCTIONS)
 
             // We only want auctions in category 10.
             .apply(name + ".InCategory", Filter.by(auction -> auction.category 
== 10))
 
             // Key auctions by their seller id.
-            .apply("AuctionBySeller", AUCTION_BY_SELLER);
+            .apply("AuctionBySeller", NexmarkQueryUtil.AUCTION_BY_SELLER);
 
     PCollection<KV<Long, Person>> personsById =
         eventsWindowed
             // Only want the new people events.
-            .apply(JUST_NEW_PERSONS)
+            .apply(NexmarkQueryUtil.JUST_NEW_PERSONS)
 
             // We only want people in OR, ID, CA.
             .apply(
@@ -115,13 +114,13 @@ public Query3(NexmarkConfiguration configuration) {
                             || "CA".equals(person.state)))
 
             // Key people by their id.
-            .apply("PersonById", PERSON_BY_ID);
+            .apply("PersonById", NexmarkQueryUtil.PERSON_BY_ID);
 
     return
     // Join auctions and people.
     // concatenate KeyedPCollections
-    KeyedPCollectionTuple.of(AUCTION_TAG, auctionsBySellerId)
-        .and(PERSON_TAG, personsById)
+    KeyedPCollectionTuple.of(NexmarkQueryUtil.AUCTION_TAG, auctionsBySellerId)
+        .and(NexmarkQueryUtil.PERSON_TAG, personsById)
         // group auctions and persons by personId
         .apply(CoGroupByKey.create())
         .apply(name + ".Join", ParDo.of(joinDoFn))
@@ -142,11 +141,6 @@ public void processElement(ProcessContext c) {
                 }));
   }
 
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-
   /**
    * Join {@code auctions} and {@code people} by person id and emit their cross-product one pair at
    * a time.
@@ -213,7 +207,7 @@ public void processElement(
         // We've already seen the new person event for this person id.
         // We can join with any new auctions on-the-fly without needing any
         // additional persistent state.
-        for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
+        for (Auction newAuction : 
c.element().getValue().getAll(NexmarkQueryUtil.AUCTION_TAG)) {
           newAuctionCounter.inc();
           newOldOutputCounter.inc();
           c.output(KV.of(newAuction, existingPerson));
@@ -222,7 +216,7 @@ public void processElement(
       }
 
       Person theNewPerson = null;
-      for (Person newPerson : c.element().getValue().getAll(PERSON_TAG)) {
+      for (Person newPerson : 
c.element().getValue().getAll(NexmarkQueryUtil.PERSON_TAG)) {
         if (theNewPerson == null) {
           theNewPerson = newPerson;
         } else {
@@ -246,7 +240,7 @@ public void processElement(
           auctionsState.clear();
         }
         // Also deal with any new auctions.
-        for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
+        for (Auction newAuction : 
c.element().getValue().getAll(NexmarkQueryUtil.AUCTION_TAG)) {
           newAuctionCounter.inc();
           newNewOutputCounter.inc();
           c.output(KV.of(newAuction, newPerson));
@@ -268,7 +262,7 @@ public void processElement(
       if (pendingAuctions == null) {
         pendingAuctions = new ArrayList<>();
       }
-      for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
+      for (Auction newAuction : 
c.element().getValue().getAll(NexmarkQueryUtil.AUCTION_TAG)) {
         newAuctionCounter.inc();
         pendingAuctions.add(newAuction);
       }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3Model.java
index ec2e615ed7d..522ffb63cf9 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3Model.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3Model.java
@@ -34,7 +34,7 @@
 import org.joda.time.Instant;
 
 /** A direct implementation of {@link Query3}. */
-public class Query3Model extends NexmarkQueryModel implements Serializable {
+public class Query3Model extends NexmarkQueryModel<NameCityStateId> implements 
Serializable {
   /** Simulator for query 3. */
   private static class Simulator extends AbstractSimulator<Event, 
NameCityStateId> {
     /** Auctions, indexed by seller id. */
@@ -107,12 +107,12 @@ public Query3Model(NexmarkConfiguration configuration) {
   }
 
   @Override
-  public AbstractSimulator<?, ?> simulator() {
+  public AbstractSimulator<?, NameCityStateId> simulator() {
     return new Simulator(configuration);
   }
 
   @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> 
itr) {
+  protected Collection<String> 
toCollection(Iterator<TimestampedValue<NameCityStateId>> itr) {
     return toValue(itr);
   }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java
index d3b1e233b09..60ba305693d 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java
@@ -19,13 +19,11 @@
 
 import org.apache.beam.sdk.nexmark.Monitor;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
-import org.apache.beam.sdk.nexmark.NexmarkUtils;
 import org.apache.beam.sdk.nexmark.model.Auction;
 import org.apache.beam.sdk.nexmark.model.AuctionBid;
 import org.apache.beam.sdk.nexmark.model.Bid;
 import org.apache.beam.sdk.nexmark.model.CategoryPrice;
 import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.Mean;
@@ -61,15 +59,18 @@
  *       {@code windowPeriodSec}.
  * </ul>
  */
-public class Query4 extends NexmarkQuery {
+public class Query4 extends NexmarkQueryTransform<CategoryPrice> {
   private final Monitor<AuctionBid> winningBidsMonitor;
+  private final NexmarkConfiguration configuration;
 
   public Query4(NexmarkConfiguration configuration) {
-    super(configuration, "Query4");
+    super("Query4");
+    this.configuration = configuration;
     winningBidsMonitor = new Monitor<>(name + ".WinningBids", "winning");
   }
 
-  private PCollection<CategoryPrice> applyTyped(PCollection<Event> events) {
+  @Override
+  public PCollection<CategoryPrice> expand(PCollection<Event> events) {
     PCollection<AuctionBid> winningBids =
         events
             .apply(Filter.by(new AuctionOrBid()))
@@ -119,9 +120,4 @@ public void processElement(ProcessContext c) {
                   }
                 }));
   }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4Model.java
index d58f6ff5028..b8f5ff318ad 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4Model.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4Model.java
@@ -30,7 +30,6 @@
 import org.apache.beam.sdk.nexmark.model.AuctionBid;
 import org.apache.beam.sdk.nexmark.model.Bid;
 import org.apache.beam.sdk.nexmark.model.CategoryPrice;
-import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
@@ -38,7 +37,7 @@
 import org.junit.Assert;
 
 /** A direct implementation of {@link Query4}. */
-public class Query4Model extends NexmarkQueryModel implements Serializable {
+public class Query4Model extends NexmarkQueryModel<CategoryPrice> implements 
Serializable {
   /** Simulator for query 4. */
   private class Simulator extends AbstractSimulator<AuctionBid, CategoryPrice> 
{
     /** The prices and categories for all winning bids in the last window 
size. */
@@ -155,22 +154,21 @@ public Query4Model(NexmarkConfiguration configuration) {
   }
 
   @Override
-  public AbstractSimulator<?, ?> simulator() {
+  public AbstractSimulator<?, CategoryPrice> simulator() {
     return new Simulator(configuration);
   }
 
   @Override
-  protected Iterable<TimestampedValue<KnownSize>> relevantResults(
-      Iterable<TimestampedValue<KnownSize>> results) {
+  protected Iterable<TimestampedValue<CategoryPrice>> relevantResults(
+      Iterable<TimestampedValue<CategoryPrice>> results) {
     // Find the last (in processing time) reported average price for each 
category.
-    Map<Long, TimestampedValue<KnownSize>> finalAverages = new TreeMap<>();
-    for (TimestampedValue<KnownSize> obj : results) {
+    Map<Long, TimestampedValue<CategoryPrice>> finalAverages = new TreeMap<>();
+    for (TimestampedValue<CategoryPrice> obj : results) {
       Assert.assertTrue("have CategoryPrice", obj.getValue() instanceof 
CategoryPrice);
       CategoryPrice categoryPrice = (CategoryPrice) obj.getValue();
       if (categoryPrice.isLast) {
         finalAverages.put(
-            categoryPrice.category,
-            TimestampedValue.of((KnownSize) categoryPrice, 
obj.getTimestamp()));
+            categoryPrice.category, TimestampedValue.of(categoryPrice, 
obj.getTimestamp()));
       }
     }
 
@@ -178,7 +176,7 @@ public Query4Model(NexmarkConfiguration configuration) {
   }
 
   @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> 
itr) {
+  protected Collection<String> 
toCollection(Iterator<TimestampedValue<CategoryPrice>> itr) {
     return toValue(itr);
   }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5.java
index 7df64fef629..76d9e64d324 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5.java
@@ -21,10 +21,8 @@
 import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
-import org.apache.beam.sdk.nexmark.NexmarkUtils;
 import org.apache.beam.sdk.nexmark.model.AuctionCount;
 import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -52,22 +50,26 @@
  * <p>To make things a bit more dynamic and easier to test we use much shorter 
windows, and we'll
  * also preserve the bid counts.
  */
-public class Query5 extends NexmarkQuery {
+public class Query5 extends NexmarkQueryTransform<AuctionCount> {
+  private final NexmarkConfiguration configuration;
+
   public Query5(NexmarkConfiguration configuration) {
-    super(configuration, "Query5");
+    super("Query5");
+    this.configuration = configuration;
   }
 
-  private PCollection<AuctionCount> applyTyped(PCollection<Event> events) {
+  @Override
+  public PCollection<AuctionCount> expand(PCollection<Event> events) {
     return events
         // Only want the bid events.
-        .apply(JUST_BIDS)
+        .apply(NexmarkQueryUtil.JUST_BIDS)
         // Window the bids into sliding windows.
         .apply(
             Window.into(
                 
SlidingWindows.of(Duration.standardSeconds(configuration.windowSizeSec))
                     
.every(Duration.standardSeconds(configuration.windowPeriodSec))))
         // Project just the auction id.
-        .apply("BidToAuction", BID_TO_AUCTION)
+        .apply("BidToAuction", NexmarkQueryUtil.BID_TO_AUCTION)
 
         // Count the number of bids per auction id.
         .apply(Count.perElement())
@@ -129,9 +131,4 @@ public void processElement(ProcessContext c) {
                   }
                 }));
   }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5Model.java
index ee1a5dfad65..2ae08af4649 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5Model.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5Model.java
@@ -34,7 +34,7 @@
 import org.joda.time.Instant;
 
 /** A direct implementation of {@link Query5}. */
-public class Query5Model extends NexmarkQueryModel implements Serializable {
+public class Query5Model extends NexmarkQueryModel<AuctionCount> implements 
Serializable {
   /** Simulator for query 5. */
   private class Simulator extends AbstractSimulator<Event, AuctionCount> {
     /** Time of bids still contributing to open windows, indexed by their 
auction id. */
@@ -156,12 +156,12 @@ public Query5Model(NexmarkConfiguration configuration) {
   }
 
   @Override
-  public AbstractSimulator<?, ?> simulator() {
+  public AbstractSimulator<?, AuctionCount> simulator() {
     return new Simulator(configuration);
   }
 
   @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> 
itr) {
+  protected Collection<String> 
toCollection(Iterator<TimestampedValue<AuctionCount>> itr) {
     return toValue(itr);
   }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java
index eeae79acc92..7a6ec33f79d 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java
@@ -21,12 +21,10 @@
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
-import org.apache.beam.sdk.nexmark.NexmarkUtils;
 import org.apache.beam.sdk.nexmark.model.Auction;
 import org.apache.beam.sdk.nexmark.model.AuctionBid;
 import org.apache.beam.sdk.nexmark.model.Bid;
 import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.nexmark.model.SellerPrice;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -55,7 +53,15 @@
  *
  * <p>We are a little more exact with selecting winning bids: see {@link 
WinningBids}.
  */
-public class Query6 extends NexmarkQuery {
+public class Query6 extends NexmarkQueryTransform<SellerPrice> {
+
+  private final NexmarkConfiguration configuration;
+
+  public Query6(NexmarkConfiguration configuration) {
+    super("Query6");
+    this.configuration = configuration;
+  }
+
   /**
    * Combiner to keep track of up to {@code maxNumBids} of the most recent 
wining bids and calculate
    * their average selling price.
@@ -108,11 +114,8 @@ public Long extractOutput(List<Bid> accumulator) {
     }
   }
 
-  public Query6(NexmarkConfiguration configuration) {
-    super(configuration, "Query6");
-  }
-
-  private PCollection<SellerPrice> applyTyped(PCollection<Event> events) {
+  @Override
+  public PCollection<SellerPrice> expand(PCollection<Event> events) {
     return events
         .apply(Filter.by(new AuctionOrBid()))
         // Find the winning bid for each closed auction.
@@ -152,9 +155,4 @@ public void processElement(ProcessContext c) {
                   }
                 }));
   }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java
index c6f74e40357..9f37301683a 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java
@@ -29,7 +29,6 @@
 import org.apache.beam.sdk.nexmark.model.Auction;
 import org.apache.beam.sdk.nexmark.model.AuctionBid;
 import org.apache.beam.sdk.nexmark.model.Bid;
-import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.nexmark.model.SellerPrice;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -37,7 +36,7 @@
 import org.junit.Assert;
 
 /** A direct implementation of {@link Query6}. */
-public class Query6Model extends NexmarkQueryModel implements Serializable {
+public class Query6Model extends NexmarkQueryModel<SellerPrice> implements 
Serializable {
   /** Simulator for query 6. */
   private static class Simulator extends AbstractSimulator<AuctionBid, 
SellerPrice> {
     /** The last 10 winning bids ordered by age, indexed by seller id. */
@@ -111,26 +110,25 @@ public Query6Model(NexmarkConfiguration configuration) {
   }
 
   @Override
-  public AbstractSimulator<?, ?> simulator() {
+  public AbstractSimulator<?, SellerPrice> simulator() {
     return new Simulator(configuration);
   }
 
   @Override
-  protected Iterable<TimestampedValue<KnownSize>> relevantResults(
-      Iterable<TimestampedValue<KnownSize>> results) {
+  protected Iterable<TimestampedValue<SellerPrice>> relevantResults(
+      Iterable<TimestampedValue<SellerPrice>> results) {
     // Find the last (in processing time) reported average price for each 
seller.
-    Map<Long, TimestampedValue<KnownSize>> finalAverages = new TreeMap<>();
-    for (TimestampedValue<KnownSize> obj : results) {
+    Map<Long, TimestampedValue<SellerPrice>> finalAverages = new TreeMap<>();
+    for (TimestampedValue<SellerPrice> obj : results) {
       Assert.assertTrue("have SellerPrice", obj.getValue() instanceof 
SellerPrice);
       SellerPrice sellerPrice = (SellerPrice) obj.getValue();
-      finalAverages.put(
-          sellerPrice.seller, TimestampedValue.of((KnownSize) sellerPrice, 
obj.getTimestamp()));
+      finalAverages.put(sellerPrice.seller, TimestampedValue.of(sellerPrice, 
obj.getTimestamp()));
     }
     return finalAverages.values();
   }
 
   @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> 
itr) {
+  protected Collection<String> 
toCollection(Iterator<TimestampedValue<SellerPrice>> itr) {
     return toValue(itr);
   }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7.java
index 44bd672c789..cf372f97644 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7.java
@@ -18,10 +18,8 @@
 package org.apache.beam.sdk.nexmark.queries;
 
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
-import org.apache.beam.sdk.nexmark.NexmarkUtils;
 import org.apache.beam.sdk.nexmark.model.Bid;
 import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -46,16 +44,20 @@
  * side-input in order to exercise that functionality. (A combiner, as used in 
Query 5, is a more
  * efficient approach.).
  */
-public class Query7 extends NexmarkQuery {
+public class Query7 extends NexmarkQueryTransform<Bid> {
+  private final NexmarkConfiguration configuration;
+
   public Query7(NexmarkConfiguration configuration) {
-    super(configuration, "Query7");
+    super("Query7");
+    this.configuration = configuration;
   }
 
-  private PCollection<Bid> applyTyped(PCollection<Event> events) {
+  @Override
+  public PCollection<Bid> expand(PCollection<Event> events) {
     // Window the bids.
     PCollection<Bid> slidingBids =
         events
-            .apply(JUST_BIDS)
+            .apply(NexmarkQueryUtil.JUST_BIDS)
             .apply(
                 Window.into(
                     
FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))));
@@ -67,7 +69,7 @@ public Query7(NexmarkConfiguration configuration) {
     // its I/O. We'll keep this implementation since it illustrates the use of 
side inputs.
     final PCollectionView<Long> maxPriceView =
         slidingBids
-            .apply("BidToPrice", BID_TO_PRICE)
+            .apply("BidToPrice", NexmarkQueryUtil.BID_TO_PRICE)
             
.apply(Max.longsGlobally().withFanout(configuration.fanout).asSingletonView());
 
     return slidingBids
@@ -87,9 +89,4 @@ public void processElement(ProcessContext c) {
                 })
             .withSideInputs(maxPriceView));
   }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java
index 9ff026b08de..456003381ce 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java
@@ -32,7 +32,7 @@
 import org.joda.time.Instant;
 
 /** A direct implementation of {@link Query7}. */
-public class Query7Model extends NexmarkQueryModel implements Serializable {
+public class Query7Model extends NexmarkQueryModel<Bid> implements 
Serializable {
   /** Simulator for query 7. */
   private class Simulator extends AbstractSimulator<Event, Bid> {
     /** Bids with highest bid price seen in the current window. */
@@ -113,12 +113,12 @@ public Query7Model(NexmarkConfiguration configuration) {
   }
 
   @Override
-  public AbstractSimulator<?, ?> simulator() {
+  public AbstractSimulator<?, Bid> simulator() {
     return new Simulator(configuration);
   }
 
   @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> 
itr) {
+  protected Collection<String> toCollection(Iterator<TimestampedValue<Bid>> 
itr) {
     return toValue(itr);
   }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
index d78f2b58017..3443dbbc259 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
@@ -19,11 +19,9 @@
 
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
-import org.apache.beam.sdk.nexmark.NexmarkUtils;
 import org.apache.beam.sdk.nexmark.model.Auction;
 import org.apache.beam.sdk.nexmark.model.Event;
 import org.apache.beam.sdk.nexmark.model.IdNameReserve;
-import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.nexmark.model.Person;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -48,33 +46,37 @@
  *
  * <p>To make things a bit more dynamic and easier to test we'll use a much 
shorter window.
  */
-public class Query8 extends NexmarkQuery {
+public class Query8 extends NexmarkQueryTransform<IdNameReserve> {
+  private final NexmarkConfiguration configuration;
+
   public Query8(NexmarkConfiguration configuration) {
-    super(configuration, "Query8");
+    super("Query8");
+    this.configuration = configuration;
   }
 
-  private PCollection<IdNameReserve> applyTyped(PCollection<Event> events) {
+  @Override
+  public PCollection<IdNameReserve> expand(PCollection<Event> events) {
     // Window and key new people by their id.
     PCollection<KV<Long, Person>> personsById =
         events
-            .apply(JUST_NEW_PERSONS)
+            .apply(NexmarkQueryUtil.JUST_NEW_PERSONS)
             .apply(
                 "Query8.WindowPersons",
                 
Window.into(FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))))
-            .apply("PersonById", PERSON_BY_ID);
+            .apply("PersonById", NexmarkQueryUtil.PERSON_BY_ID);
 
     // Window and key new auctions by their id.
     PCollection<KV<Long, Auction>> auctionsBySeller =
         events
-            .apply(JUST_NEW_AUCTIONS)
+            .apply(NexmarkQueryUtil.JUST_NEW_AUCTIONS)
             .apply(
                 "Query8.WindowAuctions",
                 
Window.into(FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))))
-            .apply("AuctionBySeller", AUCTION_BY_SELLER);
+            .apply("AuctionBySeller", NexmarkQueryUtil.AUCTION_BY_SELLER);
 
     // Join people and auctions and project the person id, name and auction 
reserve price.
-    return KeyedPCollectionTuple.of(PERSON_TAG, personsById)
-        .and(AUCTION_TAG, auctionsBySeller)
+    return KeyedPCollectionTuple.of(NexmarkQueryUtil.PERSON_TAG, personsById)
+        .and(NexmarkQueryUtil.AUCTION_TAG, auctionsBySeller)
         .apply(CoGroupByKey.create())
         .apply(
             name + ".Select",
@@ -82,20 +84,18 @@ public Query8(NexmarkConfiguration configuration) {
                 new DoFn<KV<Long, CoGbkResult>, IdNameReserve>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
-                    @Nullable Person person = 
c.element().getValue().getOnly(PERSON_TAG, null);
+                    @Nullable
+                    Person person =
+                        
c.element().getValue().getOnly(NexmarkQueryUtil.PERSON_TAG, null);
                     if (person == null) {
                       // Person was not created in last window period.
                       return;
                     }
-                    for (Auction auction : 
c.element().getValue().getAll(AUCTION_TAG)) {
+                    for (Auction auction :
+                        
c.element().getValue().getAll(NexmarkQueryUtil.AUCTION_TAG)) {
                       c.output(new IdNameReserve(person.id, person.name, 
auction.reserve));
                     }
                   }
                 }));
   }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8Model.java
index 54788a7b22c..888ac4e1b4a 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8Model.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8Model.java
@@ -35,7 +35,7 @@
 import org.joda.time.Instant;
 
 /** A direct implementation of {@link Query8}. */
-public class Query8Model extends NexmarkQueryModel implements Serializable {
+public class Query8Model extends NexmarkQueryModel<IdNameReserve> implements 
Serializable {
   /** Simulator for query 8. */
   private class Simulator extends AbstractSimulator<Event, IdNameReserve> {
     /** New persons seen in the current window, indexed by id. */
@@ -131,12 +131,12 @@ public Query8Model(NexmarkConfiguration configuration) {
   }
 
   @Override
-  public AbstractSimulator<?, ?> simulator() {
+  public AbstractSimulator<?, IdNameReserve> simulator() {
     return new Simulator(configuration);
   }
 
   @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> 
itr) {
+  protected Collection<String> 
toCollection(Iterator<TimestampedValue<IdNameReserve>> itr) {
     return toValue(itr);
   }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java
index 087a4a617cd..96fd858f0fd 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java
@@ -18,10 +18,8 @@
 package org.apache.beam.sdk.nexmark.queries;
 
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
-import org.apache.beam.sdk.nexmark.NexmarkUtils;
 import org.apache.beam.sdk.nexmark.model.AuctionBid;
 import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -29,17 +27,16 @@
  * Query "9", 'Winning bids'. Select just the winning bids. Not in original 
NEXMark suite, but handy
  * for testing. See {@link WinningBids} for the details.
  */
-public class Query9 extends NexmarkQuery {
-  public Query9(NexmarkConfiguration configuration) {
-    super(configuration, "Query9");
-  }
+public class Query9 extends NexmarkQueryTransform<AuctionBid> {
+  private final NexmarkConfiguration configuration;
 
-  private PCollection<AuctionBid> applyTyped(PCollection<Event> events) {
-    return events.apply(Filter.by(new AuctionOrBid())).apply(new 
WinningBids(name, configuration));
+  public Query9(NexmarkConfiguration configuration) {
+    super("Query9");
+    this.configuration = configuration;
   }
 
   @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  public PCollection<AuctionBid> expand(PCollection<Event> events) {
+    return events.apply(Filter.by(new AuctionOrBid())).apply(new 
WinningBids(name, configuration));
   }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9Model.java
index c4df008e99c..d2644bd66b3 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9Model.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9Model.java
@@ -21,21 +21,22 @@
 import java.util.Collection;
 import java.util.Iterator;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.model.AuctionBid;
 import org.apache.beam.sdk.values.TimestampedValue;
 
 /** A direct implementation of {@link Query9}. */
-public class Query9Model extends NexmarkQueryModel implements Serializable {
+public class Query9Model extends NexmarkQueryModel<AuctionBid> implements 
Serializable {
   public Query9Model(NexmarkConfiguration configuration) {
     super(configuration);
   }
 
   @Override
-  public AbstractSimulator<?, ?> simulator() {
+  public AbstractSimulator<?, AuctionBid> simulator() {
     return new WinningBidsSimulator(configuration);
   }
 
   @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> 
itr) {
+  protected Collection<String> 
toCollection(Iterator<TimestampedValue<AuctionBid>> itr) {
     return toValue(itr);
   }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
index 847f417d636..5a9976b84bb 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
@@ -339,18 +339,20 @@ public WinningBids(String name, NexmarkConfiguration 
configuration) {
     // Key auctions by their id.
     PCollection<KV<Long, Auction>> auctionsById =
         events
-            .apply(NexmarkQuery.JUST_NEW_AUCTIONS)
-            .apply("AuctionById:", NexmarkQuery.AUCTION_BY_ID);
+            .apply(NexmarkQueryUtil.JUST_NEW_AUCTIONS)
+            .apply("AuctionById:", NexmarkQueryUtil.AUCTION_BY_ID);
 
     // Key bids by their auction id.
     PCollection<KV<Long, Bid>> bidsByAuctionId =
-        events.apply(NexmarkQuery.JUST_BIDS).apply("BidByAuction", 
NexmarkQuery.BID_BY_AUCTION);
+        events
+            .apply(NexmarkQueryUtil.JUST_BIDS)
+            .apply("BidByAuction", NexmarkQueryUtil.BID_BY_AUCTION);
 
     // Find the highest price valid bid for each closed auction.
     return
     // Join auctions and bids.
-    KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, auctionsById)
-        .and(NexmarkQuery.BID_TAG, bidsByAuctionId)
+    KeyedPCollectionTuple.of(NexmarkQueryUtil.AUCTION_TAG, auctionsById)
+        .and(NexmarkQueryUtil.BID_TAG, bidsByAuctionId)
         .apply(CoGroupByKey.create())
         // Filter and select.
         .apply(
@@ -365,7 +367,7 @@ public WinningBids(String name, NexmarkConfiguration 
configuration) {
                   public void processElement(ProcessContext c) {
                     @Nullable
                     Auction auction =
-                        
c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null);
+                        
c.element().getValue().getOnly(NexmarkQueryUtil.AUCTION_TAG, null);
                     if (auction == null) {
                       // We have bids without a matching auction. Give up.
                       noAuctionCounter.inc();
@@ -374,7 +376,7 @@ public void processElement(ProcessContext c) {
                     // Find the current winning bid for auction.
                     // The earliest bid with the maximum price above the 
reserve wins.
                     Bid bestBid = null;
-                    for (Bid bid : 
c.element().getValue().getAll(NexmarkQuery.BID_TAG)) {
+                    for (Bid bid : 
c.element().getValue().getAll(NexmarkQueryUtil.BID_TAG)) {
                       // Bids too late for their auction will have been
                       // filtered out by the window merge function.
                       checkState(bid.dateTime.compareTo(auction.expires) < 0);
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/NexmarkSqlQuery.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/NexmarkSqlQuery.java
deleted file mode 100644
index 18ba4aaa20e..00000000000
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/NexmarkSqlQuery.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.nexmark.queries.sql;
-
-import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
-import org.apache.beam.sdk.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.nexmark.model.Event;
-import org.apache.beam.sdk.nexmark.model.KnownSize;
-import org.apache.beam.sdk.nexmark.queries.NexmarkQuery;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-
-/** Executor for Nexmark queries. Allows to decouple from NexmarkQuery and 
test independently. */
-public class NexmarkSqlQuery<T extends KnownSize> extends NexmarkQuery {
-
-  private PTransform<PCollection<Event>, PCollection<T>> queryTransform;
-
-  public NexmarkSqlQuery(
-      NexmarkConfiguration configuration,
-      PTransform<PCollection<Event>, PCollection<T>> queryTransform) {
-    super(configuration, queryTransform.getName());
-    this.queryTransform = queryTransform;
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    PCollection<? extends KnownSize> resultRecordsSizes = 
events.apply(queryTransform);
-
-    return NexmarkUtils.castToKnownSize(name, resultRecordsSizes);
-  }
-}
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0.java
index 4c11c96e521..361290c0ec0 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery0.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.nexmark.queries.sql;
 
-import static org.apache.beam.sdk.nexmark.queries.NexmarkQuery.IS_BID;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -30,6 +28,8 @@
 import org.apache.beam.sdk.nexmark.model.Event;
 import org.apache.beam.sdk.nexmark.model.Event.Type;
 import org.apache.beam.sdk.nexmark.model.sql.SelectEvent;
+import org.apache.beam.sdk.nexmark.queries.NexmarkQueryTransform;
+import org.apache.beam.sdk.nexmark.queries.NexmarkQueryUtil;
 import org.apache.beam.sdk.schemas.transforms.Convert;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
@@ -48,7 +48,7 @@
  * <p>{@link Bid} events are used here at the moment, as they are most 
numerous with default
  * configuration.
  */
-public class SqlQuery0 extends PTransform<PCollection<Event>, 
PCollection<Bid>> {
+public class SqlQuery0 extends NexmarkQueryTransform<Bid> {
 
   private static final PTransform<PInput, PCollection<Row>> QUERY =
       SqlTransform.query("SELECT * FROM PCOLLECTION");
@@ -61,7 +61,7 @@ public SqlQuery0() {
   public PCollection<Bid> expand(PCollection<Event> allEvents) {
     PCollection<Row> rows =
         allEvents
-            .apply(Filter.by(IS_BID))
+            .apply(Filter.by(NexmarkQueryUtil.IS_BID))
             .apply(getName() + ".SelectEvent", new SelectEvent(Type.BID));
 
     return rows.apply(getName() + ".Serialize", 
logBytesMetric(rows.getCoder()))
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery1.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery1.java
index 1a84a1654df..3b1054bc702 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery1.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery1.java
@@ -17,13 +17,13 @@
  */
 package org.apache.beam.sdk.nexmark.queries.sql;
 
-import static org.apache.beam.sdk.nexmark.queries.NexmarkQuery.IS_BID;
-
 import org.apache.beam.sdk.extensions.sql.SqlTransform;
 import org.apache.beam.sdk.nexmark.model.Bid;
 import org.apache.beam.sdk.nexmark.model.Event;
 import org.apache.beam.sdk.nexmark.model.Event.Type;
 import org.apache.beam.sdk.nexmark.model.sql.SelectEvent;
+import org.apache.beam.sdk.nexmark.queries.NexmarkQueryTransform;
+import org.apache.beam.sdk.nexmark.queries.NexmarkQueryUtil;
 import org.apache.beam.sdk.schemas.transforms.Convert;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -43,7 +43,7 @@
  * <p>To make things more interesting, allow the 'currency conversion' to be 
arbitrarily slowed
  * down.
  */
-public class SqlQuery1 extends PTransform<PCollection<Event>, 
PCollection<Bid>> {
+public class SqlQuery1 extends NexmarkQueryTransform<Bid> {
 
   private static final PTransform<PInput, PCollection<Row>> QUERY =
       SqlTransform.query(
@@ -65,7 +65,7 @@ public SqlQuery1() {
   @Override
   public PCollection<Bid> expand(PCollection<Event> allEvents) {
     return allEvents
-        .apply(Filter.by(IS_BID))
+        .apply(Filter.by(NexmarkQueryUtil.IS_BID))
         .apply(getName() + ".SelectEvent", new SelectEvent(Type.BID))
         .apply(QUERY)
         .apply(Convert.fromRows(Bid.class));
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery2.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery2.java
index 561aa13c73b..5b895024f71 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery2.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery2.java
@@ -17,13 +17,13 @@
  */
 package org.apache.beam.sdk.nexmark.queries.sql;
 
-import static org.apache.beam.sdk.nexmark.queries.NexmarkQuery.IS_BID;
-
 import org.apache.beam.sdk.extensions.sql.SqlTransform;
 import org.apache.beam.sdk.nexmark.model.AuctionPrice;
 import org.apache.beam.sdk.nexmark.model.Event;
 import org.apache.beam.sdk.nexmark.model.Event.Type;
 import org.apache.beam.sdk.nexmark.model.sql.SelectEvent;
+import org.apache.beam.sdk.nexmark.queries.NexmarkQueryTransform;
+import org.apache.beam.sdk.nexmark.queries.NexmarkQueryUtil;
 import org.apache.beam.sdk.schemas.transforms.Convert;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -43,7 +43,7 @@
  * <p>As written that query will only yield a few hundred results over event 
streams of arbitrary
  * size. To make it more interesting we instead choose bids for every {@code 
skipFactor}'th auction.
  */
-public class SqlQuery2 extends PTransform<PCollection<Event>, 
PCollection<AuctionPrice>> {
+public class SqlQuery2 extends NexmarkQueryTransform<AuctionPrice> {
 
   private static final String QUERY_TEMPLATE =
       "SELECT auction, price FROM PCOLLECTION WHERE MOD(auction, %d) = 0";
@@ -60,7 +60,7 @@ public SqlQuery2(long skipFactor) {
   @Override
   public PCollection<AuctionPrice> expand(PCollection<Event> allEvents) {
     return allEvents
-        .apply(Filter.by(IS_BID))
+        .apply(Filter.by(NexmarkQueryUtil.IS_BID))
         .apply(getName() + ".SelectEvent", new SelectEvent(Type.BID))
         .apply(query)
         .apply(Convert.fromRows(AuctionPrice.class));
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery3.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery3.java
index fbbefb6ab7e..7e09974e1b6 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery3.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery3.java
@@ -25,10 +25,10 @@
 import org.apache.beam.sdk.nexmark.model.NameCityStateId;
 import org.apache.beam.sdk.nexmark.model.Person;
 import org.apache.beam.sdk.nexmark.model.sql.SelectEvent;
+import org.apache.beam.sdk.nexmark.queries.NexmarkQueryTransform;
 import org.apache.beam.sdk.nexmark.queries.Query3;
 import org.apache.beam.sdk.schemas.transforms.Convert;
 import org.apache.beam.sdk.transforms.Filter;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -66,7 +66,7 @@
  *
  * <p>Correct join semantics implementation is tracked in BEAM-3190, BEAM-3191
  */
-public class SqlQuery3 extends PTransform<PCollection<Event>, 
PCollection<NameCityStateId>> {
+public class SqlQuery3 extends NexmarkQueryTransform<NameCityStateId> {
 
   private static final String QUERY_NAME = SqlQuery3.class.getSimpleName();
 
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery5.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery5.java
index b3d9623da83..d6fd0f16edb 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery5.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery5.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.nexmark.queries.sql;
 
-import static org.apache.beam.sdk.nexmark.queries.NexmarkQuery.IS_BID;
-
 import com.google.common.base.Joiner;
 import org.apache.beam.sdk.extensions.sql.SqlTransform;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
@@ -26,6 +24,8 @@
 import org.apache.beam.sdk.nexmark.model.Event;
 import org.apache.beam.sdk.nexmark.model.Event.Type;
 import org.apache.beam.sdk.nexmark.model.sql.SelectEvent;
+import org.apache.beam.sdk.nexmark.queries.NexmarkQueryTransform;
+import org.apache.beam.sdk.nexmark.queries.NexmarkQueryUtil;
 import org.apache.beam.sdk.schemas.transforms.Convert;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -52,7 +52,7 @@
  * <p>To make things a bit more dynamic and easier to test we use much shorter 
windows, and we'll
  * also preserve the bid counts.
  */
-public class SqlQuery5 extends PTransform<PCollection<Event>, 
PCollection<AuctionCount>> {
+public class SqlQuery5 extends NexmarkQueryTransform<AuctionCount> {
 
   private static final String QUERY_TEMPLATE =
       Joiner.on("\n\t")
@@ -99,7 +99,7 @@ public SqlQuery5(NexmarkConfiguration configuration) {
   public PCollection<AuctionCount> expand(PCollection<Event> allEvents) {
     PCollection<Row> bids =
         allEvents
-            .apply(Filter.by(IS_BID))
+            .apply(Filter.by(NexmarkQueryUtil.IS_BID))
             .apply(getName() + ".SelectEvent", new SelectEvent(Type.BID));
 
     return PCollectionTuple.of(new TupleTag<>("Bid"), bids)
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery7.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery7.java
index 0a446958e6e..f80c71e45cd 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery7.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery7.java
@@ -17,14 +17,14 @@
  */
 package org.apache.beam.sdk.nexmark.queries.sql;
 
-import static org.apache.beam.sdk.nexmark.queries.NexmarkQuery.IS_BID;
-
 import org.apache.beam.sdk.extensions.sql.SqlTransform;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
 import org.apache.beam.sdk.nexmark.model.Bid;
 import org.apache.beam.sdk.nexmark.model.Event;
 import org.apache.beam.sdk.nexmark.model.Event.Type;
 import org.apache.beam.sdk.nexmark.model.sql.SelectEvent;
+import org.apache.beam.sdk.nexmark.queries.NexmarkQueryTransform;
+import org.apache.beam.sdk.nexmark.queries.NexmarkQueryUtil;
 import org.apache.beam.sdk.schemas.transforms.Convert;
 import org.apache.beam.sdk.transforms.Filter;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -47,7 +47,7 @@
  *
  * <p>We will use a shorter window to help make testing easier.
  */
-public class SqlQuery7 extends PTransform<PCollection<Event>, 
PCollection<Bid>> {
+public class SqlQuery7 extends NexmarkQueryTransform<Bid> {
 
   private static final String QUERY_TEMPLATE =
       ""
@@ -76,7 +76,7 @@ public SqlQuery7(NexmarkConfiguration configuration) {
   public PCollection<Bid> expand(PCollection<Event> allEvents) {
     PCollection<Row> bids =
         allEvents
-            .apply(Filter.by(IS_BID))
+            .apply(Filter.by(NexmarkQueryUtil.IS_BID))
             .apply(getName() + ".SelectEvent", new SelectEvent(Type.BID));
 
     return PCollectionTuple.of(new TupleTag<>("Bid"), bids)
diff --git 
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinTest.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinTest.java
index da58d24707c..cf85106ca2b 100644
--- 
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinTest.java
+++ 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinTest.java
@@ -59,11 +59,11 @@ public void setupPipeline() {
   }
 
   /** Test {@code query} matches {@code model}. */
-  private void queryMatchesModel(
+  private <T extends KnownSize> void queryMatchesModel(
       String name,
       NexmarkConfiguration config,
-      NexmarkQuery query,
-      NexmarkQueryModel model,
+      NexmarkQueryTransform<T> query,
+      NexmarkQueryModel<T> model,
       boolean streamingMode)
       throws Exception {
 
@@ -79,14 +79,15 @@ private void queryMatchesModel(
       PCollection<KV<Long, String>> sideInput = 
NexmarkUtils.prepareSideInput(p, config);
       query.setSideInput(sideInput);
 
-      PCollection<TimestampedValue<KnownSize>> results;
-      if (streamingMode) {
-        results =
-            p.apply(name + ".ReadUnBounded", 
NexmarkUtils.streamEventsSource(config)).apply(query);
-      } else {
-        results =
-            p.apply(name + ".ReadBounded", 
NexmarkUtils.batchEventsSource(config)).apply(query);
-      }
+      PCollection<Event> events =
+          p.apply(
+              name + ".Read",
+              streamingMode
+                  ? NexmarkUtils.streamEventsSource(config)
+                  : NexmarkUtils.batchEventsSource(config));
+
+      PCollection<TimestampedValue<T>> results =
+          (PCollection<TimestampedValue<T>>) events.apply(new 
NexmarkQuery<>(config, query));
       PAssert.that(results).satisfies(model.assertionFor());
       PipelineResult result = p.run();
       result.waitUntilFinish();
@@ -112,13 +113,14 @@ public void inputOutputSameEvents() throws Exception {
 
     try {
       PCollection<Event> input = 
p.apply(NexmarkUtils.batchEventsSource(config));
-      PCollection<Bid> justBids = input.apply(NexmarkQuery.JUST_BIDS);
+      PCollection<Bid> justBids = input.apply(NexmarkQueryUtil.JUST_BIDS);
       PCollection<Long> bidCount = justBids.apply("Count Bids", 
Count.globally());
 
-      NexmarkQuery query = new BoundedSideInputJoin(config);
+      NexmarkQueryTransform<Bid> query = new BoundedSideInputJoin(config);
       query.setSideInput(sideInput);
 
-      PCollection<TimestampedValue<KnownSize>> output = input.apply(query);
+      PCollection<TimestampedValue<Bid>> output =
+          (PCollection<TimestampedValue<Bid>>) input.apply(new 
NexmarkQuery(config, query));
       PCollection<Long> outputCount = output.apply("Count outputs", 
Count.globally());
 
       
PAssert.that(PCollectionList.of(bidCount).and(outputCount).apply(Flatten.pCollections()))
diff --git 
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java
index 23753fea32c..9dd6dc884f2 100644
--- 
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java
+++ 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java
@@ -20,8 +20,8 @@
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
 import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Event;
 import org.apache.beam.sdk.nexmark.model.KnownSize;
-import org.apache.beam.sdk.nexmark.queries.sql.NexmarkSqlQuery;
 import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery1;
 import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery2;
 import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery3;
@@ -55,16 +55,21 @@
   @Rule public TestPipeline p = TestPipeline.create();
 
   /** Test {@code query} matches {@code model}. */
-  private void queryMatchesModel(
-      String name, NexmarkQuery query, NexmarkQueryModel model, boolean 
streamingMode) {
+  private <T extends KnownSize> void queryMatchesModel(
+      String name,
+      NexmarkQueryTransform<T> query,
+      NexmarkQueryModel<T> model,
+      boolean streamingMode) {
     NexmarkUtils.setupPipeline(NexmarkUtils.CoderStrategy.HAND, p);
-    PCollection<TimestampedValue<KnownSize>> results;
-    if (streamingMode) {
-      results =
-          p.apply(name + ".ReadUnBounded", 
NexmarkUtils.streamEventsSource(CONFIG)).apply(query);
-    } else {
-      results = p.apply(name + ".ReadBounded", 
NexmarkUtils.batchEventsSource(CONFIG)).apply(query);
-    }
+
+    PCollection<Event> events =
+        p.apply(
+            name + ".Read",
+            streamingMode
+                ? NexmarkUtils.streamEventsSource(CONFIG)
+                : NexmarkUtils.batchEventsSource(CONFIG));
+    PCollection<TimestampedValue<T>> results =
+        (PCollection<TimestampedValue<T>>) events.apply(new 
NexmarkQuery<>(CONFIG, query));
     PAssert.that(results).satisfies(model.assertionFor());
     PipelineResult result = p.run();
     result.waitUntilFinish();
@@ -73,13 +78,13 @@ private void queryMatchesModel(
   @Test
   @Category(NeedsRunner.class)
   public void query0MatchesModelBatch() {
-    queryMatchesModel("Query0TestBatch", new Query0(CONFIG), new 
Query0Model(CONFIG), false);
+    queryMatchesModel("Query0TestBatch", new Query0(), new 
Query0Model(CONFIG), false);
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void query0MatchesModelStreaming() {
-    queryMatchesModel("Query0TestStreaming", new Query0(CONFIG), new 
Query0Model(CONFIG), true);
+    queryMatchesModel("Query0TestStreaming", new Query0(), new 
Query0Model(CONFIG), true);
   }
 
   @Test
@@ -97,21 +102,13 @@ public void query1MatchesModelStreaming() {
   @Test
   @Category(NeedsRunner.class)
   public void sqlQuery1MatchesModelBatch() {
-    queryMatchesModel(
-        "SqlQuery1TestBatch",
-        new NexmarkSqlQuery(CONFIG, new SqlQuery1()),
-        new Query1Model(CONFIG),
-        false);
+    queryMatchesModel("SqlQuery1TestBatch", new SqlQuery1(), new 
Query1Model(CONFIG), false);
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void sqlQuery1MatchesModelStreaming() {
-    queryMatchesModel(
-        "SqlQuery1TestStreaming",
-        new NexmarkSqlQuery(CONFIG, new SqlQuery1()),
-        new Query1Model(CONFIG),
-        true);
+    queryMatchesModel("SqlQuery1TestStreaming", new SqlQuery1(), new 
Query1Model(CONFIG), true);
   }
 
   @Test
@@ -130,20 +127,14 @@ public void query2MatchesModelStreaming() {
   @Category(NeedsRunner.class)
   public void sqlQuery2MatchesModelBatch() {
     queryMatchesModel(
-        "SqlQuery2TestBatch",
-        new NexmarkSqlQuery(CONFIG, new SqlQuery2(CONFIG.auctionSkip)),
-        new Query2Model(CONFIG),
-        false);
+        "SqlQuery2TestBatch", new SqlQuery2(CONFIG.auctionSkip), new 
Query2Model(CONFIG), false);
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void sqlQuery2MatchesModelStreaming() {
     queryMatchesModel(
-        "SqlQuery2TestStreaming",
-        new NexmarkSqlQuery(CONFIG, new SqlQuery2(CONFIG.auctionSkip)),
-        new Query2Model(CONFIG),
-        true);
+        "SqlQuery2TestStreaming", new SqlQuery2(CONFIG.auctionSkip), new 
Query2Model(CONFIG), true);
   }
 
   @Test
@@ -161,21 +152,14 @@ public void query3MatchesModelStreaming() {
   @Test
   @Category({NeedsRunner.class, UsesStatefulParDo.class, 
UsesTimersInParDo.class})
   public void sqlQuery3MatchesModelBatch() {
-    queryMatchesModel(
-        "SqlQuery3TestBatch",
-        new NexmarkSqlQuery(CONFIG, new SqlQuery3(CONFIG)),
-        new Query3Model(CONFIG),
-        false);
+    queryMatchesModel("SqlQuery3TestBatch", new SqlQuery3(CONFIG), new 
Query3Model(CONFIG), false);
   }
 
   @Test
   @Category({NeedsRunner.class, UsesStatefulParDo.class, 
UsesTimersInParDo.class})
   public void sqlQuery3MatchesModelStreaming() {
     queryMatchesModel(
-        "SqlQuery3TestStreaming",
-        new NexmarkSqlQuery(CONFIG, new SqlQuery3(CONFIG)),
-        new Query3Model(CONFIG),
-        true);
+        "SqlQuery3TestStreaming", new SqlQuery3(CONFIG), new 
Query3Model(CONFIG), true);
   }
 
   @Test
@@ -205,21 +189,14 @@ public void query5MatchesModelStreaming() {
   @Test
   @Category(NeedsRunner.class)
   public void sqlQuery5MatchesModelBatch() {
-    queryMatchesModel(
-        "SqlQuery5TestBatch",
-        new NexmarkSqlQuery(CONFIG, new SqlQuery5(CONFIG)),
-        new Query5Model(CONFIG),
-        false);
+    queryMatchesModel("SqlQuery5TestBatch", new SqlQuery5(CONFIG), new 
Query5Model(CONFIG), false);
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void sqlQuery5MatchesModelStreaming() {
     queryMatchesModel(
-        "SqlQuery5TestStreaming",
-        new NexmarkSqlQuery(CONFIG, new SqlQuery5(CONFIG)),
-        new Query5Model(CONFIG),
-        true);
+        "SqlQuery5TestStreaming", new SqlQuery5(CONFIG), new 
Query5Model(CONFIG), true);
   }
 
   @Ignore("https://issues.apache.org/jira/browse/BEAM-3816")
@@ -251,21 +228,14 @@ public void query7MatchesModelStreaming() {
   @Test
   @Category(NeedsRunner.class)
   public void sqlQuery7MatchesModelBatch() {
-    queryMatchesModel(
-        "SqlQuery7TestBatch",
-        new NexmarkSqlQuery(CONFIG, new SqlQuery7(CONFIG)),
-        new Query7Model(CONFIG),
-        false);
+    queryMatchesModel("SqlQuery7TestBatch", new SqlQuery7(CONFIG), new 
Query7Model(CONFIG), false);
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void sqlQuery7MatchesModelStreaming() {
     queryMatchesModel(
-        "SqlQuery7TestStreaming",
-        new NexmarkSqlQuery(CONFIG, new SqlQuery7(CONFIG)),
-        new Query7Model(CONFIG),
-        true);
+        "SqlQuery7TestStreaming", new SqlQuery7(CONFIG), new 
Query7Model(CONFIG), true);
   }
 
   @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 163212)
    Time Spent: 5h 50m  (was: 5h 40m)

> Nexmark test of joining stream to files
> ---------------------------------------
>
>                 Key: BEAM-5817
>                 URL: https://issues.apache.org/jira/browse/BEAM-5817
>             Project: Beam
>          Issue Type: New Feature
>          Components: examples-nexmark
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
>            Priority: Major
>          Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> Nexmark is a convenient framework for testing the use case of large scale 
> stream enrichment. One way is joining a stream to files, and it can be tested 
> via any source that Nexmark supports.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to