[ 
https://issues.apache.org/jira/browse/BEAM-5817?focusedWorklogId=162107&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-162107
 ]

ASF GitHub Bot logged work on BEAM-5817:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Nov/18 21:33
            Start Date: 02/Nov/18 21:33
    Worklog Time Spent: 10m 
      Work Description: akedin commented on a change in pull request #6905: [BEAM-5817] Add Java only BoundedSideInputJoin benchmark to Nexmark
URL: https://github.com/apache/beam/pull/6905#discussion_r230513097
 
 

 ##########
 File path: sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
 ##########
 @@ -611,6 +631,107 @@ public void processElement(ProcessContext c) {
         });
   }
 
+  private static class GenerateSideInputData
+      extends PTransform<PBegin, PCollection<KV<Long, String>>> {
+
+    private final NexmarkConfiguration config;
+
+    private GenerateSideInputData(NexmarkConfiguration config) {
+      this.config = config;
+    }
+
+    @Override
+    public PCollection<KV<Long, String>> expand(PBegin input) {
+      return input
+          .apply(GenerateSequence.from(0).to(config.sideInputRowCount))
+          .apply(
+              MapElements.via(
+                  new SimpleFunction<Long, KV<Long, String>>() {
+                    @Override
+                    public KV<Long, String> apply(Long input) {
+                      return KV.of(input, String.valueOf(input));
+                    }
+                  }));
+    }
+  }
+
+  /**
+   * Write data to be read as a side input.
+   *
+   * <p>Contains pairs of a number and its string representation to model lookups of some enrichment
+   * data by id.
+   *
+   * <p>Generated data covers the range {@code [0, sideInputRowCount)} so lookup joins on any
+   * desired id field can be modeled by looking up {@code id % sideInputRowCount}.
+   */
+  public static PCollection<KV<Long, String>> prepareSideInput(
+      Pipeline queryPipeline, NexmarkConfiguration config) {
+
+    checkArgument(
+        config.sideInputRowCount > 0, "Side input required but sideInputRowCount is not >0");
+
+    PTransform<PBegin, PCollection<KV<Long, String>>> generateSideInputData =
+        new GenerateSideInputData(config);
+
+    switch (config.sideInputType) {
+      case DIRECT:
+        return queryPipeline.apply(generateSideInputData);
+      case CSV:
+        checkArgument(
+            config.sideInputUrl != null,
+            "Side input type %s requires a URL but sideInputUrl not specified",
+            SideInputType.CSV.toString());
+
+        checkArgument(
+            config.sideInputNumShards > 0,
+            "Side input type %s requires explicit numShards but sideInputNumShards not specified",
+            SideInputType.CSV.toString());
+
+        Pipeline tempPipeline = Pipeline.create();
+        tempPipeline
+            .apply(generateSideInputData)
+            .apply(
+                MapElements.via(
+                    new SimpleFunction<KV<Long, String>, String>(
+                        kv -> String.format("%s,%s", kv.getKey(), kv.getValue())) {}))
+            .apply(TextIO.write().withNumShards(config.sideInputNumShards).to(config.sideInputUrl));
+        tempPipeline.run().waitUntilFinish();
+
+        return queryPipeline
+            .apply(TextIO.read().from(config.sideInputUrl + "*"))
+            .apply(
+                MapElements.via(
+                    new SimpleFunction<String, KV<Long, String>>(
+                        line -> {
+                          List<String> cols = ImmutableList.copyOf(Splitter.on(",").split(line));
+                          return KV.of(Long.valueOf(cols.get(0)), cols.get(1));
+                        }) {}));
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unknown type of %s requested", SideInputType.class.getSimpleName()));
 
 Review comment:
   "Unknown type of side input (%s) requested" ?
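
   One way the suggested wording could be applied is sketched below. This is
   only an illustration, not the change that was merged: the
   `SideInputMessages` class and the `unknownSideInputType` helper are
   hypothetical, the enum is a local stand-in for Nexmark's `SideInputType`
   listing only the values visible in the diff, and the comment does not say
   which value should fill `%s`; the sketch assumes it is the configured type.

```java
public class SideInputMessages {

  /** Local stand-in for Nexmark's SideInputType (assumption: only the values seen in the diff). */
  public enum SideInputType {
    DIRECT,
    CSV
  }

  /**
   * Hypothetical helper: formats the reviewer's proposed message, filling the
   * placeholder with the type that was requested.
   */
  public static String unknownSideInputType(Object requestedType) {
    return String.format("Unknown type of side input (%s) requested", requestedType);
  }

  public static void main(String[] args) {
    // In the switch from the diff, the default branch might then read:
    //   throw new IllegalArgumentException(unknownSideInputType(config.sideInputType));
    System.out.println(unknownSideInputType(SideInputType.CSV));
  }
}
```

   Including the offending value in the message, rather than only the enum's
   class name, would let someone reading the exception see which
   configuration setting was rejected.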

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 162107)
    Time Spent: 3.5h  (was: 3h 20m)

> Nexmark test of joining stream to files
> ---------------------------------------
>
>                 Key: BEAM-5817
>                 URL: https://issues.apache.org/jira/browse/BEAM-5817
>             Project: Beam
>          Issue Type: New Feature
>          Components: examples-nexmark
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
>            Priority: Major
>          Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> Nexmark is a convenient framework for testing the use case of large scale 
> stream enrichment. One way is joining a stream to files, and it can be tested 
> via any source that Nexmark supports.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
