Hey,
I have a question about a DataStream created from multiple files in S3. I
have several files in AWS S3 under s3://files/, with one folder per day, so
the full paths look like s3://files/day=1/file.parquet,
s3://files/day=2/file.parquet. I want to read all the files and sort the
records by a specific value.

I thought I could rely on the Long.MAX_VALUE watermark that is generated once
the input has been fully read, so I decided to use an event-time window that
is larger than the time span covered by the data in the files.
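
For reference, here is a minimal sketch of how a tumbling window of a given
size maps a timestamp to a window start. It assumes windows aligned to the
epoch with zero offset, which to my knowledge is the arithmetic used by
Flink's TimeWindow.getWindowStartWithOffset. The object name and the sample
timestamps are made up for illustration.

```scala
object WindowAlignmentSketch {
  val DayMs: Long = 24L * 60 * 60 * 1000

  // Start of the tumbling window that contains `timestamp` (all values in ms).
  // Assumption: epoch-aligned windows, offset 0 unless specified.
  def windowStart(timestamp: Long, windowSize: Long, offset: Long = 0L): Long =
    timestamp - (timestamp - offset + windowSize) % windowSize

  def main(args: Array[String]): Unit = {
    val size = 90 * DayMs
    // Hypothetical event times, 20 days apart, expressed as days since the epoch.
    val early = 17990 * DayMs
    val late = 18010 * DayMs
    println(windowStart(early, size) / DayMs) // prints 17910
    println(windowStart(late, size) / DayMs)  // prints 18000
  }
}
```

Under these assumptions the window start depends only on the timestamp and
the window size, not on the first element seen, so two timestamps less than
90 days apart can still map to different 90-day windows.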

So, I have something like:

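import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val inputFormat = new ParquetAvroInputFormat[TestData](new Path("s3a://files/"))
// Recurse into the day=N sub-folders.
inputFormat.setNestedFileEnumeration(true)

// Local environment with parallelism 1.
val ste = StreamExecutionEnvironment.createLocalEnvironment(1)
ste.createInput(inputFormat)
  .assignTimestampsAndWatermarks(
    new OutOfOrdernessWatermarkStrategy[TestData](3000, _.getTimestamp))
  .keyBy(_.getKey)
  // Window size larger than the 30 days of data in the files.
  .timeWindow(Time.days(90))
  .sideOutputLateData(sideOutput) // sideOutput is an OutputTag defined elsewhere
  .process(new ProcessWindowFunction[TestData, TestData, String, TimeWindow] {
    override def process(key: String, context: Context,
                         elements: Iterable[TestData],
                         out: Collector[TestData]): Unit = {
      println("Processing: " + elements.size + " for key: " + key)
      // Emit the window contents sorted by timestamp.
      elements.toSeq.sortBy(_.getTimestamp)
        .foreach(out.collect(_))
    }
  })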

*ParquetAvroInputFormat and OutOfOrdernessWatermarkStrategy are my own classes,
but they do not cause the issue described here.*
The files hold 30 days of data, so I would expect that the window cannot be
closed before all the files have been read and the Long.MAX_VALUE watermark
has been generated.
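
The expectation above depends on when an event-time window fires. Below is a
minimal sketch of that condition, assuming the default event-time trigger
(the window [start, end) fires once the watermark reaches end - 1) and
assuming a watermark strategy that emits "largest timestamp seen so far minus
a fixed bound". The second point is only an assumption about
OutOfOrdernessWatermarkStrategy, since that class is not shown here, and the
window end and timestamps are made-up values.

```scala
object WindowFiringSketch {
  // Assumed watermark: largest timestamp seen so far minus the bound (ms).
  def watermark(seen: Seq[Long], boundMs: Long): Long = seen.max - boundMs

  // An event-time window [start, end) fires once the watermark reaches end - 1.
  def fires(windowEnd: Long, wm: Long): Boolean = wm >= windowEnd - 1

  def main(args: Array[String]): Unit = {
    val windowEnd = 1000000L // hypothetical window end
    val bound = 3000L

    // All timestamps lie well before the window end: no firing yet.
    println(fires(windowEnd, watermark(Seq(990000L, 999000L), bound)))  // prints false
    // One timestamp lies beyond the window end by at least the bound: fires.
    println(fires(windowEnd, watermark(Seq(990000L, 1003000L), bound))) // prints true
    // The final watermark of a finished input fires every remaining window.
    println(fires(windowEnd, Long.MaxValue))                            // prints true
  }
}
```

In this sketch a window can fire before the final watermark only if some
record carries a timestamp past the end of that window.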

Now, the problem is that I would expect to see one message printed per key,
since the parallelism is one. But for some of the keys (most of them, really)
there are two windows created. I have 30 unique keys and each key contains
around 1M records. The output I see looks more or less like this:

1. Several messages about "Switching to Random IO seek policy"
2. A print for most of the keys present in the dataset (but the counts are
quite small, most of them around 100k, some as small as a few hundred)
3. More "Switching to Random IO seek policy" messages
4. A print again for some keys, but now the counts are much higher.

So the total count of all processed values is correct. I would just like to
understand why the window is invoked twice.

Thanks in advance,
Best Regards,
Dom.
