Do you have a window or trigger set up in your pipeline?

On Mon, May 11, 2020 at 2:41 PM Eleanore Jin <[email protected]> wrote:
Hi Max,

No, I did not introduce RocksDB at this point, since the pipeline is stateless apart from the Kafka offsets.

So what we do is ensure there is a dummy message in the side input to avoid this situation.

Thanks!
Eleanore

On Mon, May 11, 2020 at 2:57 AM Maximilian Michels <[email protected]> wrote:

Generally, it is to be expected that the main input is buffered until the side input is available. We really have no other option to correctly process the data.

Have you tried using RocksDB as the state backend to prevent too much GC churn?

-Max

On 07.05.20 06:27, Eleanore Jin wrote:

Please see: https://issues.apache.org/jira/browse/BEAM-9914

Thanks a lot!
Eleanore

On Wed, May 6, 2020 at 9:17 PM Ankur Goenka <[email protected]> wrote:

Thanks for sharing the response. It makes sense to me. Please file a JIRA in Beam so that we can prioritize it.

Thanks,
Ankur

On Wed, May 6, 2020 at 9:08 PM Eleanore Jin <[email protected]> wrote:

Hi Ankur,

Thanks for your response.

I also checked with the Flink community; here is their response. In short, Flink does not cache the main input data if there is no data available in the side input (Flink broadcast stream).

Quote from the Flink community:

"Coming back to your question, Flink's broadcast stream does *not* block or collect events from the non-broadcast side if the broadcast side doesn't serve events. However, the user-implemented operators (Beam or your code in this case) often put non-broadcast events into state to wait for input from the other side. Since the error is not about lack of memory, the buffering in Flink state might not be the problem here."

Thanks a lot for the help!
Eleanore

On Wed, May 6, 2020 at 8:59 PM Ankur Goenka <[email protected]> wrote:

The relevant code should be here:
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L595

Given that the problem goes away after publishing to the side input, this suggests it might be a problem with synchronizing the two streams of data on Flink using Beam.

I am not sure if the Flink optimizer waits for the side input to be available before processing the main input. We might potentially handle this on the Beam side as well, or use a different set of Flink APIs to let us do better optimization if possible. In any case, this would require a new SDK release if we decide to fix it.

On Wed, May 6, 2020 at 7:54 PM Eleanore Jin <[email protected]> wrote:

Hi Ankur,

Thanks for the answer! Can you please point me to the source code where the buffering happens? I would like to learn how Beam works, thanks!

To your question: in my case, the side input does not have any data, meaning no one is publishing to the side input topic.

After publishing some data into the side input topic, the OOM goes away.

Thanks!
Eleanore

On Wed, May 6, 2020 at 6:37 PM Ankur Goenka <[email protected]> wrote:

Hi Eleanore,

The operation requires buffering the data until the data from the side input is available, which might be causing the OOM issue. You mention that the OOM happens when there is no data in the side input. Does that mean the side input is not yet ready, or does the side input have no data at all?

Thanks,
Ankur

On Tue, May 5, 2020 at 5:15 PM Pablo Estrada <[email protected]> wrote:

+Ankur Goenka, by any chance do you know what could be causing this?

Thanks Eleanore for the detailed debugging :)

On Tue, May 5, 2020 at 9:34 AM Eleanore Jin <[email protected]> wrote:

Hi Community,

Just wondering, does the side input feature buffer the messages from the main source if there is no data available from the side input?

Thanks!
Eleanore

On Sat, May 2, 2020 at 6:28 PM Eleanore Jin <[email protected]> wrote:

After some more experiments, I observed the following:
1. run the pipeline without the side input: no OOM issue
2. run the pipeline with the side input (Kafka topic with 1 partition), with data available from this side input: no OOM issue
3. run the pipeline with the side input (Kafka topic with 1 partition), without any data from the side input: *OOM issue*

So I just wonder, what is the behaviour if there is no data from the side input? Does it cache the data from the main stream? If so, is there a way to allow processing main-stream data without waiting for data from the side input?

Thanks a lot!
Eleanore

On Sat, May 2, 2020 at 1:04 PM Eleanore Jin <[email protected]> wrote:

Hi Community,

I am running Beam (2.16) with Flink (1.8.2). In my pipeline there is a side input which reads from a compacted Kafka topic from earliest, and the side input value is used for filtering. I keep getting the OOM: GC overhead limit exceeded.

The side input method looks like below.

    private <T> PCollection<KV<String, T>> createSideCollection(String topicName,
        Class<? extends Deserializer<T>> deserializerClass) {

      Map<String, Object> consumerProperties = ImmutableMap.<String, Object>builder()
          .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
          .put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString())
          .build();

      PCollection<KV<String, T>> kafkaValues = pipeline.apply("collectionFromTopic-" + topicName,
          KafkaIO.<String, T>read()
              .withBootstrapServers(kafkaSettings.getBootstrapServers())
              .withTopics(Collections.singletonList(topicName))
              .withKeyDeserializer(KeyDeserializer.class)
              .withValueDeserializer(deserializerClass)
              .withConsumerConfigUpdates(consumerProperties)
              .withoutMetadata());

      Trigger trigger = Repeatedly.forever(
          AfterFirst.of(
              AfterPane.elementCountAtLeast(1),
              AfterProcessingTime.pastFirstElementInPane()
          ));

      return kafkaValues.apply(Window.<KV<String, T>>into(new GlobalWindows())
          .triggering(trigger)
          .accumulatingFiredPanes()
          .withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS)
      );
    }

We run 2 Flink task managers, each with 4 slots. After looking into the heap dump, I think the problem lies in the side input. Also double-confirmed: with the side input disabled, the pipeline runs fine.

Can you please provide some help?

Thanks a lot!

Eleanore

[attachment: image.png]
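
[Editor's note] The workaround adopted in this thread is to seed the side-input topic with a dummy record. A related option, not discussed in the thread and untested here, is Beam's `View.asSingleton().withDefaultValue(...)`, which lets the view resolve to a default when a pane fires with no elements. A hypothetical sketch building on the `createSideCollection` method above; the topic name, the empty-string default, and the use of Kafka's `StringDeserializer` are illustrative assumptions:

```java
// Hypothetical follow-up to createSideCollection(...) from the thread above.
// View.AsSingleton.withDefaultValue gives downstream DoFns a value even when
// no pane has produced an element. Caveat: the Flink runner still pushes back
// the main input until the side-input window fires its first pane, so the
// dummy-record workaround described in the thread may still be required.
PCollection<KV<String, String>> sideKvs =
    createSideCollection("filter-topic", StringDeserializer.class);

PCollectionView<String> filterView = sideKvs
    .apply(Values.create())                                  // keep only the values
    .apply(View.<String>asSingleton().withDefaultValue("")); // "" = placeholder default

// Consumed in the main pipeline roughly like:
// mainInput.apply(ParDo.of(new DoFn<...>() { ... c.sideInput(filterView) ... })
//     .withSideInputs(filterView));
```

Note that `asSingleton` expects at most one element per window firing; with the accumulating trigger used above and more than one record in the topic, `Combine.globally(...).asSingletonView()` or `View.asMap()` would be the safer shape.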
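
[Editor's note] Max's RocksDB suggestion amounts to a Flink configuration change: with the RocksDB state backend, operator state (including the pushed-back main-input elements Beam buffers while waiting for the side input) lives off the JVM heap, which can reduce GC churn. A minimal sketch for Flink 1.8, assuming the `flink-statebackend-rocksdb` dependency is available and that the checkpoint URI below (a placeholder) points at durable storage:

```yaml
# flink-conf.yaml: move state off the JVM heap into RocksDB.
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints  # placeholder; any durable FS URI
state.backend.incremental: true                   # optional: incremental checkpoints
```

Whether this helps depends on where the memory pressure actually is; as Eleanore notes, this pipeline is otherwise stateless apart from Kafka offsets.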
