[ https://issues.apache.org/jira/browse/BEAM-5817?focusedWorklogId=162107&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162107 ]
ASF GitHub Bot logged work on BEAM-5817: ---------------------------------------- Author: ASF GitHub Bot Created on: 02/Nov/18 21:33 Start Date: 02/Nov/18 21:33 Worklog Time Spent: 10m Work Description: akedin commented on a change in pull request #6905: [BEAM-5817] Add Java only BoundedSideInputJoin benchmark to Nexmark URL: https://github.com/apache/beam/pull/6905#discussion_r230513097 ########## File path: sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java ########## @@ -611,6 +631,107 @@ public void processElement(ProcessContext c) { }); } + private static class GenerateSideInputData + extends PTransform<PBegin, PCollection<KV<Long, String>>> { + + private final NexmarkConfiguration config; + + private GenerateSideInputData(NexmarkConfiguration config) { + this.config = config; + } + + @Override + public PCollection<KV<Long, String>> expand(PBegin input) { + return input + .apply(GenerateSequence.from(0).to(config.sideInputRowCount)) + .apply( + MapElements.via( + new SimpleFunction<Long, KV<Long, String>>() { + @Override + public KV<Long, String> apply(Long input) { + return KV.of(input, String.valueOf(input)); + } + })); + } + } + + /** + * Write data to be read as a side input. + * + * <p>Contains pairs of a number and its string representation to model lookups of some enrichment + * data by id. + * + * <p>Generated data covers the range {@code [0, sideInputRowCount)} so lookup joins on any + * desired id field can be modeled by looking up {@code id % sideInputRowCount}. 
+ */ + public static PCollection<KV<Long, String>> prepareSideInput( + Pipeline queryPipeline, NexmarkConfiguration config) { + + checkArgument( + config.sideInputRowCount > 0, "Side input required but sideInputRowCount is not >0"); + + PTransform<PBegin, PCollection<KV<Long, String>>> generateSideInputData = + new GenerateSideInputData(config); + + switch (config.sideInputType) { + case DIRECT: + return queryPipeline.apply(generateSideInputData); + case CSV: + checkArgument( + config.sideInputUrl != null, + "Side input type %s requires a URL but sideInputUrl not specified", + SideInputType.CSV.toString()); + + checkArgument( + config.sideInputNumShards > 0, + "Side input type %s requires explicit numShards but sideInputNumShards not specified", + SideInputType.CSV.toString()); + + Pipeline tempPipeline = Pipeline.create(); + tempPipeline + .apply(generateSideInputData) + .apply( + MapElements.via( + new SimpleFunction<KV<Long, String>, String>( + kv -> String.format("%s,%s", kv.getKey(), kv.getValue())) {})) + .apply(TextIO.write().withNumShards(config.sideInputNumShards).to(config.sideInputUrl)); + tempPipeline.run().waitUntilFinish(); + + return queryPipeline + .apply(TextIO.read().from(config.sideInputUrl + "*")) + .apply( + MapElements.via( + new SimpleFunction<String, KV<Long, String>>( + line -> { + List<String> cols = ImmutableList.copyOf(Splitter.on(",").split(line)); + return KV.of(Long.valueOf(cols.get(0)), cols.get(1)); + }) {})); + default: + throw new IllegalArgumentException( + String.format("Unknown type of %s requested", SideInputType.class.getSimpleName())); Review comment: "Unknown type of side input (%s) requested" ? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id: (was: 162107)
    Time Spent: 3.5h (was: 3h 20m)

> Nexmark test of joining stream to files
> ---------------------------------------
>
> Key: BEAM-5817
> URL: https://issues.apache.org/jira/browse/BEAM-5817
> Project: Beam
> Issue Type: New Feature
> Components: examples-nexmark
> Reporter: Kenneth Knowles
> Assignee: Kenneth Knowles
> Priority: Major
> Time Spent: 3.5h
> Remaining Estimate: 0h
>
> Nexmark is a convenient framework for testing the use case of large scale
> stream enrichment. One way is joining a stream to files, and it can be tested
> via any source that Nexmark supports.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)