Hi,

We have written a Dataflow pipeline that reads a file from GCS using the 
ContextualTextIO.read() method with "RecordNumMetadata" enabled, since we 
also need the line number of each record. We are using Apache Beam 2.38.
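
To make the line-number requirement concrete, here is a minimal plain-Java sketch (no Beam involved) of the output we need for each file: every record paired with its position in the file. The class and field names are hypothetical, and the 1-based numbering is an assumption for illustration only. It is not meant to mirror how ContextualTextIO numbers records internally.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

public class RecordNumberSketch {

    /** Hypothetical holder for one line and its position in the file. */
    public static final class NumberedLine {
        public final long recordNum; // 1-based here (an assumption for this sketch)
        public final String value;

        public NumberedLine(long recordNum, String value) {
            this.recordNum = recordNum;
            this.value = value;
        }
    }

    /** Reads every line from the reader and pairs it with its line number. */
    public static List<NumberedLine> numberLines(BufferedReader reader) throws IOException {
        List<NumberedLine> out = new ArrayList<>();
        String line;
        long n = 0;
        while ((line = reader.readLine()) != null) {
            n++;
            out.add(new NumberedLine(n, line));
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the contents of one file; the data is made up.
        String sample = "id,name\n1,foo\n2,bar\n";
        try (BufferedReader reader = new BufferedReader(new StringReader(sample))) {
            for (NumberedLine nl : numberLines(reader)) {
                System.out.println(nl.recordNum + ": " + nl.value);
            }
        }
    }
}
```

In the real pipeline this numbering has to hold per file even when new files keep arriving, which is why we rely on the record-number metadata from ContextualTextIO rather than plain TextIO.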

We want to modify the pipeline to continuously watch for new files in GCS, 
so that each file is processed as soon as it is copied to the location, as 
described in the documentation:
https://beam.apache.org/releases/javadoc/current/index.html?org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.html

'''
Example 3: streaming new files matching a filepattern.


 Pipeline p = ...;

 PCollection<Row> records = p.apply(ContextualTextIO.read()
     .from("/local/path/to/files/*")
     .watchForNewFiles(
       // Check for new files every minute
       Duration.standardMinutes(1),
       // Stop watching the filepattern if no new files appear within an hour
       afterTimeSinceNewOutput(Duration.standardHours(1))));
'''


However, no "watchForNewFiles" method is available on 
ContextualTextIO.read().

We are currently trying the code changes below to see whether streaming new 
files by file pattern works with the ContextualTextIO API:

pipeline.apply(ContextualTextIO.read()
    .from(options.getFilePattern())
    .withMatchConfiguration(
        MatchConfiguration.create(EmptyMatchTreatment.ALLOW)
            // Poll for new files every 20 minutes and never stop watching
            .continuously(Duration.standardMinutes(20), Growth.never()))
    .withRecordNumMetadata())

However, we get the following error when creating the Dataflow template:

Exception in thread "main" java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
      at org.apache.beam.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:155)
      at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:212)
      at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:109)
      at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
      at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
      at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:363)
      at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1598)
      at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1487)
      at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
      at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
      at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:376)
      at org.apache.beam.sdk.io.contextualtextio.ContextualTextIO$ProcessRecordNumbers.expand(ContextualTextIO.java:693)
      at org.apache.beam.sdk.io.contextualtextio.ContextualTextIO$ProcessRecordNumbers.expand(ContextualTextIO.java:659)
      at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
      at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
      at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:363)
      at org.apache.beam.sdk.io.contextualtextio.ContextualTextIO$Read.expand(ContextualTextIO.java:408)
      at org.apache.beam.sdk.io.contextualtextio.ContextualTextIO$Read.expand(ContextualTextIO.java:248)
      at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
      at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
      at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
      at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:177)


Is there any way to stream new files using the ContextualTextIO.read() API?

Thanks in advance,

Sasi