Yes, the data will be keyed by shard. This is the trigger config we used:

    WindowFileNamePolicy policy = new WindowFileNamePolicy(prefix, options.getDataSource());

    TextIO.Write textWriter = TextIO.write()
        .to(policy)
        .withTempDirectory(tempPrefix)
        .withWindowedWrites()
        .withNumShards(options.getShardCount());

    // TextIO.Write returns PDone, so the result is not assigned back to batchCollection.
    batchCollection
        .apply("Fixed Strategy",
            Window.<String>into(
                    FixedWindows.of(Utilities.resolveDuration(options.getWindowDuration())))
                .triggering(AfterWatermark.pastEndOfWindow())
                .withAllowedLateness(Utilities.resolveDuration(options.getWindowLateness()))
                .discardingFiredPanes())
        .apply(textWriter);

From: Kenneth Knowles <k...@apache.org>
Sent: Friday, May 1, 2020 4:19 PM
To: user <user@beam.apache.org>
Subject: Re: Notifying the closure of a Window Period

CAUTION: This email originated from outside of D&B. Please do not click links or open attachments unless you recognize the sender and know the content is safe.

Is the data keyed by shardNumber? To have a unique final pane for a filename prefix, you will need to include the key in the prefix.

Can you also provide the triggering configuration you are working with?

Kenn

On Fri, May 1, 2020 at 6:47 AM Truebody, Kyle <truebo...@dnb.com> wrote:

Hi Kenn,

Thanks for the response. I am not sure I understand this correctly: 'affected by the fact that windows are processed independently for each key'. I have put a high-level example below; I hope it clarifies what I am trying to ask.

Is there a more precise way to be informed that the final pane of a window has been written completely? Because of how coordination is set up for downstream consumers, the .trigger file must be delivered only on completion of the absolute last pane.
```
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class WindowFileNamePolicy extends FileBasedSink.FilenamePolicy {

    private final ResourceId prefix;
    private final String dataSource;

    /**
     * file names - file source name
     *            - timestamp (processing timestamp / event timestamp), based on the current time window
     *            - optional : - shard number
     *                         - window start ts
     * @param prefix
     */
    public WindowFileNamePolicy(ResourceId prefix, String dataSource) {
        this.prefix = prefix;
        this.dataSource = dataSource;
    }

    public String filenamePrefixForWindow(IntervalWindow window) {
        String filePrefix = prefix.isDirectory() ? "" : prefix.getFilename();
        DateTimeFormatter formatter = DateTimeFormat.forPattern(Utilities.lngDateFormat);
        DateTime windowStart = formatter.parseDateTime(window.start().toString());
        DateTimeFormatter resultformat = DateTimeFormat.forPattern(Utilities.shtDateFormat);
        return String.format(
            "%s/%s/%s-%s",
            resultformat.print(windowStart),
            dataSource,
            dataSource,
            resultformat.print(windowStart));
    }

    @Override
    public ResourceId windowedFilename(int shardNumber,
                                       int numShards,
                                       BoundedWindow window,
                                       PaneInfo paneInfo,
                                       FileBasedSink.OutputFileHints outputFileHints) {
        IntervalWindow intervalWindow = (IntervalWindow) window;
        String filename = String.format(
            "%s-%s-%s",
            filenamePrefixForWindow(intervalWindow),
            shardNumber,
            numShards);

        // Writes to the same directory as the current window. This fires multiple
        // times, depending on the number of panes for which isLast() is true, or on
        // the number of write operators (not sure exactly).
        if (paneInfo.isLast()) {
            createTriggerFile(/* trigger file name */ ".trigger");
        }

        return prefix.getCurrentDirectory()
            .resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
    }

    // (unwindowedFilename override not included in this excerpt)
}
```

Thanks,
Kyle

From: Kenneth Knowles <k...@apache.org>
Sent: Friday, May 1, 2020 2:25 PM
To: user <user@beam.apache.org>
Subject: Re: Notifying the closure of a Window Period

I am guessing you will be affected by the fact that windows are processed independently for each key. Is that what you are referring to when you mention multiple isLast() windows?

Kenn

On Fri, May 1, 2020 at 3:36 AM Truebody, Kyle <truebo...@dnb.com> wrote:

Hi all,

We are working on a streaming pipeline that needs to be compatible with our legacy platform while we move over to Beam streaming. Our legacy platform uses a coordination framework (Oozie). Each step in the coordination pipeline is activated by the creation of a trigger file.

I am looking for a Beam construct or flag that will notify the context/driver when a time window closes. We need to create a trigger flag only when all the files for a given window period have been emitted.

We have tried creating the trigger flag using PaneInfo.isLast() in a custom WindowFileNamePolicy, but noticed that a window has multiple panes for which isLast() is true.

Thanks,
Kyle
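
For illustration only, the "trigger once per window" requirement discussed in this thread can be reduced to a small counting problem. The sketch below assumes that the file-naming hook is invoked once per shard for the final pane, which is what the thread suspects but does not confirm. `WindowCompletionTracker` and `shardFinished` are hypothetical names, not Beam APIs, and the state is kept in memory in a single JVM; in a real distributed pipeline the calls may happen on different workers, so this shows only the bookkeeping, not a deployable solution.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Beam): records which shards of a window
 * have reported their last pane and signals when the final shard arrives.
 * Assumption: state lives in one JVM, which a distributed runner does not guarantee.
 */
final class WindowCompletionTracker {

    // Window identifier -> shard numbers already reported for that window.
    private final Map<String, BitSet> shardsSeen = new HashMap<>();

    /**
     * Returns true exactly once per window: on the call that completes the
     * set {0, ..., numShards - 1}. Repeated reports of the same shard return false.
     */
    synchronized boolean shardFinished(String windowId, int shardNumber, int numShards) {
        if (shardNumber < 0 || shardNumber >= numShards) {
            throw new IllegalArgumentException("shard out of range: " + shardNumber);
        }
        BitSet seen = shardsSeen.computeIfAbsent(windowId, k -> new BitSet(numShards));
        if (seen.get(shardNumber)) {
            return false; // duplicate report, for example a retried write
        }
        seen.set(shardNumber);
        return seen.cardinality() == numShards;
    }
}
```

With a tracker like this, the `.trigger` file would be created only when `shardFinished(...)` returns true, rather than on every call where `paneInfo.isLast()` is true.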