lukecwik commented on a change in pull request #12924: URL: https://github.com/apache/beam/pull/12924#discussion_r498425569
########## File path: sdks/java/io/contextualtextio/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java ########## @@ -637,4 +619,68 @@ private CreateTextSourceFn(byte[] delimiter, boolean hasMultilineCSVRecords) { /** Disable construction of utility class. */ private ContextualTextIO() {} + + private static class ProcessRecordNumbers extends PTransform<PCollection<Row>, PCollection<Row>> { + + @Override + public PCollection<Row> expand(PCollection<Row> records) { + /* + * At this point the line number in RecordWithMetadata contains the relative line offset from the beginning of the read range. + * + * To compute the absolute position from the beginning of the input we group the lines within the same ranges, and evaluate the size of each range. + */ + + // This algorithm only works with triggers that fire once, for now only default trigger is + // supported. + Trigger currentTrigger = records.getWindowingStrategy().getTrigger(); + + Set<Trigger> allowedTriggers = + ImmutableSet.of( + Repeatedly.forever(AfterWatermark.pastEndOfWindow()), DefaultTrigger.of()); + + Preconditions.checkArgument( + allowedTriggers.contains(currentTrigger), + String.format( + "getWithRecordNumMetadata only support the default trigger not. %s", currentTrigger)); Review comment: ```suggestion "getWithRecordNumMetadata(true) only supports the default trigger not: %s", currentTrigger)); ``` ########## File path: sdks/java/io/contextualtextio/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java ########## @@ -167,26 +177,26 @@ * .apply(ContextualTextIO.readFiles()); * }</pre> * - * <p>Example 6: reading without recordNum metadata, or only fileName associated Metadata. (the - * Objects would still contain recordNums, but these recordNums would correspond to their positions - * in their respective offsets rather than their positions within the entire file). + * <p>Example 6: reading with recordNum metadata. (the Objects still contain recordNums, but these + * recordNums would correspond to their positions in their respective offsets rather than their + * positions within the entire file). * * <pre>{@code * Pipeline p = ...; * * PCollection<Row> records = p.apply(ContextualTextIO.read() * .from("/local/path/to/files/*.csv") - * .setWithoutRecordNumMetadata(true)); + * .setWithRecordNumMetadata(true)); * }</pre> * * <p>NOTE: When using {@link ContextualTextIO.Read#withHasMultilineCSVRecords(Boolean)} this * option, a single reader will be used to process the file, rather than multiple readers which can * read from different offsets. For a large file this can result in lower performance. * - * <p>NOTE: Use {@link Read#withoutRecordNumMetadata()} when recordNum metadata is not required, for - * example, when when only filename metadata is required. Computing record positions currently - * introduces a shuffle step, which increases the resources used by the pipeline. <b> By default - * withoutRecordNumMetadata is set to false, so the shuffle step is performed.</b> + * <p>NOTE: Use {@link Read#withRecordNumMetadata()} when recordNum metadata is required. Computing + * record positions currently introduces a shuffle step, which increases the resources used by the Review comment: ```suggestion * record positions currently introduces a grouping step, which increases the resources used by the ``` ########## File path: sdks/java/io/contextualtextio/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java ########## @@ -637,4 +619,68 @@ private CreateTextSourceFn(byte[] delimiter, boolean hasMultilineCSVRecords) { /** Disable construction of utility class. */ private ContextualTextIO() {} + + private static class ProcessRecordNumbers extends PTransform<PCollection<Row>, PCollection<Row>> { + + @Override + public PCollection<Row> expand(PCollection<Row> records) { + /* + * At this point the line number in RecordWithMetadata contains the relative line offset from the beginning of the read range. + * + * To compute the absolute position from the beginning of the input we group the lines within the same ranges, and evaluate the size of each range. + */ + + // This algorithm only works with triggers that fire once, for now only default trigger is + // supported. + Trigger currentTrigger = records.getWindowingStrategy().getTrigger(); + + Set<Trigger> allowedTriggers = + ImmutableSet.of( + Repeatedly.forever(AfterWatermark.pastEndOfWindow()), DefaultTrigger.of()); + + Preconditions.checkArgument( + allowedTriggers.contains(currentTrigger), + String.format( + "getWithRecordNumMetadata only support the default trigger not. %s", currentTrigger)); + + PCollection<KV<KV<String, Long>, Row>> recordsGroupedByFileAndRange = + records + .apply("AddFileNameAndRange", ParDo.of(new AddFileNameAndRange())) + .setCoder( + KvCoder.of( + KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of()), + RowCoder.of(RecordWithMetadata.getSchema()))); + + PCollectionView<Map<KV<String, Long>, Long>> rangeSizes = + recordsGroupedByFileAndRange + .apply("CountRecordsForEachFileRange", Count.perKey()) + .apply("SizesAsView", View.asMap()); + + // Get Pipeline to create a dummy PCollection with one element to help compute the lines + // before each Range + PCollection<Integer> singletonPcoll = + records.getPipeline().apply("CreateSingletonPcoll", Create.of(Arrays.asList(1))); + + /* + * For each (File, Offset) pair, calculate the number of lines occurring before the Range for each file + * + * After computing the number of lines before each range, we can find the line number in original file as numLiesBeforeOffset + lineNumInCurrentOffset Review comment: ```suggestion * After computing the number of lines before each range, we can find the line number in original file as numLinesBeforeOffset + lineNumInCurrentOffset ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org