Hi Reuven,
This is the code snippet for the side input; we don't have a window or trigger
in the main business logic PTransforms.
Thanks!
Eleanore
private <T> PCollection<KV<String, T>> createSideCollection(String topicName,
    Class<? extends Deserializer<T>> deserializerClass) {
  Map<String, Object> consumerProperties = ImmutableMap.<String, Object>builder()
      .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString())
      .build();

  PCollection<KV<String, T>> kafkaValues =
      pipeline.apply("collectionFromTopic-" + topicName,
          KafkaIO.<String, T>read()
              .withBootstrapServers(kafkaSettings.getBootstrapServers())
              .withTopics(Collections.singletonList(topicName))
              .withKeyDeserializer(KeyDeserializer.class)
              .withValueDeserializer(deserializerClass)
              .withConsumerConfigUpdates(consumerProperties)
              .withoutMetadata());

  Trigger trigger = Repeatedly.forever(
      AfterFirst.of(
          AfterPane.elementCountAtLeast(1),
          AfterProcessingTime.pastFirstElementInPane()));

  return kafkaValues.apply(Window.<KV<String, T>>into(new GlobalWindows())
      .triggering(trigger)
      .accumulatingFiredPanes()
      .withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS));
}
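One side note on the snippet above: if I understand the pane modes right, accumulatingFiredPanes() over a GlobalWindow retains every fired element in pane state indefinitely, which can itself grow memory over time. Here is a toy model of the difference between the two pane modes (plain Java with illustrative names, not Beam code):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (plain Java, NOT Beam) of accumulating vs. discarding pane modes.
// In accumulating mode every firing re-emits all elements seen so far, so the
// runner has to retain the full history in state; in discarding mode the
// state is cleared after each firing.
class PaneModel {
    private final List<String> state = new ArrayList<>();

    // Like accumulatingFiredPanes(): state is kept across firings.
    List<String> fireAccumulating(String newElement) {
        state.add(newElement);
        return new ArrayList<>(state); // whole history re-emitted
    }

    // Like discardingFiredPanes(): state is dropped after each firing.
    List<String> fireDiscarding(String newElement) {
        state.add(newElement);
        List<String> pane = new ArrayList<>(state);
        state.clear(); // state released after the firing
        return pane;
    }
}
```

If re-emitting the full history on each firing is not actually needed for the side-input view, discardingFiredPanes() might be worth trying.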
On Wed, May 13, 2020 at 1:39 PM Reuven Lax <[email protected]> wrote:
> Do you have a window or trigger set up in your pipeline?
>
> On Mon, May 11, 2020 at 2:41 PM Eleanore Jin <[email protected]>
> wrote:
>
>>
>> Hi Max,
>>
>> No, I did not introduce RocksDB at this point, since the pipeline is
>> stateless apart from the Kafka offsets.
>>
>> So what we do is ensure there is a dummy message in the side input topic
>> to avoid this situation.
>>
>> Thanks!
>> Eleanore
>>
>> On Mon, May 11, 2020 at 2:57 AM Maximilian Michels <[email protected]>
>> wrote:
>>
>>> Generally, it is to be expected that the main input is buffered until
>>> the side input is available. We really have no other option to correctly
>>> process the data.
>>>
>>> Have you tried using RocksDB as the state backend to prevent too much GC
>>> churn?
>>>
>>> -Max
>>>
>>> On 07.05.20 06:27, Eleanore Jin wrote:
>>> > Please see: https://issues.apache.org/jira/browse/BEAM-9914
>>> >
>>> > Thanks a lot!
>>> > Eleanore
>>> >
>>> > On Wed, May 6, 2020 at 9:17 PM Ankur Goenka <[email protected]
>>> > <mailto:[email protected]>> wrote:
>>> >
>>> > Thanks for sharing the response. It makes sense to me.
>>> > Please file a jira in Beam so that we can prioritize it.
>>> >
>>> > Thanks,
>>> > Ankur
>>> >
>>> > On Wed, May 6, 2020 at 9:08 PM Eleanore Jin <
>>> [email protected]
>>> > <mailto:[email protected]>> wrote:
>>> >
>>> > Hi Ankur,
>>> >
>>> > Thanks for your response.
>>> >
>>> >             I also checked with the Flink community; here is their
>>> >             response. In short, Flink does not cache the main input
>>> >             data if there is no data available in the side input
>>> >             (Flink broadcast stream).
>>> >
>>> > - quote from flink community:
>>> >
>>> > Coming back to your question, Flink's Broadcast stream does
>>> > *not* block or collect events from the non-broadcasted side if
>>> > the broadcast side doesn't serve events.
>>> > However, the user-implemented operators (Beam or your code in
>>> > this case) often puts non-broadcasted events into state to wait
>>> > for input from the other side.
>>> > Since the error is not about lack of memory, the buffering in
>>> > Flink state might not be the problem here.
>>> >
>>> > Thanks a lot for the help!
>>> > Eleanore
>>> >
>>> > On Wed, May 6, 2020 at 8:59 PM Ankur Goenka <[email protected]
>>> > <mailto:[email protected]>> wrote:
>>> >
>>> >             The relevant code should be here:
>>> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L595
>>>
>>> >
>>> >             Given that the problem goes away after publishing to the
>>> >             side input, this suggests that it might be a problem with
>>> >             synchronizing the 2 streams of data on Flink using Beam.
>>> >
>>> >             I am not sure if the Flink optimizer waits for the side
>>> >             input to be available before processing the main input. We
>>> >             might potentially handle this on the Beam side as well, or
>>> >             use a different set of Flink APIs to let us do better
>>> >             optimization if possible. In any case this would require a
>>> >             new SDK release if we decide to fix it.
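(Inline note: the push-back buffering in DoFnOperator that Ankur points to can be pictured with a stdlib-only sketch. This is illustrative only, with made-up names, not the actual Beam/Flink implementation:)

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Stdlib-only sketch (NOT the actual Beam/Flink code) of the push-back
// pattern: a main-input element arriving before the side input is ready is
// parked in state, and only released once the side input becomes available.
// If the side input never receives data, the parked queue grows without
// bound -- matching the OOM observed in this thread.
class PushBackBuffer<T> {
    private final Queue<T> pushedBack = new ArrayDeque<>();
    private boolean sideInputReady = false;

    // Called per main-input element; returns whatever may be emitted now.
    List<T> processMain(T element) {
        if (!sideInputReady) {
            pushedBack.add(element); // buffered: memory grows while waiting
            return List.of();
        }
        return List.of(element);
    }

    // Called when the side input first produces data: flush the backlog.
    List<T> onSideInputReady() {
        sideInputReady = true;
        List<T> flushed = new ArrayList<>(pushedBack);
        pushedBack.clear();
        return flushed;
    }
}
```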
>>> >
>>> > On Wed, May 6, 2020 at 7:54 PM Eleanore Jin
>>> > <[email protected] <mailto:[email protected]>>
>>> wrote:
>>> >
>>> > Hi Ankur,
>>> >
>>> > Thanks for the answer! Can you please point to me the
>>> > source code where the buffering is? I would like to
>>> > learn how beam works, thanks!
>>> >
>>> >                 To your question: in my case, the side input does not
>>> >                 have any data, meaning no one is publishing to the
>>> >                 side input topic.
>>> >
>>> > After publishing some data into the side input topic,
>>> > the OOM goes away.
>>> >
>>> > Thanks!
>>> > Eleanore
>>> >
>>> > On Wed, May 6, 2020 at 6:37 PM Ankur Goenka
>>> > <[email protected] <mailto:[email protected]>> wrote:
>>> >
>>> > Hi Eleanore,
>>> >
>>> >                 The operation requires buffering the data until the
>>> >                 data from the side input is available, which might
>>> >                 be causing the OOM issue.
>>> >                 You mention that the OOM happens when there is no data
>>> >                 in the side input. Does it mean that the side input is
>>> >                 not yet ready, or does the side input have no data at
>>> >                 all?
>>> >
>>> > Thanks,
>>> > Ankur
>>> >
>>> > On Tue, May 5, 2020 at 5:15 PM Pablo Estrada
>>> > <[email protected] <mailto:[email protected]>>
>>> wrote:
>>> >
>>> > +Ankur Goenka <mailto:[email protected]> by
>>> any
>>> > chance do you know what could be causing this?
>>> >
>>> > Thanks Eleanore for the detailed debugging : )
>>> >
>>> > On Tue, May 5, 2020 at 9:34 AM Eleanore Jin
>>> > <[email protected]
>>> > <mailto:[email protected]>> wrote:
>>> >
>>> > Hi Community,
>>> >
>>> >                         Just wondering: does the side input feature
>>> >                         buffer the messages from the main source if
>>> >                         there is no data available from the side input?
>>> >
>>> > Thanks!
>>> > Eleanore
>>> >
>>> > On Sat, May 2, 2020 at 6:28 PM Eleanore Jin
>>> > <[email protected]
>>> > <mailto:[email protected]>> wrote:
>>> >
>>> >                             After some more experiments, I observed
>>> >                             the following:
>>> >                             1. run pipeline without side input: no
>>> >                             OOM issue
>>> >                             2. run pipeline with side input (Kafka
>>> >                             topic with 1 partition) with data
>>> >                             available from this side input: no OOM
>>> >                             issue
>>> >                             3. run pipeline with side input (Kafka
>>> >                             topic with 1 partition) without any data
>>> >                             from the side input: OOM issue
>>> >
>>> >                             So I just wonder what the behaviour is
>>> >                             if there is no data from the side input?
>>> >                             Does it cache the data from the main
>>> >                             stream? If so, is there a way to allow
>>> >                             processing main-stream data without
>>> >                             waiting for data from the side input?
>>> >
>>> > Thanks a lot!
>>> > Eleanore
>>> >
>>> > On Sat, May 2, 2020 at 1:04 PM Eleanore
>>> > Jin <[email protected]
>>> > <mailto:[email protected]>>
>>> wrote:
>>> >
>>> > Hi Community,
>>> >
>>> >                             I am running Beam (2.16) with Flink
>>> >                             (1.8.2). In my pipeline there is a
>>> >                             side input which reads from a compacted
>>> >                             Kafka topic from earliest, and the
>>> >                             side input value is used for
>>> >                             filtering. I keep getting
>>> >                             OOM: GC overhead limit exceeded.
>>> >
>>> >                             The side input method looks like
>>> >                             below.
>>> >
>>> > private <T> PCollection<KV<String, T>> createSideCollection(String topicName,
>>> >     Class<? extends Deserializer<T>> deserializerClass) {
>>> >   Map<String, Object> consumerProperties = ImmutableMap.<String, Object>builder()
>>> >       .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>>> >       .put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString())
>>> >       .build();
>>> >
>>> >   PCollection<KV<String, T>> kafkaValues =
>>> >       pipeline.apply("collectionFromTopic-" + topicName,
>>> >           KafkaIO.<String, T>read()
>>> >               .withBootstrapServers(kafkaSettings.getBootstrapServers())
>>> >               .withTopics(Collections.singletonList(topicName))
>>> >               .withKeyDeserializer(KeyDeserializer.class)
>>> >               .withValueDeserializer(deserializerClass)
>>> >               .withConsumerConfigUpdates(consumerProperties)
>>> >               .withoutMetadata());
>>> >
>>> >   Trigger trigger = Repeatedly.forever(
>>> >       AfterFirst.of(
>>> >           AfterPane.elementCountAtLeast(1),
>>> >           AfterProcessingTime.pastFirstElementInPane()));
>>> >
>>> >   return kafkaValues.apply(Window.<KV<String, T>>into(new GlobalWindows())
>>> >       .triggering(trigger)
>>> >       .accumulatingFiredPanes()
>>> >       .withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS));
>>> > }
>>> >
>>> >                             We run 2 Flink task managers, each with 4
>>> >                             slots. I think the problem lies in the
>>> >                             side input after looking into the heap
>>> >                             dump. Also, I double-confirmed that with
>>> >                             the side input disabled, the pipeline runs
>>> >                             fine.
>>> >
>>> >                             Can you please provide some help?
>>> >
>>> >                             Thanks a lot!
>>> >
>>> >                             Eleanore
>>> >
>>> >
>>>
>>