Re: Avoiding OutOfMemoryError for large batch-jobs

2021-04-26 Thread Thomas Fredriksen(External)
Thank you, this is very informative.

We tried reducing the JdbcIO batch size, first to 1000 and then to 100. In
our runs, we no longer see the explicit OOM error, but we are now seeing
executor heartbeat timeouts. From what we understand, these are also
typically caused by OOM errors. However, the stage in question reads from a
web server that can be slow to respond. Could it be that the request to the
web server is blocking the executor long enough to cause the heartbeat
timeout?

On Mon, Apr 26, 2021 at 1:48 PM Alexey Romanenko wrote:

>
>
> On 26 Apr 2021, at 13:34, Thomas Fredriksen(External) <
> thomas.fredrik...@cognite.com> wrote:
>
> The stack-trace for the OOM:
>
> 21/04/21 21:40:43 WARN TaskSetManager: Lost task 1.2 in stage 2.0 (TID 57,
>> 10.139.64.6, executor 3): org.apache.beam.sdk.util.UserCodeException:
>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>> at
>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>> at
>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteVoid$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>>
>
> It may be caused by a large total size of batched records accumulating
> before WriteFn flushes them.
> Did you try decreasing the batch size with “withBatchSize(long)” (the
> default is 1000)? [1]
>
> [1]
> https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/io/jdbc/JdbcIO.WriteVoid.html#withBatchSize-long-
>
> at
>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
>> at
>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
>> at
>> org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
>> at
>> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:140)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
>> at
>> scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>> at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
>> at org.apache.spark.scheduler.Task.run(Task.scala:113)
>> at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
>>
>
> I should note this exception is not always printed. The issue is usually
> represented by an ExecutorLostFailure (I am assuming this is caused by an
> OOM-error):
>
>>
>> 21/04/21 21:44:52 ERROR ScalaDriverLocal: User Code Stack Trace:
>> java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 5 in stage 2.0 failed 4 times, most recent
>> failure: Lost task 5.3 in stage 2.0 (TID 62, 10.139.64.6, executor 3):
>> ExecutorLostFailure (executor 3 exited caused by one of the running tasks)
>> Reason: Executor heartbeat timed out after 240254 ms
>> Driver stacktrace:
>> at
>> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
>> at
>> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
>> at
>> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104)
>> at
>> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:92)
>> at org.odp.beam.sdk.OdpPipeline.run(OdpPipeline.java:79)
>> at org.odp.beam.sdk.OdpPipeline.run(OdpPipeline.java:60)
>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
>> at org.odp.beam.sdk.OdpPipeline.runThenExit(OdpPipeline.java:93)
>> at
>> org.odp.pipelines.emodnet_bronze.EmodNetBronze.runPipeline(EmodNetBronze.java:203)
>> at
>> org.odp.pipelines.emodnet_bronze.EmodNetBron

Re: Question on late data handling in Beam streaming mode

2021-04-26 Thread Tao Li
Thanks folks. This is really informative!

From: Kenneth Knowles 
Reply-To: "user@beam.apache.org" 
Date: Friday, April 23, 2021 at 9:34 AM
To: Reuven Lax 
Cc: user, Kenneth Knowles, Kelly Smith, Lian Jiang
Subject: Re: Question on late data handling in Beam streaming mode

Reuven's answer will result in a group by key (but not window) where no data is 
dropped and you get deltas for each key. Downstream consumers can recombine the 
deltas to get per-key aggregation. So instead of putting the time interval into 
the window, you put it into the key, and then you get the same grouped 
aggregation.

There are (at least) two other ways to do this:

1. You can set allowed lateness to a high value.
2. You can use a ParDo and outputWithTimestamp [1] to set the timestamps to 
arrival time. I illustrated this in some older talks [2].
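
A rough sketch of option 2 (the class name and element type are placeholders,
not anything from Beam itself) might look like:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.joda.time.Instant;

    // Hypothetical DoFn that re-emits each element stamped with its arrival
    // (processing) time as the new event timestamp.
    class StampWithArrivalTimeFn<T> extends DoFn<T, T> {
      @ProcessElement
      public void process(@Element T element, OutputReceiver<T> out) {
        out.outputWithTimestamp(element, Instant.now());
      }
    }

You would apply it with something like ParDo.of(new StampWithArrivalTimeFn<>())
ahead of the windowing. Note that if an element's original timestamp could ever
be ahead of processing time, getAllowedTimestampSkew would also need to be
overridden, since shifting a timestamp backward beyond the allowed skew is
rejected.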

Kenn

[1] 
https://github.com/apache/beam/blob/dc636be57900c8ad9b6b9e50b08dad64be8aee40/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L184
[2] 
https://docs.google.com/presentation/d/1smGXb-0GGX_Fid1z3WWzZJWtyBjBA3Mo3t4oeRjJoZI/present?slide=id.g142c2fd96f_0_134

On Fri, Apr 23, 2021 at 8:32 AM Reuven Lax <re...@google.com> wrote:
You can definitely group by processing time. The way to do this in Beam is as 
follows

Window.into(new GlobalWindows())
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(30))))
    .discardingFiredPanes();

The syntax is unfortunately a bit wordy, but the idea is that you are creating 
a single event-time window that encompasses all of time, and "triggering" an 
aggregation every 30 seconds based on processing time.
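
For instance (purely illustrative names and types, with the usual Beam
windowing and transform imports assumed), the windowed collection can then feed
an ordinary per-key aggregation:

    PCollection<KV<String, Long>> perKeyDeltas =
        events  // assume a PCollection<KV<String, Long>> of (key, value) pairs
            .apply(Window.<KV<String, Long>>into(new GlobalWindows())
                .triggering(AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(30))))
                .discardingFiredPanes())
            .apply(Sum.longsPerKey());

With discardingFiredPanes, each 30-second firing emits only the delta for that
pane, which downstream consumers can recombine per key as described above.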

On Fri, Apr 23, 2021 at 8:14 AM Tao Li <t...@zillow.com> wrote:
Thanks @Kenneth Knowles. I understand we need to specify a window for the 
group-by so that the app knows when processing is "done" and can output a 
result.

Is it possible to specify an event-arrival/processing-time-based window for the 
group-by? The purpose is to avoid dropping late events. With a processing-time 
based window, the app would periodically output the result based on all events 
that arrived in that window, and a late-arriving event would fall into whatever 
window covers its arrival time, so that late data would not be lost.

Does Beam support this kind of mechanism? Thanks.

From: Kenneth Knowles <k...@apache.org>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, April 22, 2021 at 1:49 PM
To: user <user@beam.apache.org>
Cc: Kelly Smith <kell...@zillowgroup.com>, Lian Jiang <li...@zillowgroup.com>
Subject: Re: Question on late data handling in Beam streaming mode

Hello!

In a streaming app, you have two choices: wait forever and never produce any 
output, OR use some method to decide that the aggregation is "done".

In Beam, the way you decide that an aggregation is "done" is the watermark. 
When the watermark predicts no more data for an aggregation, the aggregation is 
done. For example, a GROUP BY over a minute-granularity key is "done" when no 
more data will arrive for that minute. At that point, your result is produced. 
More data may still arrive, and it is ignored. The watermark is determined by 
the IO connector to be the best heuristic available. You can configure "allowed 
lateness" for an aggregation to allow out-of-order data.
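
For example, a minimal sketch of configuring allowed lateness on a fixed
event-time window (the element type and durations are illustrative, not a
recommendation):

    events.apply(
        Window.<MyEvent>into(FixedWindows.of(Duration.standardMinutes(1)))
            .triggering(AfterWatermark.pastEndOfWindow()
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.standardHours(6))
            .accumulatingFiredPanes());

Data arriving up to 6 hours late then triggers an updated, accumulated result
for its window instead of being dropped.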

Kenn

On Thu, Apr 22, 2021 at 1:26 PM Tao Li <t...@zillow.com> wrote:
Hi Beam community,

I am wondering if there is a risk of losing late data in a Beam streaming app 
due to watermarking?

I just went through this design doc and noticed the “droppable” definition 
there: 
https://docs.google.com/document/d/12r7frmxNickxB5tbpuEh_n35_IJeVZn1peOrBrhhP6Y/edit#

Re: Avoiding OutOfMemoryError for large batch-jobs

2021-04-26 Thread Alexey Romanenko


> On 26 Apr 2021, at 13:34, Thomas Fredriksen(External) 
>  wrote:
> 
> The stack-trace for the OOM:
> 
> 21/04/21 21:40:43 WARN TaskSetManager: Lost task 1.2 in stage 2.0 (TID 57, 
> 10.139.64.6, executor 3): org.apache.beam.sdk.util.UserCodeException: 
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> at 
> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteVoid$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)

It may be caused by a large total size of batched records accumulating before 
WriteFn flushes them. 
Did you try decreasing the batch size with “withBatchSize(long)” (the default 
is 1000)? [1]

[1] 
https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/io/jdbc/JdbcIO.WriteVoid.html#withBatchSize-long-
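
For reference, a minimal sketch of where that setting goes (the driver, URL,
statement, and row type below are placeholders, not taken from your pipeline):

    rows.apply(JdbcIO.<MyRow>write()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "org.postgresql.Driver", "jdbc:postgresql://db-host:5432/mydb"))
        .withStatement("INSERT INTO my_table (id, value) VALUES (?, ?)")
        .withBatchSize(100)  // flush every 100 records instead of the default 1000
        .withPreparedStatementSetter((element, statement) -> {
          statement.setLong(1, element.getId());
          statement.setString(2, element.getValue());
        }));

Smaller batches mean less data buffered per bundle before each flush, at the
cost of more round trips to the database.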

> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
> at 
> org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
> at 
> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:140)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
> at 
> scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at 
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979)
> at 
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
> at org.apache.spark.scheduler.Task.run(Task.scala:113)
> at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
> 
> I should note this exception is not always printed. The issue is usually 
> represented by an ExecutorLostFailure (I am assuming this is caused by an 
> OOM-error):
> 
> 21/04/21 21:44:52 ERROR ScalaDriverLocal: User Code Stack Trace: 
> java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 5 in stage 2.0 failed 4 times, most recent failure: 
> Lost task 5.3 in stage 2.0 (TID 62, 10.139.64.6, executor 3): 
> ExecutorLostFailure (executor 3 exited caused by one of the running tasks) 
> Reason: Executor heartbeat timed out after 240254 ms
> Driver stacktrace:
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104)
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:92)
> at org.odp.beam.sdk.OdpPipeline.run(OdpPipeline.java:79)
> at org.odp.beam.sdk.OdpPipeline.run(OdpPipeline.java:60)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
> at org.odp.beam.sdk.OdpPipeline.runThenExit(OdpPipeline.java:93)
> at 
> org.odp.pipelines.emodnet_bronze.EmodNetBronze.runPipeline(EmodNetBronze.java:203)
> at org.odp.pipelines.emodnet_bronze.EmodNetBronze.main(EmodNetBronze.java:209)
> at 
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw$$iw$$iw.(command--1:1)
> at 
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw$$iw.(command--1:44)
> at 
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw.(command--1:46)
> at 
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw.(command--1:48)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw.(command--1:50)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw.(command--1:52)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read.(command--1:54)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$

Re: Avoiding OutOfMemoryError for large batch-jobs

2021-04-26 Thread Thomas Fredriksen(External)
The stack-trace for the OOM:

21/04/21 21:40:43 WARN TaskSetManager: Lost task 1.2 in stage 2.0 (TID 57,
> 10.139.64.6, executor 3): org.apache.beam.sdk.util.UserCodeException:
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> at
> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteVoid$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
> at
> org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
> at
> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:140)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
> at
> scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
> at org.apache.spark.scheduler.Task.run(Task.scala:113)
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
>

I should note this exception is not always printed. The issue is usually
represented by an ExecutorLostFailure (I am assuming this is caused by an
OOM-error):

>
> 21/04/21 21:44:52 ERROR ScalaDriverLocal: User Code Stack Trace:
> java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 5 in stage 2.0 failed 4 times, most recent
> failure: Lost task 5.3 in stage 2.0 (TID 62, 10.139.64.6, executor 3):
> ExecutorLostFailure (executor 3 exited caused by one of the running tasks)
> Reason: Executor heartbeat timed out after 240254 ms
> Driver stacktrace:
> at
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
> at
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
> at
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104)
> at
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:92)
> at org.odp.beam.sdk.OdpPipeline.run(OdpPipeline.java:79)
> at org.odp.beam.sdk.OdpPipeline.run(OdpPipeline.java:60)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
> at org.odp.beam.sdk.OdpPipeline.runThenExit(OdpPipeline.java:93)
> at
> org.odp.pipelines.emodnet_bronze.EmodNetBronze.runPipeline(EmodNetBronze.java:203)
> at
> org.odp.pipelines.emodnet_bronze.EmodNetBronze.main(EmodNetBronze.java:209)
> at
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw$$iw$$iw.(command--1:1)
> at
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw$$iw.(command--1:44)
> at
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw.(command--1:46)
> at
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw.(command--1:48)
> at
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw.(command--1:50)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw.(command--1:52)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read.(command--1:54)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$.(command--1:58)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$.(command--1)
> at
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval$.$print$lzycompute(:7)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval$.$print(:6)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval.$print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessor

Re: Avoiding OutOfMemoryError for large batch-jobs

2021-04-26 Thread Alexey Romanenko
Hi Thomas,

Could you share the stack trace of your OOM and, if possible, the code snippet 
of your pipeline? 
Afaik, usually only “large" GroupByKey transforms, caused by “hot keys”, may 
lead to OOM with SparkRunner.

—
Alexey


> On 26 Apr 2021, at 08:23, Thomas Fredriksen(External) 
>  wrote:
> 
> Good morning,
> 
> We are ingesting a very large dataset into our database using Beam on Spark. 
> The dataset is available through a REST-like API and is spliced in such a way 
> that, in order to obtain the whole dataset, we must make around 24000 API 
> calls.
> 
> All in all, this results in 24000 CSV files that need to be parsed then 
> written to our database.
> 
> Unfortunately, we are encountering some OutOfMemoryErrors along the way. From 
> what we have gathered, this is due to data being queued between transforms in 
> the pipeline. To mitigate this, we tried to implement a streaming scheme where 
> the requests are streamed to the request executor and the results then flow to 
> the database. This too produced the OOM error.
> 
> What are the best ways of implementing such pipelines so as to minimize the 
> memory footprint? Are there any differences between runners we should be 
> aware of here? (e.g. between Dataflow and Spark)