lukecwik commented on a change in pull request #12924:
URL: https://github.com/apache/beam/pull/12924#discussion_r498425569



##########
File path: sdks/java/io/contextualtextio/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -637,4 +619,68 @@ private CreateTextSourceFn(byte[] delimiter, boolean hasMultilineCSVRecords) {
 
   /** Disable construction of utility class. */
   private ContextualTextIO() {}
+
+  private static class ProcessRecordNumbers extends PTransform<PCollection<Row>, PCollection<Row>> {
+
+    @Override
+    public PCollection<Row> expand(PCollection<Row> records) {
+      /*
+       * At this point the line number in RecordWithMetadata contains the relative line offset from the beginning of the read range.
+       *
+       * To compute the absolute position from the beginning of the input we group the lines within the same ranges, and evaluate the size of each range.
+       */
+
+      // This algorithm only works with triggers that fire once, for now only default trigger is
+      // supported.
+      Trigger currentTrigger = records.getWindowingStrategy().getTrigger();
+
+      Set<Trigger> allowedTriggers =
+          ImmutableSet.of(
+              Repeatedly.forever(AfterWatermark.pastEndOfWindow()), DefaultTrigger.of());
+
+      Preconditions.checkArgument(
+          allowedTriggers.contains(currentTrigger),
+          String.format(
+              "getWithRecordNumMetadata only support the default trigger not. %s", currentTrigger));

Review comment:
       ```suggestion
                 "getWithRecordNumMetadata(true) only supports the default trigger not: %s", currentTrigger));
   ```

##########
File path: sdks/java/io/contextualtextio/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -167,26 +177,26 @@
  *      .apply(ContextualTextIO.readFiles());
  * }</pre>
  *
- * <p>Example 6: reading without recordNum metadata, or only fileName associated Metadata. (the
- * Objects would still contain recordNums, but these recordNums would correspond to their positions
- * in their respective offsets rather than their positions within the entire file).
+ * <p>Example 6: reading with recordNum metadata. (the Objects still contain recordNums, but these
+ * recordNums would correspond to their positions in their respective offsets rather than their
+ * positions within the entire file).
  *
  * <pre>{@code
  * Pipeline p = ...;
  *
  * PCollection<Row> records = p.apply(ContextualTextIO.read()
  *     .from("/local/path/to/files/*.csv")
- *      .setWithoutRecordNumMetadata(true));
+ *      .setWithRecordNumMetadata(true));
  * }</pre>
  *
  * <p>NOTE: When using {@link ContextualTextIO.Read#withHasMultilineCSVRecords(Boolean)} this
  * option, a single reader will be used to process the file, rather than multiple readers which can
  * read from different offsets. For a large file this can result in lower performance.
  *
- * <p>NOTE: Use {@link Read#withoutRecordNumMetadata()} when recordNum metadata is not required, for
- * example, when when only filename metadata is required. Computing record positions currently
- * introduces a shuffle step, which increases the resources used by the pipeline. <b> By default
- * withoutRecordNumMetadata is set to false, so the shuffle step is performed.</b>
+ * <p>NOTE: Use {@link Read#withRecordNumMetadata()} when recordNum metadata is required. Computing
+ * record positions currently introduces a shuffle step, which increases the resources used by the

Review comment:
       ```suggestion
    * record positions currently introduces a grouping step, which increases the resources used by the
   ```

##########
File path: sdks/java/io/contextualtextio/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -637,4 +619,68 @@ private CreateTextSourceFn(byte[] delimiter, boolean hasMultilineCSVRecords) {
 
   /** Disable construction of utility class. */
   private ContextualTextIO() {}
+
+  private static class ProcessRecordNumbers extends PTransform<PCollection<Row>, PCollection<Row>> {
+
+    @Override
+    public PCollection<Row> expand(PCollection<Row> records) {
+      /*
+       * At this point the line number in RecordWithMetadata contains the relative line offset from the beginning of the read range.
+       *
+       * To compute the absolute position from the beginning of the input we group the lines within the same ranges, and evaluate the size of each range.
+       */
+
+      // This algorithm only works with triggers that fire once, for now only default trigger is
+      // supported.
+      Trigger currentTrigger = records.getWindowingStrategy().getTrigger();
+
+      Set<Trigger> allowedTriggers =
+          ImmutableSet.of(
+              Repeatedly.forever(AfterWatermark.pastEndOfWindow()), DefaultTrigger.of());
+
+      Preconditions.checkArgument(
+          allowedTriggers.contains(currentTrigger),
+          String.format(
+              "getWithRecordNumMetadata only support the default trigger not. %s", currentTrigger));
+
+      PCollection<KV<KV<String, Long>, Row>> recordsGroupedByFileAndRange =
+          records
+              .apply("AddFileNameAndRange", ParDo.of(new AddFileNameAndRange()))
+              .setCoder(
+                  KvCoder.of(
+                      KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of()),
+                      RowCoder.of(RecordWithMetadata.getSchema())));
+
+      PCollectionView<Map<KV<String, Long>, Long>> rangeSizes =
+          recordsGroupedByFileAndRange
+              .apply("CountRecordsForEachFileRange", Count.perKey())
+              .apply("SizesAsView", View.asMap());
+
+      // Get Pipeline to create a dummy PCollection with one element to help compute the lines
+      // before each Range
+      PCollection<Integer> singletonPcoll =
+          records.getPipeline().apply("CreateSingletonPcoll", Create.of(Arrays.asList(1)));
+
+      /*
+       * For each (File, Offset) pair, calculate the number of lines occurring before the Range for each file
+       *
+       * After computing the number of lines before each range, we can find the line number in original file as numLiesBeforeOffset + lineNumInCurrentOffset

Review comment:
       ```suggestion
          * After computing the number of lines before each range, we can find the line number in original file as numLinesBeforeOffset + lineNumInCurrentOffset
   ```
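
For readers following the comment above, the arithmetic it describes (`numLinesBeforeOffset + lineNumInCurrentOffset`) can be sketched in plain Java for a single file. This is only an illustration, not the code in the PR: the PR works on a `PCollection` with a map side input, whereas the class `AbsoluteLineNumbers` and its two methods below are hypothetical names that use ordinary maps. The sketch also assumes range offsets sort in file order and takes no position on whether line numbers are 0- or 1-based.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper, not part of ContextualTextIO: prefix sums over per-range line counts. */
public class AbsoluteLineNumbers {

  /**
   * Given rangeOffset -> number of lines in that range (for one file), returns
   * rangeOffset -> number of lines in all earlier ranges of the same file.
   */
  public static Map<Long, Long> linesBeforeEachRange(Map<Long, Long> rangeSizes) {
    // Copy into a TreeMap so ranges are visited in ascending offset order.
    TreeMap<Long, Long> sorted = new TreeMap<>(rangeSizes);
    Map<Long, Long> linesBefore = new TreeMap<>();
    long runningTotal = 0L;
    for (Map.Entry<Long, Long> entry : sorted.entrySet()) {
      linesBefore.put(entry.getKey(), runningTotal);
      runningTotal += entry.getValue();
    }
    return linesBefore;
  }

  /** numLinesBeforeOffset + lineNumInCurrentOffset, as in the comment under review. */
  public static long absoluteLineNumber(
      Map<Long, Long> linesBefore, long rangeOffset, long lineNumInCurrentOffset) {
    return linesBefore.get(rangeOffset) + lineNumInCurrentOffset;
  }

  public static void main(String[] args) {
    // Made-up example: three ranges starting at byte offsets 0, 100 and 250.
    Map<Long, Long> rangeSizes = new TreeMap<>();
    rangeSizes.put(0L, 3L);
    rangeSizes.put(100L, 2L);
    rangeSizes.put(250L, 4L);

    Map<Long, Long> linesBefore = linesBeforeEachRange(rangeSizes);
    // Relative line 1 in the range at offset 100 has 3 lines before its range.
    System.out.println(absoluteLineNumber(linesBefore, 100L, 1L));
  }
}
```

The running total is why the range sizes have to be known for every earlier range before any absolute position can be emitted, which matches the restriction to triggers that fire once.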




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

