Hi,

nice work on debugging this!

We need the synchronized block in the source because the call to reader.advance() (via the invoker) and reader.getCurrent() (via emitElement()) must be atomic with respect to state. We must not advance the reader state, fail to emit that record, and still checkpoint the new reader state. The monitor ensures that no checkpoint can happen in between those two calls.
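
To make this concrete, here is a stripped-down sketch of the pattern (plain
Java with a hypothetical Reader interface; this is not the actual Beam/Flink
code):

    // Sketch only: checkpointLock stands in for the monitor that Flink's
    // checkpointing and processing-time timers also synchronize on.
    public class CheckpointLockSketch {
      interface Reader { boolean advance(); String getCurrent(); }

      private final Object checkpointLock = new Object();

      void emitLoop(Reader reader) {
        while (true) {
          synchronized (checkpointLock) {
            // advance() and getCurrent()/emit happen atomically with respect
            // to checkpoints: a checkpoint can never observe "advanced but
            // not yet emitted".
            if (reader.advance()) {
              emit(reader.getCurrent());
            }
          }
          // The loop immediately re-acquires the monitor, which is where the
          // unfairness described below comes into play.
        }
      }

      void emit(String record) { System.out.println(record); }
    }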

The basic problem now is that we starve checkpointing because the monitor/lock is not fair. This could be solved by using a fair lock, but that would require changing Flink proper to use a fair lock instead of a monitor/synchronized block. I don't see this as an immediate solution.
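
Just to illustrate what I mean by a fair lock (a sketch only; this is not how
Flink is implemented today):

    import java.util.concurrent.locks.ReentrantLock;

    // A fair ReentrantLock hands the lock to the longest-waiting thread, so
    // a checkpoint/timer thread would not be starved by the emit loop
    // barging back in. Flink uses a plain monitor, so this is not a drop-in
    // change.
    public class FairLockSketch {
      private final ReentrantLock checkpointLock = new ReentrantLock(true); // true = fair

      void withLock(Runnable action) {
        checkpointLock.lock();
        try {
          action.run();
        } finally {
          checkpointLock.unlock();
        }
      }
    }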

One thing that exacerbates this problem is that too many things are happening "under" the synchronized block. All the transforms before a shuffle/rebalance/keyBy are chained to the source, which means that they are invoked from the emitElement() call. You could see this by printing/logging a stacktrace in your user function that does the Redis lookups.
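
For example (a sketch; the DoFn shape and the TRACED field are just for
illustration), logging once from the lookup function shows the chained call
path:

    // Illustrative only: print one stack trace from the DoFn that does the
    // Redis lookups. If the source is chained, the trace shows the source's
    // emit call (e.g. UnboundedSourceWrapper/emitElement frames) above your
    // DoFn, i.e. the lookup runs under the checkpoint lock.
    private static final java.util.concurrent.atomic.AtomicBoolean TRACED =
        new java.util.concurrent.atomic.AtomicBoolean();

    @ProcessElement
    public void processElement(ProcessContext ctx) {
      if (TRACED.compareAndSet(false, true)) {
        new Exception("stack trace from Redis-lookup DoFn").printStackTrace();
      }
      // ... existing Redis lookup and ctx.output(...) unchanged ...
    }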

A possible mitigation would be to disable chaining globally by inserting a `flinkStreamEnv.disableOperatorChaining()` in [1].
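
In plain Flink API terms that is just the following (sketch; the actual
change would go into the code linked at [1]):

    // Sketch: disable chaining globally on the StreamExecutionEnvironment
    // that the Beam Flink runner builds.
    StreamExecutionEnvironment flinkStreamEnv =
        StreamExecutionEnvironment.getExecutionEnvironment();
    flinkStreamEnv.disableOperatorChaining();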

A more surgical version would be to only disable chaining for sources. I'm attaching a patch for that in case you're willing to try it out. It is against the latest master, but it's easy enough to apply manually.

Best,
Aljoscha

[1] https://github.com/apache/beam/blob/d4923711cdd7b83175e21a6a422638ce530bc80c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L225

On 23.10.20 09:47, Piotr Nowojski wrote:
Hi Josson,

Thanks for the great investigation and for coming back to us. Aljoscha, could
you help us here? It looks like you were involved in the original BEAM-3087
issue.

Best,
Piotrek

On Fri, Oct 23, 2020 at 07:36, Josson Paul <jossonp...@gmail.com> wrote:

@Piotr Nowojski <pnowoj...@apache.org>  @Nico Kruber <nkru...@apache.org>

An update.

I was able to figure out the problematic code: a change in the Apache Beam
code is causing this problem.





Beam introduced a lock on the "emit" in the Unbounded Source. The lock is
Flink's checkpoint lock, and the same lock is used by Flink's timer service
to emit the watermarks. Flink's timer service is starved trying to get hold
of the lock and, for some reason, it never gets it. The aftereffect of this
situation is that the watermark is never emitted by Flink's timer service.
Because there are no watermarks flowing through the system, sliding windows
are never closed and data accumulates in the window.
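
The effect can be reproduced with a small toy program (hypothetical code, not
Flink): a "source" thread keeps re-acquiring an unfair monitor, while a
"timer" thread that needs the same monitor to emit a watermark can wait for a
very long time.

    public class StarvationDemo {
      static final Object checkpointLock = new Object();

      public static void main(String[] args) {
        Thread source = new Thread(() -> {
          while (true) {
            synchronized (checkpointLock) {
              busyWork(); // stands in for advance() + emit + chained transforms (Redis lookup)
            }
            // No pause before re-acquiring, so the waiting "timer" thread
            // rarely wins the monitor.
          }
        });
        Thread timer = new Thread(() -> {
          while (true) {
            synchronized (checkpointLock) {
              System.out.println("watermark emitted at " + System.currentTimeMillis());
            }
            try { Thread.sleep(100); } catch (InterruptedException e) { return; }
          }
        });
        source.start();
        timer.start();
      }

      static void busyWork() {
        long x = 0;
        for (long i = 0; i < 5_000_000L; i++) { x += i; }
        if (x == 42) System.out.println(x); // keep the loop from being optimized away
      }
    }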



This problem occurs only if external lookup calls (like Redis) happen before
the data goes to the sliding window. Something like below:



KafkaSource -> Transforms (occasional Redis lookup) -> SlidingWindow ->
Transforms -> Kafka Sink






https://github.com/apache/beam/blob/60e0a22ea95921636c392b5aae77cb48196dd700/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L256
This is Beam 2.4, and you can see that there is no synchronized block at
lines 257 and 270.




https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L264
This is Beam 2.15; see the synchronized block introduced at lines 264 and
280. We are using Beam 2.15 and Flink 1.8.



Beam introduced this synchronized block because of this bug.
https://issues.apache.org/jira/browse/BEAM-3087



After I removed that synchronized keyword everything started working fine
in my application.



What do you guys think about this? Why does Beam need a synchronized block
there?



Beam is using this lock ->


https://github.com/apache/flink/blob/d54807ba10d0392a60663f030f9fe0bfa1c66754/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L282



Thanks,

Josson

On Mon, Sep 14, 2020 at 5:03 AM Piotr Nowojski <pnowoj...@apache.org>
wrote:

Hi Josson,

The TM logs that you attached only cover a 5-minute period. Are you sure
they encompass the period before and after the potential failure? It would
also be nice if you could provide the logs matching the charts (like the ones
you provided in previous messages), to correlate events (a spike in
latency/GC with some timestamp from the logs).

I was not necessarily asking you to upgrade to Java 9, but to an updated,
bug-fixed version of Java 8 [1].

1) In the Flink 1.4 setup, the data in the heap is throttled. It never goes
out of memory, whatever the ingestion rate. Our windows are 5-minute windows.
2) In the Flink 1.8 setup, the HeapKeyedStateBackend is never throttled and
fills up fast. When old-gen space goes beyond 60-70%, even a mixed GC or full
GC doesn't reclaim space.

In both cases the same backpressure mechanism applies: if a task's output
runs out of buffers to put produced records into, the task blocks. It could
be that between 1.4 and 1.8, with the credit-based flow control changes, the
number of buffers available to the tasks on your setup has grown, so the
tasks start backpressuring later. This in turn can sometimes mean that at any
point in time there is more data buffered in the operator's state, such as
the `WindowOperator`'s. I'm not sure what the best/easiest way to check this
is:

1. the amount of buffered data might be visible via metrics [2][3]
2. if you enable DEBUG logs, it should be visible via:

LOG.debug("Using a local buffer pool with {}-{} buffers",
numberOfRequiredMemorySegments, maxNumberOfMemorySegments);

entry logged by
`org.apache.flink.runtime.io.network.buffer.LocalBufferPool`.

Piotrek

[1] https://en.wikipedia.org/wiki/Java_version_history#Java_8_updates
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#network
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#network

On Mon, Sep 14, 2020 at 05:04, Josson Paul <jossonp...@gmail.com> wrote:

@Piotr Nowojski <pnowoj...@apache.org> @Nico Kruber <nkru...@apache.org>
I have attached the  Taskmanager/GC/thread dumps in a zip file.

I don't see any issues in the TM logs.
I tried to upgrade to Java 9, but Flink sits on top of another platform that
threw errors during the upgrade, so I can't do much for now. We will upgrade
to JDK 11 in another 2 months.

Regarding the heap size: the new experiment I did was with a 4 GB heap on
both Flink 1.4 and Flink 1.8.

The questions I am trying to get answered are:

1) In the Flink 1.4 setup, the data in the heap is throttled. It never goes
out of memory, whatever the ingestion rate. Our windows are 5-minute windows.
2) In the Flink 1.8 setup, the HeapKeyedStateBackend is never throttled and
fills up fast. When old-gen space goes beyond 60-70%, even a mixed GC or full
GC doesn't reclaim space.


On Fri, Sep 11, 2020 at 12:58 AM Piotr Nowojski <pnowoj...@apache.org>
wrote:

Hi Josson,

Have you checked the logs as Nico suggested? At 18:55 there is a dip in
non-heap memory, just about when the problems started happening. Maybe you
could post the TM logs?
Have you tried updating the JVM to a newer version?
Also, it looks like the heap size is the same between 1.4 and 1.8, but in an
earlier message you said you increased it by 700MB?

Piotrek

On Fri, Sep 11, 2020 at 05:07, Josson Paul <jossonp...@gmail.com> wrote:

I have attached two word documents.
Flink1.4 and Flink1.8
I reduced the heap size in the cluster and tried the experiment in
both Flink 1.4 and Flink 1.8.
My goal was to simulate an ingestion rate of 200 clients/sec (not going into
the details here).

In Flink 1.4 I could reach 200 clients/sec and I ran the cluster for 1 hour.
You can see the details in the attached Flink1.4 document file, including the
GC activity and CPU; both hold up well.

In Flink 1.8 I could reach only 160 clients/sec before the issue started
happening, within 15 minutes of starting the ingestion. @Piotr Nowojski
<pnowoj...@apache.org>, you can see that there is no metaspace-related issue.
All the GC-related details are available in the doc.

In particular, see the difference in the heap dump of 'Biggest Objects' in
both clusters and how Flink 1.4 holds fewer objects in the heap. Is it
because Flink 1.4 was inefficient, 1.8 fixed that inefficiency, and this
behavior is therefore expected?

@Nico, we are not doing the fat jar stuff.

@Piotr Nowojski <pnowoj...@apache.org> , we are in the process of
upgrading to Java 11 and Flink 1.11. But I need at least 2 months.


I am not seeing the Finalizer problem in the latest heap dump. Maybe it was
happening only once or twice.

Please let me know if you need additional input


Thanks,
Josson


On Thu, Sep 10, 2020 at 5:19 AM Nico Kruber <nkru...@apache.org>
wrote:

What looks a bit strange to me is that with a running job, the
SystemProcessingTimeService should actually not be collected (since it is
still in use)!

My guess is that something is indeed happening during that time frame (maybe
job restarts?) and I would propose checking your logs for anything suspicious
in there.


When I did experiments with Beam pipelines on our platform [1], I also
noticed that the standard fat jars that Beam creates include Flink runtime
classes they shouldn't (at least if you are submitting to a separate Flink
cluster). This can cause all sorts of problems, and I would recommend
removing those from the fat jar as documented in [1].




Nico



[1] https://ververica.zendesk.com/hc/en-us/articles/360014323099

On Thursday, 10 September 2020 13:44:32 CEST Piotr Nowojski wrote:
Hi Josson,

Thanks again for the detailed answer, and sorry that I cannot help you with
an immediate answer. I presume that the JVM args for 1.8 are the same?

Can you maybe post what exactly has crashed in your cases a) and b)?
Re c), in the previously attached word document, it looks like Flink was
running without problems for a couple of hours/minutes, everything was
stable, with no signs of growing memory consumption or an impending problem,
until around 23:15, when the problem started, right? Has something else
happened at that time, something that could explain the spike? A checkpoint?
A job crash/restart? A load spike?

A couple of other random guesses:
- have you been monitoring other memory pools for Flink 1.4 and 1.8? Like
metaspace? A growing metaspace can sometimes cause problems. It shouldn't be
the case here, as you configured -XX:MaxMetaspaceSize, but it might still be
worth checking...
- another random idea: have you tried upgrading the JDK? Maybe that would
solve the problem?

Best regards,
Piotrek

On Wed, Sep 9, 2020 at 19:53, Josson Paul <jossonp...@gmail.com> wrote:
Hi Piotr,

  *JVM start up for Flink 1.4*
*-------------------------------*

java -server
-XX:HeapDumpPath=/opt/maglev/srv/diagnostics/pipelineruntime-taskmgr-assurance-1-77d44cf64-z8gd4.heapdump
-Xmx6554m -Xms6554m -XX:MaxMetaspaceSize=512m -XX:+HeapDumpOnOutOfMemoryError
-XX:+UseG1GC -XX:CICompilerCount=4 -XX:MaxGCPauseMillis=1000
-XX:+DisableExplicitGC -XX:ParallelGCThreads=4 -Dsun.net.inetaddr.ttl=60
-XX:OnOutOfMemoryError="kill -9 %p" -Dio.netty.eventLoopThreads=3
-Dlog4j.configurationFile=/opt/maglev/sw/apps/pipelineruntime/resources/log4j2.xml
-Dorg.apache.flink.shaded.netty4.io.netty.eventLoopThreads=3
-Dnetworkaddress.cache.ttl=120 -Dnum.cores=3 -XX:+UseStringDeduplication
-Djava.util.concurrent.ForkJoinPool.common.parallelism=3 -XX:ConcGCThreads=4
-Djava.library.path=/usr/local/lib -Djava.net.preferIPv4Stack=true
-Dapp.dir=/opt/maglev/sw/apps/pipelineruntime -Dserver.name=pipelineruntime
-Dlog.dir=/opt/maglev/var/log/pipelineruntime
-cp /opt/maglev/sw/apps/javacontainer/resources:/opt/maglev/sw/apps/pipelineruntime/lib/*:/opt/maglev/sw/apps/pipelineruntime/resources:/opt/maglev/sw/apps/javacontainer/lib/*
com.cisco.maglev.MaglevServer start maglev

    1. taskmanager.memory.fraction = 0.7f (this was coming to 4.5 GB; I
    didn't know at the time that we could set the memory fraction to zero
    because ours is a streaming job, so it was picking up the default)
    2. Network buffer pool memory was 646 MB on the heap (I think this was
    the default, based on some calculations in Flink 1.4)
    3. G1GC region size was 4 MB (default)

I tested this setup by reducing the JVM heap by *1 GB*. It still worked
perfectly, with some lags here and there.

*JVM start up for Flink 1.8*
*------------------------------------*
a) I started with the same configuration as above. The Kubernetes pod went
out of memory. At this point I realized that in Flink 1.8 the network buffer
pools are moved to native memory. Based on calculations this was coming to
200MB of native memory. I increased the overall pod memory to accommodate the
buffer pool change, keeping the *heap the same*.

b) Even after I modified the overall pod memory, the pod still crashed. At
this point I generated flame graphs to identify the CPU/malloc calls
(attached as part of the initial email) and realized that the CPU usage of
G1GC is significantly different from Flink 1.4. Now I made 2 changes:

    1. taskmanager.memory.fraction = 0.01f (this gives more heap for user
    code)
    2. Increased CPU from 3 to 4 cores.

         The above changes helped the cluster hold on a little longer, but it
still crashed after some time.

c) Now I made the below changes:

    1. I came across this ->
http://mail.openjdk.java.net/pipermail/hotspot-gc-use/2017-February/002622.html
    Now I changed the G1GC region size to *8MB* instead of the default *4MB*.
    2. -XX:MaxGCPauseMillis=2000 (I even tried higher in later experiments)
    3. Played around with G1RSetSparseRegionEntries

        This helped to avoid the pod going out of memory, but the old-gen
heap issue was very evident now (please see the attached word document).

  d) Allocated an additional *700 MB* of heap along with the above changes.
This also didn't help; it just prolonged the crash. Now I need help from
others on which direction to take this.

My worry is that even if I upgrade to Flink 1.11, this issue might still
persist.

I have attached a screenshot from the heap dump to show you the difference
between Flink 1.4 and 1.8 in the way the HeapKeyedStateBackend is created. I
am not sure whether this change has something to do with the memory issue I
am facing. The files are named Flink-1.4.jpg for 1.4 and Flink-1.8.jpg for
1.8.


Thanks,
Josson

On Wed, Sep 9, 2020 at 5:44 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
Hi Josson,

Thanks for getting back.

What are the JVM settings, and in particular the GC settings, that you are
using (G1GC?)?
It could also be that in 1.4 you were just slightly below the threshold of GC
issues, while in 1.8 something is using a bit more memory, causing the GC
issues to appear. Have you tried just increasing the heap size?
Have you tried comparing, at job start-up, the usage and size of the JVM's
memory pools with Flink 1.4 and 1.8? Maybe that can point us in the right
direction.

My understanding of backpressure is that it is not based on heap memory but
on how fast the network buffers are filled. Is this correct?

Does Flink use a TCP connection to communicate between tasks if the tasks
are in the same task manager?

No, local input channels are used then, but memory for network buffers is
assigned to tasks regardless of the fraction of local input channels in the
task. However, with just a single taskmanager and a parallelism of 4, the
amount of memory used by the network stack should be insignificant, at least
as long as you have a reasonably sized job graph
(32KB * (2 * parallelism + 7) * number of tasks).
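(For example, with a parallelism of 4 that is 32KB * (2 * 4 + 7) = 32KB * 15
= 480KB per task, times the number of tasks overall.)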

What I noticed in Flink 1.4 is that it doesn't read data from Kafka if
there is not sufficient heap memory to process the data. Somehow this is not
happening in Flink 1.8, and it fills the heap soon enough not to get
GCed/finalized. Was there any change around this between Flink 1.4 and
Flink 1.8?

No, there were no changes in this part as far as I remember. Tasks, when
producing records, serialise them and put them into the network buffers. If
there are no available network buffers, the task backpressures and stops
processing new records.

Best regards,
Piotrek

On Tue, Sep 8, 2020 at 21:51, Josson Paul <jossonp...@gmail.com> wrote:
Hi Piotr,

    2) SystemProcessingTimeService holds the HeapKeyedStateBackend, and the
HeapKeyedStateBackend has a lot of objects, which is what is filling the
heap.

    3) I am not using the Flink Kafka connector; we are using the Apache Beam
Kafka connector. There is a change in the Apache Beam version, but the Kafka
client we are using is the same as the one in the other cluster, which worked
and was on Flink 1.4.

   *There is no change in hardware/Java/Kafka/Kafka client/application
between the working and the non-working cluster.*

I am aware of the memory changes and network buffer changes between 1.4
and 1.8.

Flink 1.4 had network buffers on the heap, and in 1.8 the network buffers are
in native memory. I modified the Flink 1.8 code to put them back on the heap,
but the issue didn't get resolved.

Mine is a streaming job, so we set 'taskmanager.memory.fraction' to a very
minimal value, and that heap is fully available for user data.

Flink 1.4 was not using credit-based flow control and Flink 1.8 is. *Our
setup has only 1 task manager and a parallelism of 4*. According to this
video https://www.youtube.com/watch?v=AbqatHF3tZI&ab_channel=FlinkForward
(*16:21*), if tasks are in the same task manager, Flink doesn't use
credit-based flow control, so essentially there is no change between Flink
1.4 and 1.8 in *our setup*. Still, I tried setting credit-based flow control
to false and testing my setup. The problem persists.

What I noticed in Flink 1.4 is that it doesn't read data from Kafka if
there is not sufficient heap memory to process the data. Somehow this is not
happening in Flink 1.8, and it fills the heap soon enough not to get
GCed/finalized. Was there any change around this between Flink 1.4 and
Flink 1.8?

My understanding of backpressure is that it is not based on heap memory but
on how fast the network buffers are filled. Is this correct?
Does Flink use a TCP connection to communicate between tasks if the tasks
are in the same task manager?

Thanks,
josson

On Thu, Sep 3, 2020 at 12:35 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
Hi Josson,

2. Are you sure that all, or the vast majority, of those objects are pointing
towards SystemProcessingTimeService? And are those objects really the
problem? Are they taking up that much memory?
3. It could still be Kafka's problem, as it's likely that between 1.4 and
1.8.x we bumped the Kafka dependencies.

Frankly, if it's not some other external dependency issue, I would expect the
problem to lie somewhere else entirely. Flink's code relying on finalisation
hasn't changed since 2015/2016. On the other hand, there were quite a few
changes between 1.4 and 1.8.x, some of which affect memory usage. Have you
read the release notes for versions 1.5, 1.6, 1.7 and 1.8? In particular,
both 1.5 [1] and 1.8 [2] have memory-related notes that could be addressed
via configuration changes.

Thanks,
Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/release-notes/flink-1.8.html
On Thu, Sep 3, 2020 at 18:50, Josson Paul <jossonp...@gmail.com> wrote:
1) We are in the process of migrating to Flink 1.11, but it is going to take
a while before we can make everything work with the latest version.
Meanwhile, since this is happening in production, I am trying to solve this.
2) The Finalizer class is pointing to
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.
This class has a finalize method. I have attached a spreadsheet
(*Object-explorer.csv*) to give you a high-level view.
3) The difference between the working cluster and the non-working cluster is
only in Beam and Flink. Hardware, input message rate, application jars, and
Kafka are all the same between the 2 clusters. The working cluster was on
Flink 1.4 and Beam 2.4.0.

Any insights into this will help me to debug further

Thanks,
Josson


On Thu, Sep 3, 2020 at 3:34 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
Hi,

Have you tried using a more recent Flink version? 1.8.x is no longer
supported, and the latest versions might not have this issue anymore.

Secondly, have you tried backtracking those references to the Finalizers?
Assuming that Finalizer is indeed the class causing the problems.

Also, it may well be a non-Flink issue [1].

Best regards,
Piotrek

[1] https://issues.apache.org/jira/browse/KAFKA-8546

On Thu, Sep 3, 2020 at 04:47, Josson Paul <jossonp...@gmail.com> wrote:
Hi All,

*ISSUE*
------
The Flink application runs for some time, then suddenly the CPU shoots up to
its peak, pod memory reaches its peak, the GC count increases, and old-gen
space reaches close to 100%. A full GC doesn't clean up heap space. At this
point I stopped sending data and cancelled the Flink jobs; still, the old-gen
space doesn't come down. I took a heap dump and can see a lot of objects in
the java.lang.Finalizer class. I have attached the details in a word
document. I do have the heap dump, but it is close to 2GB compressed. Is it
safe to upload it somewhere and share it here?

This issue doesn't happen with Flink 1.4.0 and Beam release-2.4.0.

*WORKING CLUSTER INFO* (Flink: 1.4.0 and Beam: release-2.4.0)
----------------------------------------------------

The application reads from Kafka, does aggregations, and writes into Kafka.
It has 5-minute windows and uses Beam constructs to build the pipeline. To
read and write we use Beam connectors.

Flink version: 1.4.0
Beam version: release-2.4.0
Backend state: the state backend is on the heap, with checkpointing to the
distributed file system.

Number of task managers: 1
Heap: 6.4 GB
CPU: 4 cores
Standalone cluster deployment on a Kubernetes pod

*NOT WORKING CLUSTER INFO* (Flink version: 1.8.3 and Beam version:
release-2.15.0)
----------
Application details are the same as above.

*No change in the application or the rate at which data is injected, only in
the Flink and Beam versions.*


Flink version: 1.8.3
Beam version: release-2.15.0
Backend state: the state backend is on the heap, with checkpointing to the
distributed file system.

Number of task managers: 1
Heap: 6.5 GB
CPU: 4 cores

Deployment: standalone cluster deployment on a Kubernetes pod

My Observations
-------------

1) The CPU flame graph shows that in the working version, the CPU time spent
on GC is lower than in the non-working version (please see the attached flame
graphs: *CPU-flame-WORKING.svg* for the working cluster and
*CPU-flame-NOT-working.svg* for the non-working one).

2) I have attached the flame graph for native memory MALLOC calls from when
the issue was happening. Please find the attached SVG image
(*malloc-NOT-working.svg*). The pod memory peaks when this issue happens. To
me, it looks like the GC process is requesting a lot of native memory.

3) When the issue is happening, the GC CPU usage is very high. Please see the
flame graph (*CPU-graph-at-issuetime.svg*).

Note: the SVG files can be opened in any browser and are clickable once
opened.
--
Thanks
Josson



From c93636ad96b98eb1e56d7aa8dc9fe1e09272cbe3 Mon Sep 17 00:00:00 2001
From: Aljoscha Krettek <aljos...@apache.org>
Date: Wed, 11 Nov 2020 19:05:10 +0100
Subject: [PATCH] [BEAM-XXXXX] Don't chain sources to avoid checkpoint
 starvation

---
 .../flink/FlinkStreamingPortablePipelineTranslator.java     | 3 ++-
 .../runners/flink/FlinkStreamingTransformTranslators.java   | 6 ++++--
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 2112941350..77627a3fb7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -542,7 +542,8 @@ public class FlinkStreamingPortablePipelineTranslator
         source =
             nonDedupSource
                 .flatMap(new FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-                .returns(outputTypeInfo);
+                .returns(outputTypeInfo)
+                .disableChaining();
       }
     } catch (Exception e) {
       throw new RuntimeException("Error while translating UnboundedSource: " + 
unboundedSource, e);
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 319e5b7a8a..213e970221 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -229,7 +229,8 @@ class FlinkStreamingTransformTranslators {
                 .addSource(sourceWrapper)
                 .name(fullName)
                 .uid(fullName)
-                .returns(withIdTypeInfo);
+                .returns(withIdTypeInfo)
+                .disableChaining();
 
         if (rawSource.requiresDeduping()) {
           source =
@@ -378,7 +379,8 @@ class FlinkStreamingTransformTranslators {
                 .addSource(sourceWrapper)
                 .name(fullName)
                 .uid(fullName)
-                .returns(outputTypeInfo);
+                .returns(outputTypeInfo)
+                .disableChaining();
       } catch (Exception e) {
         throw new RuntimeException("Error while translating BoundedSource: " + 
rawSource, e);
       }
-- 
2.29.2
