> On May 19, 2021, at 7:26 AM, Yun Gao <yungao...@aliyun.com> wrote:
> 
> Hi Marco,
> 
> For the remaining issues:
> 
> 1. For the aggregation, the 500 GB of files do not need to fit into memory.
> Roughly speaking, for keyBy().window().reduce(), the input records are first
> sorted by key (time_series.name) using an external sort, which consumes only
> a fixed amount of memory. Then, for all the records of each key, Flink uses a
> special single-key state backend to hold the intermediate result for that
> key, whose memory consumption is usually negligible.
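
A minimal plain-Java sketch of the sort-then-reduce idea in point 1 above. It
is not Flink's implementation: the in-memory sort stands in for the external
sort, the running sum/count stands in for the single-key state backend,
windowing is left out, and SortedKeyAverage, Sample and averageByName are
hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SortedKeyAverage {
    // Hypothetical stand-in for one time-series record (name, value).
    static final class Sample {
        final String name;
        final double value;

        Sample(String name, double value) {
            this.name = name;
            this.value = value;
        }
    }

    // Averages values per name while keeping state for only one key at a time.
    static Map<String, Double> averageByName(List<Sample> input) {
        List<Sample> sorted = new ArrayList<>(input);
        // Assumption: an in-memory sort replaces the external (spilling) sort.
        sorted.sort(Comparator.comparing((Sample s) -> s.name));

        Map<String, Double> result = new LinkedHashMap<>();
        String currentKey = null;
        double sum = 0.0;
        long count = 0;
        for (Sample s : sorted) {
            if (currentKey != null && !currentKey.equals(s.name)) {
                // Key changed: emit the finished key and reset the accumulator.
                result.put(currentKey, sum / count);
                sum = 0.0;
                count = 0;
            }
            currentKey = s.name;
            sum += s.value;
            count++;
        }
        if (currentKey != null) {
            result.put(currentKey, sum / count); // emit the last key
        }
        return result;
    }

    public static void main(String[] args) {
        List<Sample> input = Arrays.asList(
                new Sample("b", 4.0), new Sample("a", 1.0), new Sample("a", 3.0));
        System.out.println(averageByName(input)); // prints {a=2.0, b=4.0}
    }
}
```

Because the records arrive grouped by key after the sort, the loop never holds
more than one key's sum and count, which is why the per-key state stays small
regardless of the total input size.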

Thank you. However, if the source of data is S3, this means that the machine 
must have local disk space to store the files from S3 and any intermediate 
files, right?

Considering that the source of data is 500 GB, doesn't this mean that the 
machine will need at least 500 GB of disk space?


> 2. The first operator (namely the source) reads the files directly and emits
> records straight to the downstream operators, so there should be no
> intermediate result before the first operator.
> 3. As far as I know, Flink currently does not support using S3 to store
> intermediate results, since it relies on local I/O mechanisms such as mmap or
> local file reads/writes, which S3 does not seem to support. EBS should be
> fine.
> 4. The heartbeat timeout is normally caused by a blocked akka thread or a
> network issue. To check whether a thread is blocked, first check the GC log
> for long full GCs; if there are none, take a thread dump to see whether the
> JM or TM akka thread is blocked. If it looks like a network issue, the job
> can increase the timeout by configuring heartbeat.timeout [1].
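
A flink-conf.yaml fragment for the heartbeat.timeout suggestion in point 4
might look like the following. The value is an arbitrary example, not a
recommendation; the option is given in milliseconds.

```yaml
# flink-conf.yaml (cluster configuration)
# Assumed example value: 120000 ms = 2 minutes. Tune to your environment.
heartbeat.timeout: 120000
```
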
> 
> Best,
> Yun
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#heartbeat-timeout
> 
> 
> ------------------Original Mail ------------------
> Sender:Marco Villalobos <mvillalo...@kineteque.com>
> Send Date:Wed May 19 14:03:48 2021
> Recipients:user <user@flink.apache.org>
> Subject:Questions Flink DataStream in BATCH execution mode scalability advice
> Questions Flink DataStream in BATCH execution mode scalability advice.
> 
> Here is the problem that I am trying to solve.
> 
> Input is an S3 bucket directory with about 500 GB of data across many files. 
> The instance that I am running on only has 50GB of EBS storage. The nature of 
> this data is time series data. Imagine name, value, timestamp.
> 
> I must average the time_series.value by time_series.name on a tumbling window 
> of 15 minutes. Upon aggregation, the time_series.timestamp gets rounded up to 
> the quarter hour. I key by tag name and 15-minute interval.
> 
> After aggregation, I must forward fill the missing quarters for each 
> time_series.name. Currently, this forward fill operator is keyed only by 
> time_series.name. Does this mean that in batch mode, all of the time series 
> with the same time_series.name within the 500 GB of files must fit in memory?
> 
> The results are saved in an RDBMS.
> 
> If this job somehow reads all 500 GB before it sends it to the first 
> operator, where is the data stored?
> 
> Now considering that the EMR node only has 50 GB of EBS (that's disk storage), 
> is there a means to configure Flink to store its intermediate results within 
> S3?
> 
> When the job failed, I saw this exception in the log: "Recovery is suppressed 
> by NoRestartBackoffTimeStrategy." Is there a way to configure this to recover?
> 
> My job keeps on failing for the same reason. It says, "The heartbeat of 
> TaskManager with id container_xxx timed out." Is there a way to configure it 
> not to time out?
> 
> I would appreciate any advice on how I should solve these problems. Thank you.
> 
> 
