You can get the end time of a window from the TimeWindow object which is
passed to the AllWindowFunction. This is basically a window ID / index.
I would go for a custom output sink which writes records to files based on
their timestamp.
IMO, this would be cleaner & easier than implementing the file output into
the window function.



2016-02-04 13:49 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi Radu,
>
>
>
> It is indeed interesting to know how each window could be registered
> separately -  I am not sure it any of the existing mechanisms in Flink
> support this.
>
> I think you need to create your own output sink. It is a bit tricky to
> pass the window sequence number (actually I do  not think such an index is
> kept – but you can create one by yourself). Maybe an easier option is to
> manage the writing of the data yourself in the window function or in a
> custom created evictor. In the window and in the evictor you have access to
> all data and you can create specific files for each window triggered
>
>
>
>
>
>
>
> *From:* Radu Prodan [mailto:raduprod...@gmail.com]
> *Sent:* Thursday, February 04, 2016 11:58 AM
> *To:* user@flink.apache.org
> *Subject:* Re: Flink writeAsCsv
>
>
>
> Hi Marton,
>
>
>
> Thanks to your comment I managed to get it worked. At least it outputs the
> results. However, what I need is to output each window result seperately.
> Now, it outputs the results of parallel working windows (I think) and
> appends the new results to them. For example, If I have parallelism of 10,
> then I will have at most 10 files and each file will grow in size as
> windows continue.
>
> What I want is, to have seperate file for a window. For example, after
> n'th window is computed output it to some file and close the file.
>
>
>
> -best
>
> Radu
>
>
>
> On Thu, Feb 4, 2016 at 11:42 AM Márton Balassi <balassi.mar...@gmail.com>
> wrote:
>
> Hey Radu,
>
>
>
> As you are using the streaming api I assume that you call env.execute() in
> both cases. Is that the case?
>
>
>
> Do you see any errors appearing? My first call would be if your data type
> is not a tuple type then writeAsCsv does not work by default.
>
>
>
> Best,
>
>
>
> Marton
>
>
>
> On Thu, Feb 4, 2016 at 11:36 AM, Radu Prodan <raduprod...@gmail.com>
> wrote:
>
> Hi all,
>
>
>
> I am new to flink. I wrote a simple program and I want it to output as csv
> file.
>
>
>
> timeWindowAll(Time.of(3, TimeUnit.MINUTES))
>
> .apply(newFunction1())
>
> .writeAsCsv("file:///user/someuser/Documents/somefile.csv");
>
>
>
> When I change the sink to . print(), it works and outputs some results.
>
> I want it to output the result of every window. However, it outputs
> nothing and the file is not created. Am I missing anything?
>
>
>
> -best
>
> Radu
>
>
>
>
>
>
>
>
>
>
>
>

Reply via email to