[ 
https://issues.apache.org/jira/browse/BEAM-5817?focusedWorklogId=159892&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-159892
 ]

ASF GitHub Bot logged work on BEAM-5817:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Oct/18 08:45
            Start Date: 29/Oct/18 08:45
    Worklog Time Spent: 10m 
      Work Description: echauchot closed pull request #6780: [BEAM-5817] Use an 
explicit enum for naming Nexmark benchmarks
URL: https://github.com/apache/beam/pull/6780
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
index 5aa68be7ea3..3ca6a3562ac 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
@@ -223,7 +223,7 @@ public NexmarkPerf decode(InputStream inStream)
         tableFunction =
             input ->
                 new TableDestination(
-                    tableSpec.replace("{query}", 
String.valueOf(input.getValue().getKey().query)),
+                    tableSpec.replace("{query}", 
input.getValue().getKey().query.getNumberOrName()),
                     "perfkit queries");
     SerializableFunction<KV<NexmarkConfiguration, NexmarkPerf>, TableRow> 
rowFunction =
         input -> {
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
index 0d282503160..43b5a16c743 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
@@ -35,7 +35,7 @@
   @JsonProperty public boolean debug = true;
 
   /** Which query to run, in [0,9]. */
-  @JsonProperty public int query = 0;
+  @JsonProperty public NexmarkQueryName query = null;
 
   /** Where events come from. */
   @JsonProperty public NexmarkUtils.SourceType sourceType = 
NexmarkUtils.SourceType.DIRECT;
@@ -186,7 +186,14 @@ public void overrideFromOptions(NexmarkOptions options) {
       debug = options.getDebug();
     }
     if (options.getQuery() != null) {
-      query = options.getQuery();
+      try {
+        query = NexmarkQueryName.valueOf(options.getQuery());
+      } catch (IllegalArgumentException exc) {
+        query = 
NexmarkQueryName.fromNumber(Integer.parseInt(options.getQuery()));
+      }
+      if (query == null) {
+        throw new IllegalArgumentException("Unknown query: " + query);
+      }
     }
     if (options.getSourceType() != null) {
       sourceType = options.getSourceType();
@@ -353,7 +360,7 @@ public NexmarkConfiguration copy() {
    */
   public String toShortString() {
     StringBuilder sb = new StringBuilder();
-    sb.append(String.format("query:%d", query));
+    sb.append(String.format("query:%s", query));
     if (debug != DEFAULT.debug) {
       sb.append(String.format("; debug:%s", debug));
     }
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index ed1953f819e..7d29834f806 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -26,12 +26,13 @@
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
@@ -1045,7 +1046,7 @@ private void modelResultRates(NexmarkQueryModel model) {
     Collections.sort(counts);
     int n = counts.size();
     if (n < 5) {
-      NexmarkUtils.console("Query%d: only %d samples", 
model.configuration.query, n);
+      NexmarkUtils.console("Query%s: only %d samples", 
model.configuration.query, n);
     } else {
       NexmarkUtils.console(
           "Query%d: N:%d; min:%d; 1st%%:%d; mean:%d; 3rd%%:%d; max:%d",
@@ -1124,7 +1125,7 @@ public NexmarkPerf run(NexmarkConfiguration 
runConfiguration) throws IOException
 
         // Query 10 logs all events to Google Cloud storage files. It could 
generate a lot of logs,
         // so, set parallelism. Also set the output path where to write log 
files.
-        if (configuration.query == 10) {
+        if (configuration.query == NexmarkQueryName.LOG_TO_SHARDED_FILES) {
           String path = null;
           if (options.getOutputPath() != null && 
!options.getOutputPath().isEmpty()) {
             path = logsDir(now.getMillis());
@@ -1169,76 +1170,79 @@ private boolean isSql() {
   }
 
   private NexmarkQueryModel getNexmarkQueryModel() {
-    List<NexmarkQueryModel> models = createQueryModels();
+    Map<NexmarkQueryName, NexmarkQueryModel> models = createQueryModels();
     return models.get(configuration.query);
   }
 
   private NexmarkQuery getNexmarkQuery() {
-    List<NexmarkQuery> queries = createQueries();
-
-    if (configuration.query >= queries.size()) {
-      return null;
-    }
-
+    Map<NexmarkQueryName, NexmarkQuery> queries = createQueries();
     return queries.get(configuration.query);
   }
 
-  private List<NexmarkQueryModel> createQueryModels() {
+  private Map<NexmarkQueryName, NexmarkQueryModel> createQueryModels() {
     return isSql() ? createSqlQueryModels() : createJavaQueryModels();
   }
 
-  private List<NexmarkQueryModel> createSqlQueryModels() {
-    return Arrays.asList(null, null, null, null, null, null, null, null, null, 
null, null, null);
+  private Map<NexmarkQueryName, NexmarkQueryModel> createSqlQueryModels() {
+    return ImmutableMap.of();
   }
 
-  private List<NexmarkQueryModel> createJavaQueryModels() {
-    return Arrays.asList(
-        new Query0Model(configuration),
-        new Query1Model(configuration),
-        new Query2Model(configuration),
-        new Query3Model(configuration),
-        new Query4Model(configuration),
-        new Query5Model(configuration),
-        new Query6Model(configuration),
-        new Query7Model(configuration),
-        new Query8Model(configuration),
-        new Query9Model(configuration),
-        null,
-        null,
-        null);
+  private Map<NexmarkQueryName, NexmarkQueryModel> createJavaQueryModels() {
+    return ImmutableMap.<NexmarkQueryName, NexmarkQueryModel>builder()
+        .put(NexmarkQueryName.PASSTHROUGH, new Query0Model(configuration))
+        .put(NexmarkQueryName.CURRENCY_CONVERSION, new 
Query1Model(configuration))
+        .put(NexmarkQueryName.SELECTION, new Query2Model(configuration))
+        .put(NexmarkQueryName.LOCAL_ITEM_SUGGESTION, new 
Query3Model(configuration))
+        .put(NexmarkQueryName.AVERAGE_PRICE_FOR_CATEGORY, new 
Query4Model(configuration))
+        .put(NexmarkQueryName.HOT_ITEMS, new Query5Model(configuration))
+        .put(NexmarkQueryName.AVERAGE_SELLING_PRICE_BY_SELLER, new 
Query6Model(configuration))
+        .put(NexmarkQueryName.HIGHEST_BID, new Query7Model(configuration))
+        .put(NexmarkQueryName.MONITOR_NEW_USERS, new 
Query8Model(configuration))
+        .put(NexmarkQueryName.WINNING_BIDS, new Query9Model(configuration))
+        .build();
   }
 
-  private List<NexmarkQuery> createQueries() {
+  private Map<NexmarkQueryName, NexmarkQuery> createQueries() {
     return isSql() ? createSqlQueries() : createJavaQueries();
   }
 
-  private List<NexmarkQuery> createSqlQueries() {
-    return Arrays.asList(
-        new NexmarkSqlQuery(configuration, new SqlQuery0()),
-        new NexmarkSqlQuery(configuration, new SqlQuery1()),
-        new NexmarkSqlQuery(configuration, new 
SqlQuery2(configuration.auctionSkip)),
-        new NexmarkSqlQuery(configuration, new SqlQuery3(configuration)),
-        null,
-        new NexmarkSqlQuery(configuration, new SqlQuery5(configuration)),
-        null,
-        new NexmarkSqlQuery(configuration, new SqlQuery7(configuration)));
+  private Map<NexmarkQueryName, NexmarkQuery> createSqlQueries() {
+    return ImmutableMap.<NexmarkQueryName, NexmarkQuery>builder()
+        .put(NexmarkQueryName.PASSTHROUGH, new NexmarkSqlQuery(configuration, 
new SqlQuery0()))
+        .put(
+            NexmarkQueryName.CURRENCY_CONVERSION,
+            new NexmarkSqlQuery(configuration, new SqlQuery1()))
+        .put(
+            NexmarkQueryName.SELECTION,
+            new NexmarkSqlQuery(configuration, new 
SqlQuery2(configuration.auctionSkip)))
+        .put(
+            NexmarkQueryName.LOCAL_ITEM_SUGGESTION,
+            new NexmarkSqlQuery(configuration, new SqlQuery3(configuration)))
+        .put(
+            NexmarkQueryName.AVERAGE_PRICE_FOR_CATEGORY,
+            new NexmarkSqlQuery(configuration, new SqlQuery5(configuration)))
+        .put(
+            NexmarkQueryName.HOT_ITEMS,
+            new NexmarkSqlQuery(configuration, new SqlQuery7(configuration)))
+        .build();
   }
 
-  private List<NexmarkQuery> createJavaQueries() {
-    return Arrays.asList(
-        new Query0(configuration),
-        new Query1(configuration),
-        new Query2(configuration),
-        new Query3(configuration),
-        new Query4(configuration),
-        new Query5(configuration),
-        new Query6(configuration),
-        new Query7(configuration),
-        new Query8(configuration),
-        new Query9(configuration),
-        new Query10(configuration),
-        new Query11(configuration),
-        new Query12(configuration));
+  private Map<NexmarkQueryName, NexmarkQuery> createJavaQueries() {
+    return ImmutableMap.<NexmarkQueryName, NexmarkQuery>builder()
+        .put(NexmarkQueryName.PASSTHROUGH, new Query0(configuration))
+        .put(NexmarkQueryName.CURRENCY_CONVERSION, new Query1(configuration))
+        .put(NexmarkQueryName.SELECTION, new Query2(configuration))
+        .put(NexmarkQueryName.LOCAL_ITEM_SUGGESTION, new Query3(configuration))
+        .put(NexmarkQueryName.AVERAGE_PRICE_FOR_CATEGORY, new 
Query4(configuration))
+        .put(NexmarkQueryName.HOT_ITEMS, new Query5(configuration))
+        .put(NexmarkQueryName.AVERAGE_SELLING_PRICE_BY_SELLER, new 
Query6(configuration))
+        .put(NexmarkQueryName.HIGHEST_BID, new Query7(configuration))
+        .put(NexmarkQueryName.MONITOR_NEW_USERS, new Query8(configuration))
+        .put(NexmarkQueryName.WINNING_BIDS, new Query9(configuration))
+        .put(NexmarkQueryName.LOG_TO_SHARDED_FILES, new Query10(configuration))
+        .put(NexmarkQueryName.USER_SESSIONS, new Query11(configuration))
+        .put(NexmarkQueryName.PROCESSING_TIME_WINDOWS, new 
Query12(configuration))
+        .build();
   }
 
   private static class PubsubMessageEventDoFn extends DoFn<PubsubMessage, 
Event> {
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
index 94643655769..681316aa08d 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java
@@ -73,9 +73,9 @@
 
   @Description("Which query to run.")
   @Nullable
-  Integer getQuery();
+  String getQuery();
 
-  void setQuery(Integer query);
+  void setQuery(String query);
 
   @Description("Prefix for output files if using text output for results or 
running Query 10.")
   @Nullable
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkQueryName.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkQueryName.java
new file mode 100644
index 00000000000..462497b8274
--- /dev/null
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkQueryName.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark;
+
+import javax.annotation.Nullable;
+
+/** Known "Nexmark" queries, some of which are of our own devising but use the 
same data set. */
+@SuppressWarnings("ImmutableEnumChecker")
+public enum NexmarkQueryName {
+  // A baseline
+  PASSTHROUGH(0),
+
+  // The actual Nexmark queries
+  CURRENCY_CONVERSION(1),
+  SELECTION(2),
+  LOCAL_ITEM_SUGGESTION(3),
+  AVERAGE_PRICE_FOR_CATEGORY(4),
+  HOT_ITEMS(5),
+  AVERAGE_SELLING_PRICE_BY_SELLER(6),
+  HIGHEST_BID(7),
+  MONITOR_NEW_USERS(8), // Query 8
+
+  // Misc other queries against the data set
+  WINNING_BIDS(9), // Query "9"
+  LOG_TO_SHARDED_FILES(10), // Query "10"
+  USER_SESSIONS(11), // Query "11"
+  PROCESSING_TIME_WINDOWS(12); // Query "12"
+
+  private @Nullable Integer number;
+
+  NexmarkQueryName() {
+    this.number = null;
+  }
+
+  NexmarkQueryName(int number) {
+    this.number = number;
+  }
+
+  /**
+   * Outputs the number of a query if the query is one that we have given a 
number, for
+   * compatibility. If the query does not have a number, returns its name.
+   */
+  public String getNumberOrName() {
+    if (number != null) {
+      return number.toString();
+    } else {
+      return this.toString();
+    }
+  }
+
+  /**
+   * Parses a number into the corresponding query, using the numbers from the 
original Nexmark suite
+   * and the other numbers that we have added. If the number is not a known 
query, returns {@code
+   * null}.
+   */
+  public static @Nullable NexmarkQueryName fromNumber(int nexmarkQueryNumber) {
+    // Noting that the number of names is O(1) this is still O(1) :-)
+    for (NexmarkQueryName queryName : NexmarkQueryName.values()) {
+      if (queryName.number == nexmarkQueryNumber) {
+        return queryName;
+      }
+    }
+    return null;
+  }
+}
diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java
index 48423dc3f35..1d82c65d121 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java
@@ -52,11 +52,13 @@
 
   private static List<NexmarkConfiguration> smoke() {
     List<NexmarkConfiguration> configurations = new ArrayList<>();
-    for (int query = 0; query <= 12; query++) {
+    for (NexmarkQueryName query : NexmarkQueryName.values()) {
       NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT.copy();
       configuration.query = query;
       configuration.numEvents = 100_000;
-      if (query == 4 || query == 6 || query == 9) {
+      if (query == NexmarkQueryName.AVERAGE_PRICE_FOR_CATEGORY
+          || query == NexmarkQueryName.AVERAGE_SELLING_PRICE_BY_SELLER
+          || query == NexmarkQueryName.WINNING_BIDS) {
         // Scale back so overall runtimes are reasonably close across all 
queries.
         configuration.numEvents /= 10;
       }
@@ -89,7 +91,7 @@
     NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT.copy();
     configuration.numEventGenerators = 10;
 
-    configuration.query = 10;
+    configuration.query = NexmarkQueryName.LOG_TO_SHARDED_FILES;
     configuration.isRateLimited = true;
     configuration.sourceType = NexmarkUtils.SourceType.PUBSUB;
     configuration.numEvents = 0; // as many as possible without overflow.
diff --git 
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/NexmarkConfigurationTest.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/NexmarkConfigurationTest.java
new file mode 100644
index 00000000000..de447b03c47
--- /dev/null
+++ 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/NexmarkConfigurationTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+
+/** Test of {@link NexmarkConfiguration} conversion from {@link 
NexmarkOptions}. */
+public class NexmarkConfigurationTest {
+
+  @Test
+  public void testNumericQueryParsing() {
+    NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT;
+    configuration.overrideFromOptions(
+        PipelineOptionsFactory.fromArgs("--query=3").as(NexmarkOptions.class));
+    assertThat(configuration.query, 
equalTo(NexmarkQueryName.LOCAL_ITEM_SUGGESTION));
+  }
+
+  @Test
+  public void testNamedQueryParsing() {
+    NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT;
+    configuration.overrideFromOptions(
+        
PipelineOptionsFactory.fromArgs("--query=HIGHEST_BID").as(NexmarkOptions.class));
+    assertThat(configuration.query, equalTo(NexmarkQueryName.HIGHEST_BID));
+  }
+}
diff --git 
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
index 620f1b3a6db..363c3176ea8 100644
--- 
a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
+++ 
b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
@@ -42,7 +42,7 @@
 /** Test class for BigQuery sinks. */
 public class PerfsToBigQueryTest {
 
-  private static final int QUERY = 1;
+  private static final NexmarkQueryName QUERY = 
NexmarkQueryName.CURRENCY_CONVERSION;
   private NexmarkOptions options;
   private FakeDatasetService fakeDatasetService = new FakeDatasetService();
   private FakeJobService fakeJobService = new FakeJobService();
@@ -98,7 +98,7 @@ public void testSavePerfsToBigQuery() throws IOException, 
InterruptedException {
     Main.savePerfsToBigQuery(
         options, perfs, fakeBqServices, new Instant(startTimestampSeconds * 
1000L));
 
-    String tableSpec = NexmarkUtils.tableSpec(options, String.valueOf(QUERY), 
0L, null);
+    String tableSpec = NexmarkUtils.tableSpec(options, 
QUERY.getNumberOrName(), 0L, null);
     List<TableRow> actualRows =
         fakeDatasetService.getAllRows(
             options.getProject(),


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 159892)
    Time Spent: 2h 50m  (was: 2h 40m)

> Nexmark test of joining stream to files
> ---------------------------------------
>
>                 Key: BEAM-5817
>                 URL: https://issues.apache.org/jira/browse/BEAM-5817
>             Project: Beam
>          Issue Type: New Feature
>          Components: examples-nexmark
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
>            Priority: Major
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Nexmark is a convenient framework for testing the use case of large scale 
> stream enrichment. One way is joining a stream to files, and it can be tested 
> via any source that Nexmark supports.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to