[ 
https://issues.apache.org/jira/browse/BEAM-5817?focusedWorklogId=162149&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162149
 ]

ASF GitHub Bot logged work on BEAM-5817:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Nov/18 23:06
            Start Date: 02/Nov/18 23:06
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #6905: [BEAM-5817] Add 
Java only BoundedSideInputJoin benchmark to Nexmark
URL: https://github.com/apache/beam/pull/6905
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
index 43b5a16c743..0fd0011f600 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
@@ -56,6 +56,21 @@
    */
   @JsonProperty public NexmarkUtils.PubSubMode pubSubMode = 
NexmarkUtils.PubSubMode.COMBINED;
 
+  /** The type of side input to use. */
+  @JsonProperty public NexmarkUtils.SideInputType sideInputType;
+
+  /** Specify the number of rows to write to the side input. */
+  @JsonProperty public int sideInputRowCount = 0;
+
+  /** Specify the number of shards to write to the side input. */
+  @JsonProperty public int sideInputNumShards = 0;
+
+  /**
+   * Specify a prefix URL for side input files, which will be created for use 
by queries that join the
+   * stream to static enrichment data.
+   */
+  @JsonProperty public String sideInputUrl = null;
+
   /**
    * Number of events to generate. If zero, generate as many as possible 
without overflowing
    * internal counters etc.
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index 7d29834f806..7a60e4d235d 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -50,6 +50,8 @@
 import org.apache.beam.sdk.nexmark.model.Event;
 import org.apache.beam.sdk.nexmark.model.KnownSize;
 import org.apache.beam.sdk.nexmark.model.Person;
+import org.apache.beam.sdk.nexmark.queries.BoundedSideInputJoin;
+import org.apache.beam.sdk.nexmark.queries.BoundedSideInputJoinModel;
 import org.apache.beam.sdk.nexmark.queries.NexmarkQuery;
 import org.apache.beam.sdk.nexmark.queries.NexmarkQueryModel;
 import org.apache.beam.sdk.nexmark.queries.Query0;
@@ -1110,6 +1112,10 @@ public NexmarkPerf run(NexmarkConfiguration 
runConfiguration) throws IOException
       // Generate events.
       PCollection<Event> source = createSource(p, now);
 
+      if (query.needsSideInput()) {
+        query.setSideInput(NexmarkUtils.prepareSideInput(p, configuration));
+      }
+
       if (options.getLogEvents()) {
         source = source.apply(queryName + ".Events.Log", 
NexmarkUtils.log(queryName + ".Events"));
       }
@@ -1199,6 +1205,7 @@ private NexmarkQuery getNexmarkQuery() {
         .put(NexmarkQueryName.HIGHEST_BID, new Query7Model(configuration))
         .put(NexmarkQueryName.MONITOR_NEW_USERS, new 
Query8Model(configuration))
         .put(NexmarkQueryName.WINNING_BIDS, new Query9Model(configuration))
+        .put(NexmarkQueryName.BOUNDED_SIDE_INPUT_JOIN, new 
BoundedSideInputJoinModel(configuration))
         .build();
   }
 
@@ -1242,6 +1249,7 @@ private NexmarkQuery getNexmarkQuery() {
         .put(NexmarkQueryName.LOG_TO_SHARDED_FILES, new Query10(configuration))
         .put(NexmarkQueryName.USER_SESSIONS, new Query11(configuration))
         .put(NexmarkQueryName.PROCESSING_TIME_WINDOWS, new 
Query12(configuration))
+        .put(NexmarkQueryName.BOUNDED_SIDE_INPUT_JOIN, new 
BoundedSideInputJoin(configuration))
         .build();
   }
 
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkQueryName.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkQueryName.java
index 462497b8274..ca21439d56c 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkQueryName.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkQueryName.java
@@ -39,7 +39,10 @@
   WINNING_BIDS(9), // Query "9"
   LOG_TO_SHARDED_FILES(10), // Query "10"
   USER_SESSIONS(11), // Query "11"
-  PROCESSING_TIME_WINDOWS(12); // Query "12"
+  PROCESSING_TIME_WINDOWS(12), // Query "12"
+
+  // Other non-numbered queries
+  BOUNDED_SIDE_INPUT_JOIN;
 
   private @Nullable Integer number;
 
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
index cd01cca3149..72dccc4df34 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
@@ -17,14 +17,20 @@
  */
 package org.apache.beam.sdk.nexmark;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
 import com.google.common.hash.Hashing;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
@@ -33,7 +39,10 @@
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.nexmark.model.Auction;
@@ -58,9 +67,12 @@
 import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -123,6 +135,14 @@
     COMBINED
   }
 
+  /** Possible side input sources. */
+  public enum SideInputType {
+    /** Produce the side input via {@link Create}. */
+    DIRECT,
+    /** Read side input from CSV files. */
+    CSV
+  }
+
   /** Coder strategies. */
   public enum CoderStrategy {
     /** Hand-written. */
@@ -611,6 +631,107 @@ public void processElement(ProcessContext c) {
         });
   }
 
+  private static class GenerateSideInputData
+      extends PTransform<PBegin, PCollection<KV<Long, String>>> {
+
+    private final NexmarkConfiguration config;
+
+    private GenerateSideInputData(NexmarkConfiguration config) {
+      this.config = config;
+    }
+
+    @Override
+    public PCollection<KV<Long, String>> expand(PBegin input) {
+      return input
+          .apply(GenerateSequence.from(0).to(config.sideInputRowCount))
+          .apply(
+              MapElements.via(
+                  new SimpleFunction<Long, KV<Long, String>>() {
+                    @Override
+                    public KV<Long, String> apply(Long input) {
+                      return KV.of(input, String.valueOf(input));
+                    }
+                  }));
+    }
+  }
+
+  /**
+   * Write data to be read as a side input.
+   *
+   * <p>Contains pairs of a number and its string representation to model 
lookups of some enrichment
+   * data by id.
+   *
+   * <p>Generated data covers the range {@code [0, sideInputRowCount)} so 
lookup joins on any
+   * desired id field can be modeled by looking up {@code id % 
sideInputRowCount}.
+   */
+  public static PCollection<KV<Long, String>> prepareSideInput(
+      Pipeline queryPipeline, NexmarkConfiguration config) {
+
+    checkArgument(
+        config.sideInputRowCount > 0, "Side input required but 
sideInputRowCount is not >0");
+
+    PTransform<PBegin, PCollection<KV<Long, String>>> generateSideInputData =
+        new GenerateSideInputData(config);
+
+    switch (config.sideInputType) {
+      case DIRECT:
+        return queryPipeline.apply(generateSideInputData);
+      case CSV:
+        checkArgument(
+            config.sideInputUrl != null,
+            "Side input type %s requires a URL but sideInputUrl not specified",
+            SideInputType.CSV.toString());
+
+        checkArgument(
+            config.sideInputNumShards > 0,
+            "Side input type %s requires explicit numShards but 
sideInputNumShards not specified",
+            SideInputType.CSV.toString());
+
+        Pipeline tempPipeline = Pipeline.create();
+        tempPipeline
+            .apply(generateSideInputData)
+            .apply(
+                MapElements.via(
+                    new SimpleFunction<KV<Long, String>, String>(
+                        kv -> String.format("%s,%s", kv.getKey(), 
kv.getValue())) {}))
+            
.apply(TextIO.write().withNumShards(config.sideInputNumShards).to(config.sideInputUrl));
+        tempPipeline.run().waitUntilFinish();
+
+        return queryPipeline
+            .apply(TextIO.read().from(config.sideInputUrl + "*"))
+            .apply(
+                MapElements.via(
+                    new SimpleFunction<String, KV<Long, String>>(
+                        line -> {
+                          List<String> cols = 
ImmutableList.copyOf(Splitter.on(",").split(line));
+                          return KV.of(Long.valueOf(cols.get(0)), cols.get(1));
+                        }) {}));
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unknown type of side input requested: %s", 
config.sideInputType));
+    }
+  }
+
+  /** Frees any resources used to make the side input available. */
+  public static void cleanUpSideInput(NexmarkConfiguration config) throws 
IOException {
+    switch (config.sideInputType) {
+      case DIRECT:
+        break;
+      case CSV:
+        FileSystems.delete(
+            FileSystems.match(config.sideInputUrl + "*")
+                .metadata()
+                .stream()
+                .map(metadata -> metadata.resourceId())
+                .collect(Collectors.toList()));
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Unknown type of %s clean up requested", 
SideInputType.class.getSimpleName()));
+    }
+  }
+
   /**
    * A coder for instances of {@code T} cast up to {@link KnownSize}.
    *
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoin.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoin.java
new file mode 100644
index 00000000000..5e2025977a7
--- /dev/null
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoin.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Map;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Query that joins a stream to a bounded side input, modeling basic stream 
enrichment.
+ *
+ * <pre>
+ * SELECT bid.*, sideInput.*
+ * FROM bid, sideInput
+ * WHERE bid.id = sideInput.id
+ * </pre>
+ */
+public class BoundedSideInputJoin extends NexmarkQuery {
+  public BoundedSideInputJoin(NexmarkConfiguration configuration) {
+    super(configuration, "JoinToFiles");
+  }
+
+  private PCollection<Bid> applyTyped(PCollection<Event> events) {
+
+    checkState(getSideInput() != null, "Configuration error: side input is 
null");
+
+    final PCollectionView<Map<Long, String>> sideInputMap = 
getSideInput().apply(View.asMap());
+
+    return events
+        // Only want the bid events; easier to fake some side input data
+        .apply(JUST_BIDS)
+
+        // Enrich each bid with the side input value looked up by its bidder id.
+        .apply(
+            name + ".JoinToFiles",
+            ParDo.of(
+                    new DoFn<Bid, Bid>() {
+                      @ProcessElement
+                      public void processElement(ProcessContext c) {
+                        Bid bid = c.element();
+                        c.output(
+                            new Bid(
+                                bid.auction,
+                                bid.bidder,
+                                bid.price,
+                                bid.dateTime,
+                                c.sideInput(sideInputMap)
+                                    .get(bid.bidder % 
configuration.sideInputRowCount)));
+                      }
+                    })
+                .withSideInputs(sideInputMap));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinModel.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinModel.java
new file mode 100644
index 00000000000..822f343244b
--- /dev/null
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinModel.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import java.util.Collection;
+import java.util.Iterator;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+/** A direct implementation of {@link BoundedSideInputJoin}. */
+public class BoundedSideInputJoinModel extends NexmarkQueryModel {
+
+  /** Simulator for the bounded side input join query. */
+  private static class Simulator extends AbstractSimulator<Event, Bid> {
+    private final NexmarkConfiguration configuration;
+
+    public Simulator(NexmarkConfiguration configuration) {
+      super(NexmarkUtils.standardEventIterator(configuration));
+      this.configuration = configuration;
+    }
+
+    @Override
+    protected void run() {
+      TimestampedValue<Event> timestampedEvent = nextInput();
+      if (timestampedEvent == null) {
+        allDone();
+        return;
+      }
+      Event event = timestampedEvent.getValue();
+      if (event.bid == null) {
+        // Ignore non-bid events.
+        return;
+      }
+
+      // The side input value for an id is always that id's string representation.
+      Bid bid = event.bid;
+      Bid resultBid =
+          new Bid(
+              bid.auction,
+              bid.bidder,
+              bid.price,
+              bid.dateTime,
+              String.valueOf(bid.bidder % configuration.sideInputRowCount));
+      TimestampedValue<Bid> result =
+          TimestampedValue.of(resultBid, timestampedEvent.getTimestamp());
+      addResult(result);
+    }
+  }
+
+  public BoundedSideInputJoinModel(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> 
itr) {
+    return toValueTimestamp(itr);
+  }
+}
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
index 5123342e93a..b47a0f12d35 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.nexmark.queries;
 
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.nexmark.Monitor;
@@ -198,6 +199,7 @@ public void processElement(ProcessContext c) {
   public final Monitor<KnownSize> resultMonitor;
   private final Monitor<Event> endOfStreamMonitor;
   private final Counter fatalCounter;
+  private transient PCollection<KV<Long, String>> sideInput = null;
 
   protected NexmarkQuery(NexmarkConfiguration configuration, String name) {
     super(name);
@@ -218,6 +220,26 @@ protected NexmarkQuery(NexmarkConfiguration configuration, 
String name) {
   /** Implement the actual query. All we know about the result is it has a 
known encoded size. */
   protected abstract PCollection<KnownSize> applyPrim(PCollection<Event> 
events);
 
+  /** Whether this query expects a side input to be populated. Defaults to 
{@code false}. */
+  public boolean needsSideInput() {
+    return false;
+  }
+
+  /**
+   * Set the side input for the query.
+   *
+   * <p>Note that due to the nature of side inputs, this instance of the query 
is now fixed and can
+   * only be safely applied in the pipeline where the side input was created.
+   */
+  public void setSideInput(PCollection<KV<Long, String>> sideInput) {
+    this.sideInput = sideInput;
+  }
+
+  /** Get the side input, if any. */
+  public @Nullable PCollection<KV<Long, String>> getSideInput() {
+    return sideInput;
+  }
+
   @Override
   public PCollection<TimestampedValue<KnownSize>> expand(PCollection<Event> 
events) {
 
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
index acb95115a72..581f760d582 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
@@ -18,11 +18,9 @@
 package org.apache.beam.sdk.nexmark.queries;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
@@ -57,24 +55,15 @@ static Instant windowStart(Duration size, Duration period, 
Instant timestamp) {
     return new Instant(lim - s);
   }
 
-  /** Convert {@code itr} to strings capturing values, timestamps and order. */
-  static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> 
itr) {
-    List<String> strings = new ArrayList<>();
+  /** Convert {@code itr} to strings capturing values and timestamps. */
+  static <T> Set<String> toValueTimestamp(Iterator<TimestampedValue<T>> itr) {
+    Set<String> strings = new HashSet<>();
     while (itr.hasNext()) {
       strings.add(itr.next().toString());
     }
     return strings;
   }
 
-  /** Convert {@code itr} to strings capturing values and order. */
-  static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) {
-    List<String> strings = new ArrayList<>();
-    while (itr.hasNext()) {
-      strings.add(itr.next().getValue().toString());
-    }
-    return strings;
-  }
-
   /** Convert {@code itr} to strings capturing values only. */
   static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) {
     Set<String> strings = new HashSet<>();
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java
index 103a1733ff9..1f4251b1b74 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java
@@ -54,6 +54,6 @@ public Query0Model(NexmarkConfiguration configuration) {
 
   @Override
   protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> 
itr) {
-    return toValueTimestampOrder(itr);
+    return toValueTimestamp(itr);
   }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java
index c1c3d012816..9097fa289cc 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java
@@ -66,6 +66,6 @@ public Query1Model(NexmarkConfiguration configuration) {
 
   @Override
   protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> 
itr) {
-    return toValueTimestampOrder(itr);
+    return toValueTimestamp(itr);
   }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java
index 6cf13784ef5..0a328b92381 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java
@@ -70,6 +70,6 @@ public Query2Model(NexmarkConfiguration configuration) {
 
   @Override
   protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> 
itr) {
-    return toValueTimestampOrder(itr);
+    return toValueTimestamp(itr);
   }
 }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java
index bc68fbd1c49..9ff026b08de 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java
@@ -119,6 +119,6 @@ public Query7Model(NexmarkConfiguration configuration) {
 
   @Override
   protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> 
itr) {
-    return toValueOrder(itr);
+    return toValue(itr);
   }
 }
diff --git 
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/NexmarkUtilsTest.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/NexmarkUtilsTest.java
new file mode 100644
index 00000000000..975a595d33c
--- /dev/null
+++ 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/NexmarkUtilsTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark;
+
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link NexmarkUtils}, covering preparation of the CSV side input. */
+@RunWith(JUnit4.class)
+public class NexmarkUtilsTest {
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void testPrepareCsvSideInput() throws Exception {
+    NexmarkConfiguration config = NexmarkConfiguration.DEFAULT.copy();
+    config.sideInputType = NexmarkUtils.SideInputType.CSV;
+    ResourceId sideInputResourceId =
+        FileSystems.matchNewResource(
+            String.format(
+                "%s/JoinToFiles-%s",
+                pipeline.getOptions().getTempLocation(), new 
Random().nextInt()),
+            false);
+    config.sideInputUrl = sideInputResourceId.toString();
+    config.sideInputRowCount = 10000;
+    config.sideInputNumShards = 15;
+
+    PCollection<KV<Long, String>> sideInput = 
NexmarkUtils.prepareSideInput(pipeline, config);
+    try {
+      PAssert.that(sideInput)
+          .containsInAnyOrder(
+              LongStream.range(0, config.sideInputRowCount)
+                  .boxed()
+                  .map(l -> KV.of(l, l.toString()))
+                  .collect(Collectors.toList()));
+      pipeline.run();
+    } finally {
+      NexmarkUtils.cleanUpSideInput(config);
+    }
+  }
+}
diff --git 
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinTest.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinTest.java
new file mode 100644
index 00000000000..3a1f4195730
--- /dev/null
+++ 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/BoundedSideInputJoinTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.Iterables;
+import java.util.Random;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests that {@link BoundedSideInputJoin} yields results coherent with {@link
+ * BoundedSideInputJoinModel}, for DIRECT and CSV side inputs read alongside both the bounded and
+ * the unbounded event source.
+ */
+@RunWith(JUnit4.class)
+public class BoundedSideInputJoinTest {
+
+  @Rule public TestPipeline p = TestPipeline.create();
+
+  /** Runs {@link NexmarkUtils#setupPipeline} with the HAND coder strategy before each test. */
+  @Before
+  public void setupPipeline() {
+    NexmarkUtils.setupPipeline(NexmarkUtils.CoderStrategy.HAND, p);
+  }
+
+  /**
+   * Builds the small configuration shared by every test in this class: a copy of the default
+   * configuration with one event generator, 5000 events and a 10-row, 3-shard side input.
+   *
+   * @param sideInputType the kind of side input the test should exercise
+   * @return a fresh configuration that the caller is free to mutate
+   */
+  private static NexmarkConfiguration smallConfig(NexmarkUtils.SideInputType sideInputType) {
+    NexmarkConfiguration config = NexmarkConfiguration.DEFAULT.copy();
+    config.sideInputType = sideInputType;
+    config.numEventGenerators = 1;
+    config.numEvents = 5000;
+    config.sideInputRowCount = 10;
+    config.sideInputNumShards = 3;
+    return config;
+  }
+
+  /**
+   * Test {@code query} matches {@code model}.
+   *
+   * <p>Sets {@code config.sideInputUrl} to a randomly suffixed prefix under the pipeline's temp
+   * location, prepares the side input, runs {@code query} over the generated events and checks the
+   * output with the model's assertion. The side input is always cleaned up afterwards.
+   *
+   * @param name prefix for the name of the read transform
+   * @param config configuration for the event source and the side input; mutated by this method
+   * @param query the query under test
+   * @param model the model supplying the assertion on the query output
+   * @param streamingMode if true read from the unbounded event source, otherwise the bounded one
+   * @throws Exception if preparing, running or cleaning up fails
+   */
+  private void queryMatchesModel(
+      String name,
+      NexmarkConfiguration config,
+      NexmarkQuery query,
+      NexmarkQueryModel model,
+      boolean streamingMode)
+      throws Exception {
+
+    ResourceId sideInputResourceId =
+        FileSystems.matchNewResource(
+            String.format(
+                "%s/JoinToFiles-%s", p.getOptions().getTempLocation(), new Random().nextInt()),
+            false);
+    config.sideInputUrl = sideInputResourceId.toString();
+
+    try {
+      PCollection<KV<Long, String>> sideInput = NexmarkUtils.prepareSideInput(p, config);
+      query.setSideInput(sideInput);
+
+      PCollection<TimestampedValue<KnownSize>> results;
+      if (streamingMode) {
+        results =
+            p.apply(name + ".ReadUnBounded", NexmarkUtils.streamEventsSource(config)).apply(query);
+      } else {
+        results =
+            p.apply(name + ".ReadBounded", NexmarkUtils.batchEventsSource(config)).apply(query);
+      }
+      PAssert.that(results).satisfies(model.assertionFor());
+      // Wait for the pipeline to finish before the finally block cleans up the side input.
+      PipelineResult result = p.run();
+      result.waitUntilFinish();
+    } finally {
+      NexmarkUtils.cleanUpSideInput(config);
+    }
+  }
+
+  /**
+   * A smoke test that the count of input bids and outputs are the same, to help diagnose flakiness
+   * in more complex tests.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void inputOutputSameEvents() throws Exception {
+    NexmarkConfiguration config = smallConfig(NexmarkUtils.SideInputType.DIRECT);
+
+    try {
+      // Prepare inside the try block so that a failure during preparation still reaches cleanup.
+      PCollection<KV<Long, String>> sideInput = NexmarkUtils.prepareSideInput(p, config);
+
+      PCollection<Event> input = p.apply(NexmarkUtils.batchEventsSource(config));
+      PCollection<Bid> justBids = input.apply(NexmarkQuery.JUST_BIDS);
+      PCollection<Long> bidCount = justBids.apply("Count Bids", Count.globally());
+
+      NexmarkQuery query = new BoundedSideInputJoin(config);
+      query.setSideInput(sideInput);
+
+      PCollection<TimestampedValue<KnownSize>> output = input.apply(query);
+      PCollection<Long> outputCount = output.apply("Count outputs", Count.globally());
+
+      // The flattened collection holds the two counts. The checks below are symmetric, so it does
+      // not matter which count comes first: one is positive and both are equal.
+      PAssert.that(PCollectionList.of(bidCount).and(outputCount).apply(Flatten.pCollections()))
+          .satisfies(
+              counts -> {
+                assertThat(Iterables.size(counts), equalTo(2));
+                assertThat(Iterables.get(counts, 0), greaterThan(0L));
+                assertThat(Iterables.get(counts, 0), equalTo(Iterables.get(counts, 1)));
+                return null;
+              });
+      // Wait for the pipeline to finish before cleanup, as queryMatchesModel does.
+      p.run().waitUntilFinish();
+    } finally {
+      NexmarkUtils.cleanUpSideInput(config);
+    }
+  }
+
+  /** The query matches its model with a DIRECT side input and the bounded event source. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void queryMatchesModelBatchDirect() throws Exception {
+    NexmarkConfiguration config = smallConfig(NexmarkUtils.SideInputType.DIRECT);
+
+    queryMatchesModel(
+        "BoundedSideInputJoinTestBatch",
+        config,
+        new BoundedSideInputJoin(config),
+        new BoundedSideInputJoinModel(config),
+        false);
+  }
+
+  /** The query matches its model with a DIRECT side input and the unbounded event source. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void queryMatchesModelStreamingDirect() throws Exception {
+    NexmarkConfiguration config = smallConfig(NexmarkUtils.SideInputType.DIRECT);
+
+    queryMatchesModel(
+        "BoundedSideInputJoinTestStreaming",
+        config,
+        new BoundedSideInputJoin(config),
+        new BoundedSideInputJoinModel(config),
+        true);
+  }
+
+  /** The query matches its model with a CSV side input and the bounded event source. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void queryMatchesModelBatchCsv() throws Exception {
+    NexmarkConfiguration config = smallConfig(NexmarkUtils.SideInputType.CSV);
+
+    queryMatchesModel(
+        "BoundedSideInputJoinTestBatch",
+        config,
+        new BoundedSideInputJoin(config),
+        new BoundedSideInputJoinModel(config),
+        false);
+  }
+
+  /** The query matches its model with a CSV side input and the unbounded event source. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void queryMatchesModelStreamingCsv() throws Exception {
+    NexmarkConfiguration config = smallConfig(NexmarkUtils.SideInputType.CSV);
+
+    queryMatchesModel(
+        "BoundedSideInputJoinTestStreaming",
+        config,
+        new BoundedSideInputJoin(config),
+        new BoundedSideInputJoinModel(config),
+        true);
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 162149)
    Time Spent: 4h 20m  (was: 4h 10m)

> Nexmark test of joining stream to files
> ---------------------------------------
>
>                 Key: BEAM-5817
>                 URL: https://issues.apache.org/jira/browse/BEAM-5817
>             Project: Beam
>          Issue Type: New Feature
>          Components: examples-nexmark
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
>            Priority: Major
>          Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> Nexmark is a convenient framework for testing the use case of large scale 
> stream enrichment. One way is joining a stream to files, and it can be tested 
> via any source that Nexmark supports.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to