At a high level, the pipeline looks something like this:
pipeline
    .apply("read kafka",
        KafkaIO.readBytes()
            .updateConsumerProperties(
                ImmutableMap.of("auto.offset.reset", "earliest")))
    .apply("xform", MapElements.via(...))
    .apply("window",
        Window.into(FixedWindows.of(Duration.standardDays(1)))
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterPane.elementCountAtLeast(40000)))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.standardHours(1)))
    .apply("write",
        FileIO.write()
            .to("s3://...")
            .withNumShards(1)
            .withCompression(Compression.GZIP))
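
In case the sketch above is too terse, here's a compilable version of the
same shape; the bootstrap servers, topic, decode step, and output path are
placeholders I made up rather than our real values:

import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
import com.google.common.collect.ImmutableMap;

public class KafkaToS3 {
  public static void main(String[] args) {
    Pipeline pipeline =
        Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline
        .apply("read kafka",
            KafkaIO.readBytes()
                .withBootstrapServers("kafka:9092")  // placeholder
                .withTopic("events")                 // placeholder
                .updateConsumerProperties(
                    ImmutableMap.<String, Object>of(
                        "auto.offset.reset", "earliest")))
        .apply("xform",
            // Placeholder decode; the real transform is more involved.
            MapElements.via(
                new SimpleFunction<KafkaRecord<byte[], byte[]>, String>() {
                  @Override
                  public String apply(KafkaRecord<byte[], byte[]> record) {
                    return new String(
                        record.getKV().getValue(), StandardCharsets.UTF_8);
                  }
                }))
        .apply("window",
            // One-day fixed windows, flushed early every 40,000 elements.
            Window.<String>into(FixedWindows.of(Duration.standardDays(1)))
                .triggering(AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(AfterPane.elementCountAtLeast(40000)))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.standardHours(1)))
        .apply("write",
            FileIO.<String>write()
                .via(TextIO.sink())
                .to("s3://bucket/output/")           // placeholder path
                .withNumShards(1)
                .withCompression(Compression.GZIP));

    pipeline.run();
  }
}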
Mike.
On Mon, Jun 10, 2019 at 1:47 PM Ankur Goenka <[email protected]> wrote:
> Hi Mike,
>
> This can be because of the partitioning logic of the data.
>
> If possible, could you share your pipeline code at a high level?
>
> On Mon, Jun 10, 2019 at 12:58 PM Mike Kaplinskiy <[email protected]>
> wrote:
>
>> On Mon, Jun 10, 2019 at 6:51 AM Maximilian Michels <[email protected]>
>> wrote:
>>
>>> Hi Mike,
>>>
>>> If you set the number of shards to 1, you should get one shard per
>>> window, unless you have "ignore windows" set to true.
>>>
>>
>> Right, that makes sense, and it's what I expected. The thing I find a
>> little puzzling is that a single Flink sub-task receives all of the data;
>> I'd expect the actual work to be spread across runners, since each window
>> is independent.
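>>
>> To illustrate what I mean, here's a minimal, self-contained sketch
>> (made-up timestamps and a placeholder output directory, not our real
>> job): two elements land in two different day-windows, and with
>> withNumShards(1) I'd expect each window's single file to be written
>> independently:
>>
>> import org.apache.beam.sdk.Pipeline;
>> import org.apache.beam.sdk.io.FileIO;
>> import org.apache.beam.sdk.io.TextIO;
>> import org.apache.beam.sdk.transforms.Create;
>> import org.apache.beam.sdk.transforms.windowing.FixedWindows;
>> import org.apache.beam.sdk.transforms.windowing.Window;
>> import org.apache.beam.sdk.values.TimestampedValue;
>> import org.joda.time.Duration;
>> import org.joda.time.Instant;
>>
>> public class OneShardPerWindow {
>>   public static void main(String[] args) {
>>     Pipeline p = Pipeline.create();
>>
>>     p.apply(Create.timestamped(
>>             TimestampedValue.of("day1-event",
>>                 Instant.parse("2019-06-01T00:00:00Z")),
>>             TimestampedValue.of("day2-event",
>>                 Instant.parse("2019-06-02T00:00:00Z"))))
>>      .apply(Window.<String>into(FixedWindows.of(Duration.standardDays(1))))
>>      .apply(FileIO.<String>write()
>>          .via(TextIO.sink())
>>          .to("/tmp/out")  // placeholder output directory
>>          // Default naming embeds the window and pane, so each day-window
>>          // produces its own single file even with one shard.
>>          .withNaming(FileIO.Write.defaultNaming("part", ".txt"))
>>          .withNumShards(1));
>>
>>     p.run().waitUntilFinish();
>>   }
>> }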
>>
>>
>>>
>>> > (The way I'm checking this is via the Flink UI)
>>>
>>> I'm curious, how do you check this via the Flink UI?
>>>
>>
>> I've attached a screenshot.
>>
>>
>>>
>>> Cheers,
>>> Max
>>>
>>> On 09.06.19 22:33, Mike Kaplinskiy wrote:
>>> > Hi everyone,
>>> >
>>> > I'm using a Kafka source with a lot of watermark skew (i.e. new
>>> > partitions were added to the topic over time). The sink is a
>>> > FileIO.Write().withNumShards(1) to get ~1 file per day & an early
>>> > trigger to write at most 40,000 records per file. Unfortunately it
>>> > looks like there's only 1 shard trying to write files for all the
>>> > various days instead of writing multiple days' files in parallel.
>>> > (The way I'm checking this is via the Flink UI). Is there anything
>>> > I could do here to parallelize the process? All of this is with the
>>> > Flink runner.
>>> >
>>> > Mike.
>>> >
>>>
>>>