Hi Reuven,

This is the code snippet for the side input. We don't have a window or trigger in
the main business logic PTransforms.

Thanks!
Eleanore

private <T> PCollection<KV<String, T>> createSideCollection(String topicName,
  Class<? extends Deserializer<T>> deserializerClass) {

  Map<String, Object> consumerProperties = ImmutableMap.<String, Object>builder()
    .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    .put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString())
    .build();

  PCollection<KV<String, T>> kafkaValues = pipeline.apply("collectionFromTopic-" + topicName,
    KafkaIO.<String, T>read()
      .withBootstrapServers(kafkaSettings.getBootstrapServers())
      .withTopics(Collections.singletonList(topicName))
      .withKeyDeserializer(KeyDeserializer.class)
      .withValueDeserializer(deserializerClass)
      .withConsumerConfigUpdates(consumerProperties)
      .withoutMetadata());

  Trigger trigger = Repeatedly.forever(
    AfterFirst.of(
      AfterPane.elementCountAtLeast(1),
      AfterProcessingTime.pastFirstElementInPane()
    ));

  return kafkaValues.apply(Window.<KV<String, T>>into(new GlobalWindows())
    .triggering(trigger)
    .accumulatingFiredPanes()
    .withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS)
  );
}
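
For readers of this thread, the buffering behaviour discussed in the quoted
messages can be pictured with a toy model in plain Java. This is only a sketch
under assumptions: SideInputGate and its methods are hypothetical names made up
for illustration, and this is not the code in Beam's DoFnOperator. It only shows
the general idea that main-input elements are held in memory until the first
side-input value arrives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.BiFunction;

/**
 * Toy model (hypothetical, not Beam's implementation): main-input elements
 * are queued until the side input has delivered at least one value.
 */
final class SideInputGate<M, S, O> {
  private final Queue<M> pending = new ArrayDeque<>(); // buffered main input
  private final List<O> output = new ArrayList<>();
  private final BiFunction<M, S, O> fn;
  private S side;        // latest side-input value
  private boolean ready; // false until the first side-input value arrives

  SideInputGate(BiFunction<M, S, O> fn) {
    this.fn = fn;
  }

  /** Processes a main-input element, or buffers it if the side input is not ready. */
  void onMainElement(M element) {
    if (!ready) {
      pending.add(element);
      return;
    }
    output.add(fn.apply(element, side));
  }

  /** Stores the new side-input value and flushes everything buffered so far. */
  void onSideInput(S value) {
    side = value;
    ready = true;
    while (!pending.isEmpty()) {
      output.add(fn.apply(pending.poll(), side));
    }
  }

  int pendingCount() {
    return pending.size();
  }

  List<O> output() {
    return output;
  }
}
```

In this model, if nothing is ever published to the side input, pendingCount()
keeps growing with every main-input element, which would be consistent with the
memory pressure reported in this thread when the side input topic is empty.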


On Wed, May 13, 2020 at 1:39 PM Reuven Lax <[email protected]> wrote:

> Do you have a window or trigger set up in your pipeline?
>
> On Mon, May 11, 2020 at 2:41 PM Eleanore Jin <[email protected]>
> wrote:
>
>>
>> Hi Max,
>>
>> No, I did not introduce RocksDB at this point, since the pipeline is
>> stateless apart from the Kafka offsets.
>>
>> So what we do is ensure there is a dummy message in the side input to
>> avoid this situation.
>>
>> Thanks!
>> Eleanore
>>
>> On Mon, May 11, 2020 at 2:57 AM Maximilian Michels <[email protected]>
>> wrote:
>>
>>> Generally, it is to be expected that the main input is buffered until
>>> the side input is available. We really have no other option to correctly
>>> process the data.
>>>
>>> Have you tried using RocksDB as the state backend to prevent too much GC
>>> churn?
>>>
>>> -Max
>>>
>>> On 07.05.20 06:27, Eleanore Jin wrote:
>>> > Please see: https://issues.apache.org/jira/browse/BEAM-9914
>>> >
>>> > Thanks a lot!
>>> > Eleanore
>>> >
>>> > On Wed, May 6, 2020 at 9:17 PM Ankur Goenka <[email protected]
>>> > <mailto:[email protected]>> wrote:
>>> >
>>> >     Thanks for sharing the response. It makes sense to me.
>>> >     Please file a jira in Beam so that we can prioritize it.
>>> >
>>> >     Thanks,
>>> >     Ankur
>>> >
>>> >     On Wed, May 6, 2020 at 9:08 PM Eleanore Jin <[email protected]
>>> >     <mailto:[email protected]>> wrote:
>>> >
>>> >         Hi Ankur,
>>> >
>>> >         Thanks for your response.
>>> >
>>> >         I also checked with the Flink community. Here is their response: in
>>> >         short, Flink does not cache the main input data if there is no
>>> >         data available in the side input (Flink broadcast stream).
>>> >
>>> >         - quote from flink community:
>>> >
>>> >         Coming back to your question, Flink's Broadcast stream does
>>> >         *not* block or collect events from the non-broadcasted side if
>>> >         the broadcast side doesn't serve events.
>>> >         However, the user-implemented operators (Beam or your code in
>>> >         this case) often put non-broadcasted events into state to wait
>>> >         for input from the other side.
>>> >         Since the error is not about lack of memory, the buffering in
>>> >         Flink state might not be the problem here.
>>> >
>>> >         Thanks a lot for the help!
>>> >         Eleanore
>>> >
>>> >         On Wed, May 6, 2020 at 8:59 PM Ankur Goenka <[email protected]
>>> >         <mailto:[email protected]>> wrote:
>>> >
>>> >             The relevant code should be here:
>>> >             https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L595
>>> >
>>> >             The fact that the problem goes away after publishing to the
>>> >             side input suggests that this might be a problem with
>>> >             synchronizing 2 streams of data on Flink using Beam.
>>> >
>>> >             I am not sure if the Flink optimizer waits for the side input to be
>>> >             available before processing the main input. We might
>>> >             potentially handle this on the Beam side as well, or use a
>>> >             different set of Flink APIs to let us do better optimization
>>> >             if possible. In any case, this would require a new SDK
>>> >             release if we decide to fix it.
>>> >
>>> >             On Wed, May 6, 2020 at 7:54 PM Eleanore Jin
>>> >             <[email protected] <mailto:[email protected]>> wrote:
>>> >
>>> >                 Hi Ankur,
>>> >
>>> >                 Thanks for the answer! Can you please point me to the
>>> >                 source code where the buffering happens? I would like to
>>> >                 learn how Beam works, thanks!
>>> >
>>> >                 To your question: in my case, the side input does not have
>>> >                 any data, meaning no one is publishing to the side input
>>> >                 topic.
>>> >
>>> >                 After publishing some data into the side input topic,
>>> >                 the OOM goes away.
>>> >
>>> >                 Thanks!
>>> >                 Eleanore
>>> >
>>> >                 On Wed, May 6, 2020 at 6:37 PM Ankur Goenka
>>> >                 <[email protected] <mailto:[email protected]>> wrote:
>>> >
>>> >                     Hi Eleanore,
>>> >
>>> >                     The operation requires buffering the data until the
>>> >                     data from the side input is available, which might
>>> >                     be causing the OOM issue.
>>> >                     You mention that the OOM happens when there is no data
>>> >                     in the side input. Does that mean that the side input is
>>> >                     not yet ready, or does the side input have no data at all?
>>> >
>>> >                     Thanks,
>>> >                     Ankur
>>> >
>>> >                     On Tue, May 5, 2020 at 5:15 PM Pablo Estrada
>>> >                     <[email protected] <mailto:[email protected]>> wrote:
>>> >
>>> >                         +Ankur Goenka <mailto:[email protected]>, by any
>>> >                         chance do you know what could be causing this?
>>> >
>>> >                         Thanks Eleanore for the detailed debugging : )
>>> >
>>> >                         On Tue, May 5, 2020 at 9:34 AM Eleanore Jin
>>> >                         <[email protected]
>>> >                         <mailto:[email protected]>> wrote:
>>> >
>>> >                             Hi Community,
>>> >
>>> >                             Just wondering: does the side input feature buffer
>>> >                             the messages from the main source if there is no
>>> >                             data available from the side input?
>>> >
>>> >                             Thanks!
>>> >                             Eleanore
>>> >
>>> >                             On Sat, May 2, 2020 at 6:28 PM Eleanore Jin
>>> >                             <[email protected]
>>> >                             <mailto:[email protected]>> wrote:
>>> >
>>> >                                 After some more experiments, I observed the
>>> >                                 following:
>>> >                                 1. run pipeline without side input: no
>>> >                                 OOM issue
>>> >                                 2. run pipeline with side input (Kafka
>>> >                                 topic with 1 partition) with data
>>> >                                 available from this side input: no OOM issue
>>> >                                 3. run pipeline with side input (Kafka
>>> >                                 topic with 1 partition) without any data
>>> >                                 from the side input: */OOM issue/*
>>> >
>>> >                                 So I just wonder what the behaviour is
>>> >                                 if there is no data from the side input.
>>> >                                 Does it cache the data from the main stream?
>>> >                                 If so, is there a way to allow
>>> >                                 processing main stream data without
>>> >                                 waiting for data from the side input?
>>> >
>>> >                                 Thanks a lot!
>>> >                                 Eleanore
>>> >
>>> >                                 On Sat, May 2, 2020 at 1:04 PM Eleanore
>>> >                                 Jin <[email protected]
>>> >                                 <mailto:[email protected]>> wrote:
>>> >
>>> >                                     Hi Community,
>>> >
>>> >                                     I am running Beam (2.16) with Flink
>>> >                                     (1.8.2). In my pipeline there is a
>>> >                                     side input which reads from a compacted
>>> >                                     Kafka topic from earliest, and the
>>> >                                     side input value is used for
>>> >                                     filtering. I keep getting the
>>> >                                     OOM: GC overhead limit exceeded.
>>> >
>>> >                                     The side input method looks like below.
>>> >
>>> >                                     private <T> PCollection<KV<String, T>> createSideCollection(String topicName,
>>> >                                       Class<? extends Deserializer<T>> deserializerClass) {
>>> >
>>> >                                       Map<String, Object> consumerProperties = ImmutableMap.<String, Object>builder()
>>> >                                         .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>>> >                                         .put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString())
>>> >                                         .build();
>>> >
>>> >                                       PCollection<KV<String, T>> kafkaValues = pipeline.apply("collectionFromTopic-" + topicName,
>>> >                                         KafkaIO.<String, T>read()
>>> >                                           .withBootstrapServers(kafkaSettings.getBootstrapServers())
>>> >                                           .withTopics(Collections.singletonList(topicName))
>>> >                                           .withKeyDeserializer(KeyDeserializer.class)
>>> >                                           .withValueDeserializer(deserializerClass)
>>> >                                           .withConsumerConfigUpdates(consumerProperties)
>>> >                                           .withoutMetadata());
>>> >
>>> >                                       Trigger trigger = Repeatedly.forever(
>>> >                                         AfterFirst.of(
>>> >                                           AfterPane.elementCountAtLeast(1),
>>> >                                           AfterProcessingTime.pastFirstElementInPane()
>>> >                                         ));
>>> >
>>> >                                       return kafkaValues.apply(Window.<KV<String, T>>into(new GlobalWindows())
>>> >                                         .triggering(trigger)
>>> >                                         .accumulatingFiredPanes()
>>> >                                         .withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS)
>>> >                                       );
>>> >                                     }
>>> >
>>> >                                     We run 2 Flink task managers, each
>>> >                                     with 4 slots. I think the problem lies in
>>> >                                     the side input after looking into
>>> >                                     the heap dump. I also confirmed that with
>>> >                                     the side input disabled, the pipeline runs
>>> >                                     fine.
>>> >
>>> >                                     Can you please provide some help?
>>> >
>>> >                                     Thanks a lot!
>>> >
>>> >                                     Eleanore
>>> >
>>> >                                     image.png
>>> >
>>>
>>
