[ https://issues.apache.org/jira/browse/BEAM-5817?focusedWorklogId=159892&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-159892 ]
ASF GitHub Bot logged work on BEAM-5817: ---------------------------------------- Author: ASF GitHub Bot Created on: 29/Oct/18 08:45 Start Date: 29/Oct/18 08:45 Worklog Time Spent: 10m Work Description: echauchot closed pull request #6780: [BEAM-5817] Use an explicit enum for naming Nexmark benchmarks URL: https://github.com/apache/beam/pull/6780 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java index 5aa68be7ea3..3ca6a3562ac 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java @@ -223,7 +223,7 @@ public NexmarkPerf decode(InputStream inStream) tableFunction = input -> new TableDestination( - tableSpec.replace("{query}", String.valueOf(input.getValue().getKey().query)), + tableSpec.replace("{query}", input.getValue().getKey().query.getNumberOrName()), "perfkit queries"); SerializableFunction<KV<NexmarkConfiguration, NexmarkPerf>, TableRow> rowFunction = input -> { diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java index 0d282503160..43b5a16c743 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java @@ -35,7 +35,7 @@ @JsonProperty public boolean debug = true; /** Which query to run, in [0,9]. */ - @JsonProperty public int query = 0; + @JsonProperty public NexmarkQueryName query = null; /** Where events come from. */ @JsonProperty public NexmarkUtils.SourceType sourceType = NexmarkUtils.SourceType.DIRECT; @@ -186,7 +186,14 @@ public void overrideFromOptions(NexmarkOptions options) { debug = options.getDebug(); } if (options.getQuery() != null) { - query = options.getQuery(); + try { + query = NexmarkQueryName.valueOf(options.getQuery()); + } catch (IllegalArgumentException exc) { + query = NexmarkQueryName.fromNumber(Integer.parseInt(options.getQuery())); + } + if (query == null) { + throw new IllegalArgumentException("Unknown query: " + query); + } } if (options.getSourceType() != null) { sourceType = options.getSourceType(); @@ -353,7 +360,7 @@ public NexmarkConfiguration copy() { */ public String toShortString() { StringBuilder sb = new StringBuilder(); - sb.append(String.format("query:%d", query)); + sb.append(String.format("query:%s", query)); if (debug != DEFAULT.debug) { sb.append(String.format("; debug:%s", debug)); } diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java index ed1953f819e..7d29834f806 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java @@ -26,12 +26,13 @@ import com.google.api.services.bigquery.model.TableSchema; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; @@ -1045,7 +1046,7 @@ private void modelResultRates(NexmarkQueryModel model) { Collections.sort(counts); int n = counts.size(); if (n < 5) { - NexmarkUtils.console("Query%d: only %d samples", model.configuration.query, n); + NexmarkUtils.console("Query%s: only %d samples", model.configuration.query, n); } else { NexmarkUtils.console( "Query%d: N:%d; min:%d; 1st%%:%d; mean:%d; 3rd%%:%d; max:%d", @@ -1124,7 +1125,7 @@ public NexmarkPerf run(NexmarkConfiguration runConfiguration) throws IOException // Query 10 logs all events to Google Cloud storage files. It could generate a lot of logs, // so, set parallelism. Also set the output path where to write log files. - if (configuration.query == 10) { + if (configuration.query == NexmarkQueryName.LOG_TO_SHARDED_FILES) { String path = null; if (options.getOutputPath() != null && !options.getOutputPath().isEmpty()) { path = logsDir(now.getMillis()); @@ -1169,76 +1170,79 @@ private boolean isSql() { } private NexmarkQueryModel getNexmarkQueryModel() { - List<NexmarkQueryModel> models = createQueryModels(); + Map<NexmarkQueryName, NexmarkQueryModel> models = createQueryModels(); return models.get(configuration.query); } private NexmarkQuery getNexmarkQuery() { - List<NexmarkQuery> queries = createQueries(); - - if (configuration.query >= queries.size()) { - return null; - } - + Map<NexmarkQueryName, NexmarkQuery> queries = createQueries(); return queries.get(configuration.query); } - private List<NexmarkQueryModel> createQueryModels() { + private Map<NexmarkQueryName, NexmarkQueryModel> createQueryModels() { return isSql() ? createSqlQueryModels() : createJavaQueryModels(); } - private List<NexmarkQueryModel> createSqlQueryModels() { - return Arrays.asList(null, null, null, null, null, null, null, null, null, null, null, null); + private Map<NexmarkQueryName, NexmarkQueryModel> createSqlQueryModels() { + return ImmutableMap.of(); } - private List<NexmarkQueryModel> createJavaQueryModels() { - return Arrays.asList( - new Query0Model(configuration), - new Query1Model(configuration), - new Query2Model(configuration), - new Query3Model(configuration), - new Query4Model(configuration), - new Query5Model(configuration), - new Query6Model(configuration), - new Query7Model(configuration), - new Query8Model(configuration), - new Query9Model(configuration), - null, - null, - null); + private Map<NexmarkQueryName, NexmarkQueryModel> createJavaQueryModels() { + return ImmutableMap.<NexmarkQueryName, NexmarkQueryModel>builder() + .put(NexmarkQueryName.PASSTHROUGH, new Query0Model(configuration)) + .put(NexmarkQueryName.CURRENCY_CONVERSION, new Query1Model(configuration)) + .put(NexmarkQueryName.SELECTION, new Query2Model(configuration)) + .put(NexmarkQueryName.LOCAL_ITEM_SUGGESTION, new Query3Model(configuration)) + .put(NexmarkQueryName.AVERAGE_PRICE_FOR_CATEGORY, new Query4Model(configuration)) + .put(NexmarkQueryName.HOT_ITEMS, new Query5Model(configuration)) + .put(NexmarkQueryName.AVERAGE_SELLING_PRICE_BY_SELLER, new Query6Model(configuration)) + .put(NexmarkQueryName.HIGHEST_BID, new Query7Model(configuration)) + .put(NexmarkQueryName.MONITOR_NEW_USERS, new Query8Model(configuration)) + .put(NexmarkQueryName.WINNING_BIDS, new Query9Model(configuration)) + .build(); } - private List<NexmarkQuery> createQueries() { + private Map<NexmarkQueryName, NexmarkQuery> createQueries() { return isSql() ? createSqlQueries() : createJavaQueries(); } - private List<NexmarkQuery> createSqlQueries() { - return Arrays.asList( - new NexmarkSqlQuery(configuration, new SqlQuery0()), - new NexmarkSqlQuery(configuration, new SqlQuery1()), - new NexmarkSqlQuery(configuration, new SqlQuery2(configuration.auctionSkip)), - new NexmarkSqlQuery(configuration, new SqlQuery3(configuration)), - null, - new NexmarkSqlQuery(configuration, new SqlQuery5(configuration)), - null, - new NexmarkSqlQuery(configuration, new SqlQuery7(configuration))); + private Map<NexmarkQueryName, NexmarkQuery> createSqlQueries() { + return ImmutableMap.<NexmarkQueryName, NexmarkQuery>builder() + .put(NexmarkQueryName.PASSTHROUGH, new NexmarkSqlQuery(configuration, new SqlQuery0())) + .put( + NexmarkQueryName.CURRENCY_CONVERSION, + new NexmarkSqlQuery(configuration, new SqlQuery1())) + .put( + NexmarkQueryName.SELECTION, + new NexmarkSqlQuery(configuration, new SqlQuery2(configuration.auctionSkip))) + .put( + NexmarkQueryName.LOCAL_ITEM_SUGGESTION, + new NexmarkSqlQuery(configuration, new SqlQuery3(configuration))) + .put( + NexmarkQueryName.AVERAGE_PRICE_FOR_CATEGORY, + new NexmarkSqlQuery(configuration, new SqlQuery5(configuration))) + .put( + NexmarkQueryName.HOT_ITEMS, + new NexmarkSqlQuery(configuration, new SqlQuery7(configuration))) + .build(); } - private List<NexmarkQuery> createJavaQueries() { - return Arrays.asList( - new Query0(configuration), - new Query1(configuration), - new Query2(configuration), - new Query3(configuration), - new Query4(configuration), - new Query5(configuration), - new Query6(configuration), - new Query7(configuration), - new Query8(configuration), - new Query9(configuration), - new Query10(configuration), - new Query11(configuration), - new Query12(configuration)); + private Map<NexmarkQueryName, NexmarkQuery> createJavaQueries() { + return ImmutableMap.<NexmarkQueryName, NexmarkQuery>builder() + .put(NexmarkQueryName.PASSTHROUGH, new Query0(configuration)) + .put(NexmarkQueryName.CURRENCY_CONVERSION, new Query1(configuration)) + .put(NexmarkQueryName.SELECTION, new Query2(configuration)) + .put(NexmarkQueryName.LOCAL_ITEM_SUGGESTION, new Query3(configuration)) + .put(NexmarkQueryName.AVERAGE_PRICE_FOR_CATEGORY, new Query4(configuration)) + .put(NexmarkQueryName.HOT_ITEMS, new Query5(configuration)) + .put(NexmarkQueryName.AVERAGE_SELLING_PRICE_BY_SELLER, new Query6(configuration)) + .put(NexmarkQueryName.HIGHEST_BID, new Query7(configuration)) + .put(NexmarkQueryName.MONITOR_NEW_USERS, new Query8(configuration)) + .put(NexmarkQueryName.WINNING_BIDS, new Query9(configuration)) + .put(NexmarkQueryName.LOG_TO_SHARDED_FILES, new Query10(configuration)) + .put(NexmarkQueryName.USER_SESSIONS, new Query11(configuration)) + .put(NexmarkQueryName.PROCESSING_TIME_WINDOWS, new Query12(configuration)) + .build(); } private static class PubsubMessageEventDoFn extends DoFn<PubsubMessage, Event> { diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java index 94643655769..681316aa08d 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java @@ -73,9 +73,9 @@ @Description("Which query to run.") @Nullable - Integer getQuery(); + String getQuery(); - void setQuery(Integer query); + void setQuery(String query); @Description("Prefix for output files if using text output for results or running Query 10.") @Nullable diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkQueryName.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkQueryName.java new file mode 100644 index 00000000000..462497b8274 --- /dev/null +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkQueryName.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark; + +import javax.annotation.Nullable; + +/** Known "Nexmark" queries, some of which are of our own devising but use the same data set. */ +@SuppressWarnings("ImmutableEnumChecker") +public enum NexmarkQueryName { + // A baseline + PASSTHROUGH(0), + + // The actual Nexmark queries + CURRENCY_CONVERSION(1), + SELECTION(2), + LOCAL_ITEM_SUGGESTION(3), + AVERAGE_PRICE_FOR_CATEGORY(4), + HOT_ITEMS(5), + AVERAGE_SELLING_PRICE_BY_SELLER(6), + HIGHEST_BID(7), + MONITOR_NEW_USERS(8), // Query 8 + + // Misc other queries against the data set + WINNING_BIDS(9), // Query "9" + LOG_TO_SHARDED_FILES(10), // Query "10" + USER_SESSIONS(11), // Query "11" + PROCESSING_TIME_WINDOWS(12); // Query "12" + + private @Nullable Integer number; + + NexmarkQueryName() { + this.number = null; + } + + NexmarkQueryName(int number) { + this.number = number; + } + + /** + * Outputs the number of a query if the query is one that we have given a number, for + * compatibility. If the query does not have a number, returns its name. + */ + public String getNumberOrName() { + if (number != null) { + return number.toString(); + } else { + return this.toString(); + } + } + + /** + * Parses a number into the corresponding query, using the numbers from the original Nexmark suite + * and the other numbers that we have added. If the number is not a known query, returns {@code + * null}. + */ + public static @Nullable NexmarkQueryName fromNumber(int nexmarkQueryNumber) { + // Noting that the number of names is O(1) this is still O(1) :-) + for (NexmarkQueryName queryName : NexmarkQueryName.values()) { + if (queryName.number == nexmarkQueryNumber) { + return queryName; + } + } + return null; + } +} diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java index 48423dc3f35..1d82c65d121 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkSuite.java @@ -52,11 +52,13 @@ private static List<NexmarkConfiguration> smoke() { List<NexmarkConfiguration> configurations = new ArrayList<>(); - for (int query = 0; query <= 12; query++) { + for (NexmarkQueryName query : NexmarkQueryName.values()) { NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT.copy(); configuration.query = query; configuration.numEvents = 100_000; - if (query == 4 || query == 6 || query == 9) { + if (query == NexmarkQueryName.AVERAGE_PRICE_FOR_CATEGORY + || query == NexmarkQueryName.AVERAGE_SELLING_PRICE_BY_SELLER + || query == NexmarkQueryName.WINNING_BIDS) { // Scale back so overall runtimes are reasonably close across all queries. configuration.numEvents /= 10; } @@ -89,7 +91,7 @@ NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT.copy(); configuration.numEventGenerators = 10; - configuration.query = 10; + configuration.query = NexmarkQueryName.LOG_TO_SHARDED_FILES; configuration.isRateLimited = true; configuration.sourceType = NexmarkUtils.SourceType.PUBSUB; configuration.numEvents = 0; // as many as possible without overflow. diff --git a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/NexmarkConfigurationTest.java b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/NexmarkConfigurationTest.java new file mode 100644 index 00000000000..de447b03c47 --- /dev/null +++ b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/NexmarkConfigurationTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.nexmark; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Test; + +/** Test of {@link NexmarkConfiguration} conversion from {@link NexmarkOptions}. */ +public class NexmarkConfigurationTest { + + @Test + public void testNumericQueryParsing() { + NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT; + configuration.overrideFromOptions( + PipelineOptionsFactory.fromArgs("--query=3").as(NexmarkOptions.class)); + assertThat(configuration.query, equalTo(NexmarkQueryName.LOCAL_ITEM_SUGGESTION)); + } + + @Test + public void testNamedQueryParsing() { + NexmarkConfiguration configuration = NexmarkConfiguration.DEFAULT; + configuration.overrideFromOptions( + PipelineOptionsFactory.fromArgs("--query=HIGHEST_BID").as(NexmarkOptions.class)); + assertThat(configuration.query, equalTo(NexmarkQueryName.HIGHEST_BID)); + } +} diff --git a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java index 620f1b3a6db..363c3176ea8 100644 --- a/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java +++ b/sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java @@ -42,7 +42,7 @@ /** Test class for BigQuery sinks. */ public class PerfsToBigQueryTest { - private static final int QUERY = 1; + private static final NexmarkQueryName QUERY = NexmarkQueryName.CURRENCY_CONVERSION; private NexmarkOptions options; private FakeDatasetService fakeDatasetService = new FakeDatasetService(); private FakeJobService fakeJobService = new FakeJobService(); @@ -98,7 +98,7 @@ public void testSavePerfsToBigQuery() throws IOException, InterruptedException { Main.savePerfsToBigQuery( options, perfs, fakeBqServices, new Instant(startTimestampSeconds * 1000L)); - String tableSpec = NexmarkUtils.tableSpec(options, String.valueOf(QUERY), 0L, null); + String tableSpec = NexmarkUtils.tableSpec(options, QUERY.getNumberOrName(), 0L, null); List<TableRow> actualRows = fakeDatasetService.getAllRows( options.getProject(), ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 159892) Time Spent: 2h 50m (was: 2h 40m) > Nexmark test of joining stream to files > --------------------------------------- > > Key: BEAM-5817 > URL: https://issues.apache.org/jira/browse/BEAM-5817 > Project: Beam > Issue Type: New Feature > Components: examples-nexmark > Reporter: Kenneth Knowles > Assignee: Kenneth Knowles > Priority: Major > Time Spent: 2h 50m > Remaining Estimate: 0h > > Nexmark is a convenient framework for testing the use case of large scale > stream enrichment. One way is joining a stream to files, and it can be tested > via any source that Nexmark supports. -- This message was sent by Atlassian JIRA (v7.6.3#76005)