On Tue, May 19, 2020 at 2:02 AM Maximilian Michels wrote:
> > This is still confusing to me - why would the messages be dropped as
> > late in this case?
>
> Since you previously mentioned that the bug is due to the pane info
> missing, I just pointed out that the WriteFiles logic is expected to
>
> This is still confusing to me - why would the messages be dropped as late in
> this case?
Since you previously mentioned that the bug is due to the pane info
missing, I just pointed out that the WriteFiles logic is expected to
drop the pane info.
@Jose Would it make sense to file a JIRA and su
Hi Reuven,
I can try to explain what I think is happening.
- There is a source which is reading data entries and updating the
watermark.
- Then, data entries are grouped and stored in files.
- The window information of these data entries is used to emit filenames.
Data entries' window and timestamp. Pane
This is still confusing to me - why would the messages be dropped as late
in this case?
On Mon, May 18, 2020 at 6:14 AM Maximilian Michels wrote:
> All runners which use the Beam reference implementation drop the
> PaneInfo for WriteFilesResult#getPerDestinationOutputFilenames(). That's
> why we
Hi,
Many thanks for your responses.
I agree with you, Reuven, the source is what should determine whether data is
late or not.
Maximilian, I agree with you; as I mentioned in previous emails, I saw the
same behavior with Spark, and I guessed the problem was here
https://github.com/apache/beam/blob/6a4ef33
All runners which use the Beam reference implementation drop the
PaneInfo for WriteFilesResult#getPerDestinationOutputFilenames(). That's
why we can observe this behavior not only in Flink but also Spark.
The WriteFilesResult is returned here:
https://github.com/apache/beam/blob/d773f8ca7a4d63d014
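For reference, a minimal sketch (illustrative names; it assumes the standard
Beam Java SDK and slf4j imports) of how the pane info carried by the filename
elements could be inspected; if a runner drops it, this should presumably log
the default pane rather than the pane of the originating window firing:

// "written" stands for the WriteFilesResult returned by the TextIO/FileIO write.
static void logFilenamePanes(WriteFilesResult<Void> written) {
  written.getPerDestinationOutputFilenames()
      .apply("LogFilenamePane", ParDo.of(new DoFn<KV<Void, String>, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c, BoundedWindow window) {
          // Logs the filename together with the window, timestamp and pane it carries.
          LoggerFactory.getLogger("PaneDebug").info(
              "file={} window={} timestamp={} pane={}",
              c.element().getValue(), window, c.timestamp(), c.pane());
        }
      }));
}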
Lateness should never be introduced inside a pipeline - generally late data
can only come from a source. If data was not dropped as late earlier in
the pipeline, it should not be dropped after the file write. I suspect that
this is a bug in how the Flink runner handles the Reshuffle transform, but
Hi Jose,
thank you for putting in the effort to produce an example which demonstrates
your problem.
You are using a streaming pipeline and it seems that the watermark
downstream has already advanced further, so when your file pane arrives, it is
already late. Since you define that lateness is not tolerated, it i
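If that is the cause, one way to test the hypothesis is to tolerate late panes
on the window that processes the filenames. A minimal sketch, with a made-up
window size and lateness, assuming the standard Beam Java SDK imports:

// "filenames" is the PCollection<String> of file names emitted by the write.
static void rewindowWithLateness(PCollection<String> filenames) {
  filenames
      .apply("RewindowFilenames",
          Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
              // Tolerate late panes instead of the default zero allowed lateness.
              .withAllowedLateness(Duration.standardHours(1))
              .discardingFiredPanes())
      .apply("CountFilenames",
          Combine.globally(Count.<String>combineFn()).withoutDefaults());
}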
Hi again,
I have simplified the example to reproduce the data loss. The scenario is the
following (a sketch follows after the list):
- TextIO writes files.
- getPerDestinationOutputFilenames emits the file names.
- The file names are processed by an aggregator (combine, distinct,
groupByKey...) with a window **without allowed lateness**.
- File n
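A rough sketch of the simplified scenario above (paths, window sizes and
transform names are made up, and FileIO.write() with a text sink stands in for
the TextIO-based write; it assumes the standard Beam Java SDK and slf4j
imports):

static void writeAndAggregateFilenames(PCollection<String> lines) {
  // FileIO.write() with a text sink is used here as a stand-in for the
  // TextIO-based write in the original pipeline.
  WriteFilesResult<Void> written = lines.apply("WriteFiles",
      FileIO.<String>write()
          .via(TextIO.sink())
          .to("/tmp/out")            // hypothetical output directory
          .withNumShards(1));

  written.getPerDestinationOutputFilenames()
      .apply(Values.<String>create())
      // Re-window the filenames with the default allowed lateness of zero:
      // filenames arriving behind the watermark are dropped as late here.
      .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
      .apply(Distinct.<String>create())
      .apply("LogAggregatedFilenames", ParDo.of(new DoFn<String, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          LoggerFactory.getLogger("Repro").info(
              "filename seen by the aggregator: {}", c.element());
        }
      }));
}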
Hi,
I would like to clarify that while TextIO is writing, all the data ends up in
the files (shards). The loss happens when the file names emitted by
getPerDestinationOutputFilenames are processed by a window.
I have created a pipeline to reproduce the scenario in which some filenames
are lost after the g
Pane info is supposed to be preserved across transforms. If the Flink runner
does not, then I believe that is a bug.
On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek wrote:
> I am using FileIO and I do observe the drop of pane info on the Flink
> runner too. It was mentioned in this thread:
>
I am using FileIO and I do observe the drop of pane info on the Flink runner
too. It was mentioned in this thread:
https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
It is the result of a different reshuffle expansion used for optimisation
reasons. However, I did not observe a data loss
Ah, I think I see the problem.
It appears that for some reason, the Flink runner loses windowing
information when a Reshuffle is applied. I'm not entirely sure why, because
windowing information should be maintained across a Reshuffle.
Reuven
On Sat, May 9, 2020 at 9:50 AM Jose Manuel wrote:
>
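For completeness, a small sketch (illustrative names, assuming the standard
Beam Java SDK and slf4j imports) of how one could check on a given runner
whether window and pane metadata survive an explicit Reshuffle:

static PCollection<String> checkReshuffle(PCollection<String> input) {
  return input
      .apply("Reshuffle", Reshuffle.<String>viaRandomKey())
      .apply("LogAfterReshuffle", ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext c, BoundedWindow window) {
          // If the runner's Reshuffle loses metadata, the window/pane logged
          // here will differ from what the element carried before the Reshuffle.
          LoggerFactory.getLogger("ReshuffleDebug").info(
              "element={} window={} timestamp={} pane={}",
              c.element(), window, c.timestamp(), c.pane());
          c.output(c.element());
        }
      }));
}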
Hi,
I have added some logs to the pipeline as follows (you can find the log
function in the Appendix):
// STREAM + processing time.
pipeline.apply(KafkaIO.read())
    .apply(...) // mappers, a window and a combine
    .apply(logBeforeWrite())
    .apply("WriteFiles",
        TextIO.writeCustomT
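The referenced log function lives in an appendix that is not part of this
excerpt; a hypothetical version that logs each element together with its
timestamp, window and pane might look roughly like this (assuming the elements
are Strings and the standard Beam Java SDK and slf4j imports):

static ParDo.SingleOutput<String, String> logBeforeWrite() {
  return ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c, BoundedWindow window) {
      // Log the element and the metadata it carries just before WriteFiles.
      LoggerFactory.getLogger("BeforeWrite").info(
          "element={} timestamp={} window={} pane={}",
          c.element(), c.timestamp(), window, c.pane());
      c.output(c.element());
    }
  });
}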
The window information should still be there. Beam propagates windows
through PCollection, and I don't think WriteFiles does anything explicit to
stop that.
Can you try this with the direct runner to see what happens there?
What is your windowing on this PCollection?
Reuven
On Fri, May 8, 2020
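For comparison, switching the same pipeline to the direct runner only requires
changing the runner option, either with --runner=DirectRunner on the command
line or programmatically; a minimal sketch, assuming the
beam-runners-direct-java dependency is on the classpath:

// Uses org.apache.beam.runners.direct.DirectRunner.
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.setRunner(DirectRunner.class);
Pipeline pipeline = Pipeline.create(options);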
I got the same behavior using the Spark runner (with Spark 2.4.3): the window
information was missing.
Just to clarify, the combiner after TextIO had different results. In the Flink
runner the file names were dropped, and in Spark the combination process
happened twice, duplicating data. I think it is becau
+dev
On Mon, May 4, 2020 at 3:56 AM Jose Manuel wrote:
> Hi guys,
>
> I think I have found something interesting about windowing.
>
> I have a pipeline that gets data from Kafka and writes to HDFS by means of
> TextIO.
> Once written, generated files are combined to apply some custom operations
Hi guys,
I think I have found something interesting about windowing.
I have a pipeline that gets data from Kafka and writes to HDFS by means of
TextIO.
Once written, generated files are combined to apply some custom operations.
However, the combine does not receive data. Following, you can find t
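A rough sketch of the overall pipeline shape described above (brokers, topic,
paths and window sizes are made up, the custom mappers and combines are
elided, and FileIO.write() with a text sink stands in for the TextIO-based
write to HDFS; it assumes the standard Beam Java SDK, KafkaIO and Kafka client
imports):

static void buildPipeline(Pipeline pipeline) {
  PCollection<String> lines = pipeline
      .apply("ReadKafka", KafkaIO.<String, String>read()
          .withBootstrapServers("kafka:9092")              // hypothetical
          .withTopic("events")                             // hypothetical
          .withKeyDeserializer(StringDeserializer.class)
          .withValueDeserializer(StringDeserializer.class)
          .withoutMetadata())
      .apply(Values.<String>create())
      .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));

  // FileIO.write() with a text sink stands in for the TextIO write to HDFS.
  WriteFilesResult<Void> written = lines.apply("WriteToHdfs",
      FileIO.<String>write()
          .via(TextIO.sink())
          .to("hdfs:///tmp/out")                           // hypothetical
          .withNumShards(1));

  // The per-window output filenames feed a downstream combine; this is the
  // step that reportedly receives no data.
  written.getPerDestinationOutputFilenames()
      .apply(Values.<String>create())
      .apply("CombineFilenames",
          Combine.globally(Count.<String>combineFn()).withoutDefaults());
}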