Do you have a window or trigger set up in your pipeline?

On Mon, May 11, 2020 at 2:41 PM Eleanore Jin <[email protected]> wrote:

>
> Hi Max,
>
> No, I have not introduced RocksDB at this point, since the pipeline is
> stateless apart from the Kafka offsets.
>
> What we do instead is ensure there is a dummy message in the side input
> to avoid this situation.
>
> Thanks!
> Eleanore
>
> On Mon, May 11, 2020 at 2:57 AM Maximilian Michels <[email protected]> wrote:
>
>> Generally, it is to be expected that the main input is buffered until
>> the side input is available. We really have no other option to correctly
>> process the data.
>>
>> Have you tried using RocksDB as the state backend to prevent too much GC
>> churn?
>>
>> -Max
>>
>> On 07.05.20 06:27, Eleanore Jin wrote:
>> > Please see: https://issues.apache.org/jira/browse/BEAM-9914
>> >
>> > Thanks a lot!
>> > Eleanore
>> >
>> > On Wed, May 6, 2020 at 9:17 PM Ankur Goenka <[email protected]
>> > <mailto:[email protected]>> wrote:
>> >
>> >     Thanks for sharing the response. It makes sense to me.
>> >     Please file a jira in Beam so that we can prioritize it.
>> >
>> >     Thanks,
>> >     Ankur
>> >
>> >     On Wed, May 6, 2020 at 9:08 PM Eleanore Jin <[email protected]
>> >     <mailto:[email protected]>> wrote:
>> >
>> >         Hi Ankur,
>> >
>> >         Thanks for your response.
>> >
>> >         I also checked with the Flink community; here is their response.
>> >         In short, Flink does not cache the main input data if there is
>> >         no data available in the side input (Flink broadcast stream).
>> >
>> >         - quote from flink community:
>> >
>> >         Coming back to your question, Flink's Broadcast stream does
>> >         *not* block or collect events from the non-broadcasted side if
>> >         the broadcast side doesn't serve events.
>> >         However, the user-implemented operators (Beam or your code in
>> >         this case) often put non-broadcasted events into state to wait
>> >         for input from the other side.
>> >         Since the error is not about lack of memory, the buffering in
>> >         Flink state might not be the problem here.
>> >
>> >         Thanks a lot for the help!
>> >         Eleanore
>> >
>> >         On Wed, May 6, 2020 at 8:59 PM Ankur Goenka <[email protected]
>> >         <mailto:[email protected]>> wrote:
>> >
>> >             The relevant code should be here:
>> >             https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L595
>> >
>> >             The fact that the problem goes away after publishing to the
>> >             side input suggests that this might be a problem with
>> >             synchronizing two streams of data on Flink using Beam.
>> >
>> >             I am not sure if the Flink optimizer waits for the side input
>> >             to be available before processing the main input. We might
>> >             potentially handle this on the Beam side as well, or use a
>> >             different set of Flink APIs to let us do better optimization
>> >             if possible. In any case, this would require a new SDK
>> >             release if we decide to fix it.
>> >
>> >             On Wed, May 6, 2020 at 7:54 PM Eleanore Jin
>> >             <[email protected] <mailto:[email protected]>> wrote:
>> >
>> >                 Hi Ankur,
>> >
>> >                 Thanks for the answer! Can you please point me to the
>> >                 source code where the buffering happens? I would like to
>> >                 learn how Beam works, thanks!
>> >
>> >                 To your question: in my case, the side input does not
>> >                 have any data, meaning no one is publishing to the side
>> >                 input topic.
>> >
>> >                 After publishing some data into the side input topic,
>> >                 the OOM goes away.
>> >
>> >                 Thanks!
>> >                 Eleanore
>> >
>> >                 On Wed, May 6, 2020 at 6:37 PM Ankur Goenka
>> >                 <[email protected] <mailto:[email protected]>> wrote:
>> >
>> >                     Hi Eleanore,
>> >
>> >                     The operation requires buffering the main input until
>> >                     data from the side input is available, which might
>> >                     be causing the OOM issue.
>> >                     You mention that the OOM happens when there is no data
>> >                     in the side input. Does that mean the side input is
>> >                     not yet ready, or does the side input have no data at
>> >                     all?
>> >
>> >                     Thanks,
>> >                     Ankur
>> >
>> >                     On Tue, May 5, 2020 at 5:15 PM Pablo Estrada
>> >                     <[email protected] <mailto:[email protected]>> wrote:
>> >
>> >                         +Ankur Goenka <mailto:[email protected]> by any
>> >                         chance do you know what could be causing this?
>> >
>> >                         Thanks Eleanore for the detailed debugging : )
>> >
>> >                         On Tue, May 5, 2020 at 9:34 AM Eleanore Jin
>> >                         <[email protected]
>> >                         <mailto:[email protected]>> wrote:
>> >
>> >                             Hi Community,
>> >
>> >                             I just wonder: does the side input feature
>> >                             buffer the messages from the main source if
>> >                             there is no data available from the side input?
>> >
>> >                             Thanks!
>> >                             Eleanore
>> >
>> >                             On Sat, May 2, 2020 at 6:28 PM Eleanore Jin
>> >                             <[email protected]
>> >                             <mailto:[email protected]>> wrote:
>> >
>> >                                 After some more experiments, I observed the
>> >                                 following:
>> >                                 1. run pipeline without sideinput: no
>> >                                 OOM issue
>> >                                 2. run pipeline with sideinput (kafka
>> >                                 topic with 1 partition) with data
>> >                                 available from this side input: no OOM
>> >                                 issue
>> >                                 3. run pipeline with sideinput (kafka
>> >                                 topic with 1 partition) without any data
>> >                                 from the side input: */OOM issue/*
>> >
>> >                                 So I just wonder what the behaviour is
>> >                                 if there is no data from the side input.
>> >                                 Does it cache the data from the main stream?
>> >                                 If so, is there a way to allow
>> >                                 processing main stream data without
>> >                                 waiting for data from the side input?
>> >
>> >                                 Thanks a lot!
>> >                                 Eleanore
>> >
>> >                                 On Sat, May 2, 2020 at 1:04 PM Eleanore
>> >                                 Jin <[email protected]
>> >                                 <mailto:[email protected]>> wrote:
>> >
>> >                                     Hi Community,
>> >
>> >                                     I am running Beam (2.16) with Flink
>> >                                     (1.8.2). In my pipeline there is a
>> >                                     sideinput which reads from a compacted
>> >                                     Kafka topic from earliest, and the
>> >                                     sideinput value is used for
>> >                                     filtering. I keep getting
>> >                                     OOM: GC overhead limit exceeded.
>> >
>> >                                     The side input method looks like this:
>> >
>> >                                     private <T> PCollection<KV<String, T>> createSideCollection(String topicName,
>> >                                         Class<? extends Deserializer<T>> deserializerClass) {
>> >
>> >                                       Map<String, Object> consumerProperties = ImmutableMap.<String, Object>builder()
>> >                                         .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>> >                                         .put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString())
>> >                                         .build();
>> >
>> >                                       PCollection<KV<String, T>> kafkaValues = pipeline.apply("collectionFromTopic-" + topicName,
>> >                                         KafkaIO.<String, T>read()
>> >                                           .withBootstrapServers(kafkaSettings.getBootstrapServers())
>> >                                           .withTopics(Collections.singletonList(topicName))
>> >                                           .withKeyDeserializer(KeyDeserializer.class)
>> >                                           .withValueDeserializer(deserializerClass)
>> >                                           .withConsumerConfigUpdates(consumerProperties)
>> >                                           .withoutMetadata());
>> >
>> >                                       Trigger trigger = Repeatedly.forever(
>> >                                         AfterFirst.of(
>> >                                           AfterPane.elementCountAtLeast(1),
>> >                                           AfterProcessingTime.pastFirstElementInPane()
>> >                                         ));
>> >
>> >                                       return kafkaValues.apply(Window.<KV<String, T>>into(new GlobalWindows())
>> >                                         .triggering(trigger)
>> >                                         .accumulatingFiredPanes()
>> >                                         .withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS)
>> >                                       );
>> >                                     }
>> >
>> >                                     We run 2 Flink task managers, each with 4
>> >                                     slots. After looking into the heap dump, I
>> >                                     think the problem lies in the sideinput. I
>> >                                     also confirmed that with the sideinput
>> >                                     disabled, the pipeline runs fine.
>> >
>> >                                     Can you please provide some help?
>> >
>> >                                     Thanks a lot!
>> >
>> >                                     Eleanore
>> >
>> >
>>
>
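
To make the buffering behaviour discussed in this thread concrete, here is a
minimal plain-Java sketch. It is not Beam or Flink code: the class
SideInputPushbackSketch and all of its methods are hypothetical names made up
for illustration. It only models the idea described above, namely that
main-input elements are held back until the side input has produced a first
value, so the buffer grows without bound if that value never arrives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;

/** Toy model of "hold main input until the side input is ready". Not Beam code. */
class SideInputPushbackSketch {
    // Main-input elements held back while the side input is not ready.
    private final Queue<String> pushedBack = new ArrayDeque<>();
    // Elements that passed the filter.
    private final List<String> output = new ArrayList<>();
    // null models "the side input has not produced any value yet".
    private Set<String> allowedKeys = null;

    /** Called for every main-input element. */
    void processMainInput(String key) {
        if (allowedKeys == null) {
            // Nothing to filter against yet, so the element can only be buffered.
            pushedBack.add(key);
            return;
        }
        emitIfAllowed(key);
    }

    /** Called when the side input produces a (new) value. */
    void processSideInput(Set<String> keys) {
        allowedKeys = keys;
        // The side input is ready now: drain everything that was held back.
        while (!pushedBack.isEmpty()) {
            emitIfAllowed(pushedBack.poll());
        }
    }

    private void emitIfAllowed(String key) {
        if (allowedKeys.contains(key)) {
            output.add(key);
        }
    }

    int bufferedCount() {
        return pushedBack.size();
    }

    List<String> output() {
        return output;
    }
}
```

In this model, calling processMainInput repeatedly without ever calling
processSideInput only grows the queue, which mirrors case 3 in the thread. A
single call to processSideInput, even with a placeholder set, drains the queue,
which is the effect the dummy message workaround relies on.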
