Yes, the data will be keyed by shard.

This is the trigger config we used:

WindowFileNamePolicy policy =
        new WindowFileNamePolicy(prefix, options.getDataSource());

TextIO.Write textWriter = TextIO.write()
        .to(policy)
        .withTempDirectory(tempPrefix)
        .withWindowedWrites()
        .withNumShards(options.getShardCount());

// apply(textWriter) returns PDone, so the result is not assigned back to batchCollection.
batchCollection
        .apply("Fixed Strategy",
                Window.<String>into(
                        FixedWindows.of(Utilities.resolveDuration(options.getWindowDuration())))
                    .triggering(AfterWatermark.pastEndOfWindow())
                    .withAllowedLateness(Utilities.resolveDuration(options.getWindowLateness()))
                    .discardingFiredPanes())
        .apply(textWriter);
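For reference, a simplified model of what "closure" means under this config: elements land in epoch-aligned fixed windows, and in general no further panes can be emitted for a window once the watermark has passed the end of the window plus the allowed lateness (the trigger may finish the window earlier). The sketch below is plain Java, not Beam API; `FixedWindowModel` and its methods are hypothetical names, and it ignores keys and shards entirely.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical, simplified model of the windowing config above (not Beam code):
 * epoch-aligned fixed windows plus an allowed-lateness horizon.
 */
final class FixedWindowModel {
    private final Duration size;
    private final Duration allowedLateness;

    FixedWindowModel(Duration size, Duration allowedLateness) {
        this.size = size;
        this.allowedLateness = allowedLateness;
    }

    /** Start of the fixed window containing ts: ts - (ts mod size), aligned to the epoch. */
    Instant windowStart(Instant ts) {
        long sizeMs = size.toMillis();
        long t = ts.toEpochMilli();
        return Instant.ofEpochMilli(t - Math.floorMod(t, sizeMs));
    }

    /** Exclusive end of the window containing ts. */
    Instant windowEnd(Instant ts) {
        return windowStart(ts).plus(size);
    }

    /**
     * True once the watermark has reached end-of-window + allowed lateness.
     * In this model no further panes can be emitted for the window after that,
     * so it is the earliest point at which a single "window closed" signal is safe.
     */
    boolean isClosed(Instant ts, Instant watermark) {
        return !watermark.isBefore(windowEnd(ts).plus(allowedLateness));
    }
}
```

With 10-minute windows and 5 minutes of lateness, an element at 10:07:30 belongs to [10:00, 10:10), and that window is only considered closed once the watermark reaches 10:15.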



From: Kenneth Knowles <k...@apache.org>
Sent: Friday, May 1, 2020 4:19 PM
To: user <user@beam.apache.org>
Subject: Re: Notifying the closure of a Window Period

CAUTION: This email originated from outside of D&B. Please do not click links 
or open attachments unless you recognize the sender and know the content is 
safe.

Is the data keyed by shardNumber? To have a unique final pane for a filename 
prefix, you will need to include the key in the prefix.
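A minimal sketch of that suggestion, assuming one marker file per (window prefix, shard) instead of a single `.trigger` per window: each shard's final pane then owns a distinct marker name, and a downstream check can treat the window as complete once all `numShards` markers exist. `ShardMarkers`, `markerName`, `windowComplete` and the `-<shard>-of-<numShards>.done` naming are hypothetical, for illustration only.

```java
import java.util.Set;

/**
 * Hypothetical helper: one marker per (window prefix, shard). Because each shard
 * key has its own final pane, each marker name is written by exactly one final pane.
 */
final class ShardMarkers {
    private ShardMarkers() {}

    /** e.g. "2020-05-01/src/src-2020-05-01-3-of-5.done" (naming scheme is an assumption). */
    static String markerName(String windowPrefix, int shardNumber, int numShards) {
        return String.format("%s-%d-of-%d.done", windowPrefix, shardNumber, numShards);
    }

    /** True only when a marker exists for every shard 0..numShards-1 of the window. */
    static boolean windowComplete(Set<String> existing, String windowPrefix, int numShards) {
        for (int shard = 0; shard < numShards; shard++) {
            if (!existing.contains(markerName(windowPrefix, shard, numShards))) {
                return false;
            }
        }
        return true;
    }
}
```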

Can you also provide the triggering configuration you are working with?

Kenn

On Fri, May 1, 2020 at 6:47 AM Truebody, Kyle <truebo...@dnb.com> wrote:
Hi Kenn,

Thanks for the response…
I'm not sure I understand this correctly: ‘affected by the fact that windows are processed independently for each key’.
I have put a high-level example below; I hope it clarifies what I am trying to ask.
Is there a more precise way to be notified that the final pane of a window has been written completely?
Due to the nature of the coordination set up for downstream consumers, the .trigger file must be delivered only on completion of the absolute last pane.

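```
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class WindowFileNamePolicy extends FileBasedSink.FilenamePolicy {

    private final ResourceId prefix;

    private final String dataSource;

    /**
     * File names consist of:
     *   - file source name
     *   - timestamp (processing timestamp / event timestamp) based on the current time window
     *   - optional: shard number, window start timestamp
     *
     * @param prefix     output location (directory or filename prefix)
     * @param dataSource name of the data source, used in the directory and file names
     */
    public WindowFileNamePolicy(ResourceId prefix, String dataSource) {
        this.prefix = prefix;
        this.dataSource = dataSource;
    }

    public String filenamePrefixForWindow(IntervalWindow window) {
        DateTimeFormatter formatter = DateTimeFormat.forPattern(Utilities.lngDateFormat);
        DateTime windowStart = formatter.parseDateTime(window.start().toString());

        DateTimeFormatter resultformat = DateTimeFormat.forPattern(Utilities.shtDateFormat);

        // <date>/<dataSource>/<dataSource>-<date>
        return String.format(
                "%s/%s/%s-%s", resultformat.print(windowStart), dataSource,
                dataSource, resultformat.print(windowStart));
    }

    @Override
    public ResourceId windowedFilename(int shardNumber, int numShards,
            BoundedWindow window, PaneInfo paneInfo,
            FileBasedSink.OutputFileHints outputFileHints) {
        IntervalWindow intervalWindow = (IntervalWindow) window;
        String filename = String.format(
                "%s-%s-%s",
                filenamePrefixForWindow(intervalWindow),
                shardNumber,
                numShards);

        if (paneInfo.isLast()) {
            // Writes to the same directory as the current window (createTriggerFile is a helper, not shown).
            // This fires multiple times, depending on the number of panes that have
            // isLast() == true, or on the number of write operators (not sure exactly).
            createTriggerFile(/* trigger file name */ ".trigger");
        }
        return prefix.getCurrentDirectory().resolve(filename,
                ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
    }

    @Override
    public ResourceId unwindowedFilename(int shardNumber, int numShards,
            FileBasedSink.OutputFileHints outputFileHints) {
        // Required by FilenamePolicy; only windowed writes are used here.
        throw new UnsupportedOperationException("Only windowed writes are supported.");
    }
}
```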
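The requirement above (one `.trigger` per window, only after every shard's last pane) amounts to counting distinct shards per window and firing exactly once when the count reaches `numShards`. The plain-Java sketch below shows only that counting logic; `WindowCompletionTracker` is hypothetical, and because it keeps state inside one JVM it would not work as-is across Beam workers. In a pipeline the same idea would have to run after the write, on a single key per window, for example over the written filenames, which `TextIO.Write.withOutputFilenames()` exposes as a `WriteFilesResult` (worth confirming against the Beam docs for your version).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical in-memory tracker: records which shards have reported their final
 * (isLast) pane for each window and reports completion exactly once, when the
 * last of numShards shards arrives. Duplicate reports (e.g. retries) are ignored.
 */
final class WindowCompletionTracker {
    private final int numShards;
    private final Map<String, Set<Integer>> finishedShards = new HashMap<>();
    private final Set<String> signalled = new HashSet<>();

    WindowCompletionTracker(int numShards) {
        this.numShards = numShards;
    }

    /**
     * Call once per shard whose final pane for windowKey has been written.
     * Returns true only for the call that completes the window, so a single
     * ".trigger" file would be created per window.
     */
    synchronized boolean onFinalPane(String windowKey, int shardNumber) {
        if (signalled.contains(windowKey)) {
            return false;
        }
        Set<Integer> done = finishedShards.computeIfAbsent(windowKey, k -> new HashSet<>());
        done.add(shardNumber);
        if (done.size() == numShards) {
            signalled.add(windowKey);
            finishedShards.remove(windowKey);
            return true;
        }
        return false;
    }
}
```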

Thanks,
Kyle

From: Kenneth Knowles <k...@apache.org>
Sent: Friday, May 1, 2020 2:25 PM
To: user <user@beam.apache.org>
Subject: Re: Notifying the closure of a Window Period

I am guessing you will be affected by the fact that windows are processed independently for each key. Is that what you are referring to when you mention multiple isLast() windows?
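To illustrate the point: windowing state and triggers are tracked per key and window, so assuming the writer groups records by shard key, each shard with data in a window gets its own final pane. The plain-Java simulation below (hypothetical `PerKeyPanes`, not Beam code) counts final panes per window under a fire-once trigger, which is how `AfterWatermark.pastEndOfWindow()` without late firings behaves.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/**
 * Hypothetical simulation of per-key windowing: with a fire-once trigger,
 * every distinct key that has data in a window produces its own final pane
 * (isLast() == true), so a window sees one final pane per key.
 */
final class PerKeyPanes {
    private PerKeyPanes() {}

    /** An element with a key (e.g. shard number) and an event-time window id. */
    static final class Element {
        final int key;
        final String window;

        Element(int key, String window) {
            this.key = key;
            this.window = window;
        }
    }

    /** Number of final panes each window would emit: one per distinct key. */
    static Map<String, Integer> finalPanesPerWindow(List<Element> elements) {
        Map<String, Set<Integer>> keysPerWindow = new TreeMap<>();
        for (Element e : elements) {
            keysPerWindow.computeIfAbsent(e.window, w -> new TreeSet<>()).add(e.key);
        }
        Map<String, Integer> result = new TreeMap<>();
        keysPerWindow.forEach((w, keys) -> result.put(w, keys.size()));
        return result;
    }
}
```

With three shards writing into the 10:00 window, a filename policy that creates `.trigger` whenever `isLast()` is true would therefore run three times for that window.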

Kenn

On Fri, May 1, 2020 at 3:36 AM Truebody, Kyle <truebo...@dnb.com> wrote:
Hi all,

We are working on a streaming pipeline that needs to be compatible with our legacy platform while we move over to Beam streaming.
Our legacy platform uses a coordination framework (Oozie). Each step in the coordination pipeline is activated by the creation of a trigger file.

I am looking for a Beam construct or flag that will notify the context/driver of the closure of a time window. We need to create a trigger flag only once all the files for a given window period have been emitted.

We have tried creating the trigger flag using PaneInfo.isLast() in a custom WindowFileNamePolicy, but noticed that a window has multiple panes with isLast() set to true.

Thanks,
Kyle
