Hi Max,

No, I did not introduce RocksDB at this point, since the pipeline is stateless apart from the Kafka offsets.

What we do instead is make sure there is always at least a dummy message in the side-input topic, so the main input is not buffered while waiting for the side input to become available.
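Concretely, the seeding step is roughly the sketch below. The class name, topic, key, value, and serializers here are placeholders rather than our actual configuration; the downstream filter simply ignores the seed key:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SideInputSeeder {

      // Publish a single placeholder record so the side-input topic is never empty
      // and the pipeline does not buffer the main input while waiting for the side input.
      // Topic name, key, value, and serializers are placeholders, not the real config.
      public static void seed(String bootstrapServers, String sideInputTopic) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
          // The downstream filter treats the "__seed__" key as a no-op entry.
          producer.send(new ProducerRecord<>(sideInputTopic, "__seed__", "{}"));
          producer.flush();
        }
      }
    }

Once the topic has at least one record, the side input becomes available right away, the main input is no longer held back, and the OOM goes away. A similarly rough sketch of how the collection from createSideCollection can then be exposed as a side-input view for the filtering step is in the P.S. at the bottom of this mail, below the quoted thread.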
Thanks!
Eleanore

On Mon, May 11, 2020 at 2:57 AM Maximilian Michels <[email protected]> wrote:
> Generally, it is to be expected that the main input is buffered until
> the side input is available. We really have no other option to correctly
> process the data.
>
> Have you tried using RocksDB as the state backend to prevent too much GC
> churn?
>
> -Max
>
> On 07.05.20 06:27, Eleanore Jin wrote:
> > Please see: https://issues.apache.org/jira/browse/BEAM-9914
> >
> > Thanks a lot!
> > Eleanore
> >
> > On Wed, May 6, 2020 at 9:17 PM Ankur Goenka <[email protected]> wrote:
> >
> > Thanks for sharing the response. It makes sense to me.
> > Please file a jira in Beam so that we can prioritize it.
> >
> > Thanks,
> > Ankur
> >
> > On Wed, May 6, 2020 at 9:08 PM Eleanore Jin <[email protected]> wrote:
> >
> > Hi Ankur,
> >
> > Thanks for your response.
> >
> > I also checked with the Flink community; here is their response. In short,
> > Flink does not cache the main input data if there is no data available in
> > the side input (Flink broadcast stream).
> >
> > - quote from the Flink community:
> >
> > Coming back to your question, Flink's Broadcast stream does *not* block or
> > collect events from the non-broadcasted side if the broadcast side doesn't
> > serve events.
> > However, the user-implemented operators (Beam or your code in this case)
> > often put non-broadcasted events into state to wait for input from the
> > other side.
> > Since the error is not about lack of memory, the buffering in Flink state
> > might not be the problem here.
> >
> > Thanks a lot for the help!
> > Eleanore
> >
> > On Wed, May 6, 2020 at 8:59 PM Ankur Goenka <[email protected]> wrote:
> >
> > The relevant code should be here:
> > https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L595
> >
> > Given that the problem goes away after publishing to the side input, this
> > suggests that it might be a problem with synchronizing 2 streams of data
> > on Flink using Beam.
> >
> > I am not sure if the Flink optimizer waits for the side input to be
> > available before processing the main input. We might potentially handle
> > this on the Beam side as well, or use a different set of Flink APIs to let
> > us do better optimization if possible. In any case this would require a
> > new SDK release if we decide to fix it.
> >
> > On Wed, May 6, 2020 at 7:54 PM Eleanore Jin <[email protected]> wrote:
> >
> > Hi Ankur,
> >
> > Thanks for the answer! Can you please point me to the source code where
> > the buffering happens? I would like to learn how Beam works, thanks!
> >
> > To your question: in my case the side input does not have any data,
> > meaning no one is publishing to the side-input topic.
> >
> > After publishing some data into the side-input topic, the OOM goes away.
> >
> > Thanks!
> > Eleanore
> >
> > On Wed, May 6, 2020 at 6:37 PM Ankur Goenka <[email protected]> wrote:
> >
> > Hi Eleanore,
> >
> > The operation requires buffering the data until the data from the side
> > input is available, which might be causing the OOM issue.
> > You mention that OOM happens when there is no data in the side input.
> > Does it mean that the side input is not yet ready, or does the side input
> > have no data at all?
> >
> > Thanks,
> > Ankur
> >
> > On Tue, May 5, 2020 at 5:15 PM Pablo Estrada <[email protected]> wrote:
> >
> > +Ankur Goenka, by any chance do you know what could be causing this?
> >
> > Thanks Eleanore for the detailed debugging : )
> >
> > On Tue, May 5, 2020 at 9:34 AM Eleanore Jin <[email protected]> wrote:
> >
> > Hi Community,
> >
> > Just wondering, does the side input feature buffer the messages from the
> > main source if there is no data available from the side input?
> >
> > Thanks!
> > Eleanore
> >
> > On Sat, May 2, 2020 at 6:28 PM Eleanore Jin <[email protected]> wrote:
> >
> > After some more experiments, I observed the following:
> > 1. run the pipeline without the side input: no OOM issue
> > 2. run the pipeline with the side input (Kafka topic with 1 partition)
> >    with data available from this side input: no OOM issue
> > 3. run the pipeline with the side input (Kafka topic with 1 partition)
> >    without any data from the side input: OOM issue
> >
> > So I just wonder what the behaviour is if there is no data from the side
> > input? Does it cache the data from the main stream? If so, is there a way
> > to allow processing main-stream data without waiting for data from the
> > side input?
> >
> > Thanks a lot!
> > Eleanore
> >
> > On Sat, May 2, 2020 at 1:04 PM Eleanore Jin <[email protected]> wrote:
> >
> > Hi Community,
> >
> > I am running Beam (2.16) with Flink (1.8.2). In my pipeline there is a
> > side input which reads a compacted Kafka topic from earliest, and the
> > side-input value is used for filtering. I keep getting
> > OOM: GC overhead limit exceeded.
> >
> > The side input method looks like below.
> >
> > private <T> PCollection<KV<String, T>> createSideCollection(String topicName,
> >     Class<? extends Deserializer<T>> deserializerClass) {
> >
> >   Map<String, Object> consumerProperties = ImmutableMap.<String, Object>builder()
> >       .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
> >       .put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString())
> >       .build();
> >
> >   PCollection<KV<String, T>> kafkaValues = pipeline.apply("collectionFromTopic-" + topicName,
> >       KafkaIO.<String, T>read()
> >           .withBootstrapServers(kafkaSettings.getBootstrapServers())
> >           .withTopics(Collections.singletonList(topicName))
> >           .withKeyDeserializer(KeyDeserializer.class)
> >           .withValueDeserializer(deserializerClass)
> >           .withConsumerConfigUpdates(consumerProperties)
> >           .withoutMetadata());
> >
> >   Trigger trigger = Repeatedly.forever(
> >       AfterFirst.of(
> >           AfterPane.elementCountAtLeast(1),
> >           AfterProcessingTime.pastFirstElementInPane()));
> >
> >   return kafkaValues.apply(Window.<KV<String, T>>into(new GlobalWindows())
> >       .triggering(trigger)
> >       .accumulatingFiredPanes()
> >       .withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS));
> > }
> >
> > We run 2 Flink task managers, each with 4 slots. I think the problem lies
> > in the side input after looking into the heap dump. I also double-checked
> > that with the side input disabled, the pipeline runs fine.
> >
> > Can you please provide some help?
> >
> > Thanks a lot!
> > Eleanore
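P.S. For completeness, here is the rough sketch mentioned above of how a collection like the one returned by createSideCollection can be exposed as a side-input view and used to filter the main stream. This is an assumed illustration, not our actual pipeline code: the Latest.perKey step, the String value types, the class and method names, and the __seed__ key are all placeholders.

    import java.util.Map;

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.Latest;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    public class SideInputFilter {

      // Hypothetical wiring: expose the side collection as a map view (latest value
      // per key) and keep only main-stream elements whose key appears in it.
      // "__seed__" is the placeholder key from the seeding step and is skipped.
      public static PCollection<KV<String, String>> filterBySideInput(
          PCollection<KV<String, String>> mainStream,
          PCollection<KV<String, String>> sideCollection) {

        final PCollectionView<Map<String, String>> filterView = sideCollection
            .apply("latestPerKey", Latest.<String, String>perKey())
            .apply("sideInputAsMap", View.<String, String>asMap());

        return mainStream.apply("filterBySideInput",
            ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                Map<String, String> filters = c.sideInput(filterView);
                String key = c.element().getKey();
                // Pass through only keys present in the side input, ignoring the seed entry.
                if (!"__seed__".equals(key) && filters.containsKey(key)) {
                  c.output(c.element());
                }
              }
            }).withSideInputs(filterView));
      }
    }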
