Hi Stephan,

Thanks for responding, comments inline below…

Regards,

— Ken

> On Jun 26, 2019, at 7:50 AM, Stephan Ewen <se...@apache.org> wrote:
> 
> Hi Ken!
> 
> Sorry to hear you are going through this experience. The major focus on 
> streaming so far means that the DataSet API has stability issues at scale.
> So, yes, batch mode in the current Flink version can be somewhat tricky.
> 
> A big focus of Flink 1.9 is to finally fix batch mode, by addressing 
> batch-specific scheduling, recovery, and shuffle issues.
> 
> Let me go through the issues you found:
> 
> (1) Input splits and oversized RPC
> 
> Your explanation seems correct: a timeout due to a dropped oversized RPC message.
> 
> I don't quite understand how exactly that happens, because the size limit is 
> 10 MB and input splits should be rather small in most cases.
> Are you running custom sources which put large data into splits? Maybe 
> accidentally, by having a large serialized closure in the splits?

As per my email to Till, I don’t feel like I’m doing anything tricky, though I 
am reading Hadoop sequence files that contain Cascading Tuple/Tuple key/value 
data.
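
In case it matters, the setup is roughly the following (a minimal sketch, not 
my actual code — the S3 path is a stand-in, and the Cascading serialization 
wiring is my best guess at what's relevant):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

import cascading.tuple.Tuple;

public class ReadTupleSequenceFiles {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The Hadoop Job object just carries the input format configuration.
        Job job = Job.getInstance();
        FileInputFormat.addInputPath(job, new Path("s3://my-bucket/ad-dailies/"));

        // Register Cascading's Tuple serializer with Hadoop so the
        // SequenceFile reader can deserialize Tuple keys/values.
        job.getConfiguration().set("io.serializations",
            "org.apache.hadoop.io.serializer.WritableSerialization,"
                + "cascading.tuple.hadoop.TupleSerialization");

        HadoopInputFormat<Tuple, Tuple> fmt = new HadoopInputFormat<>(
            new SequenceFileInputFormat<Tuple, Tuple>(), Tuple.class, Tuple.class, job);

        DataSet<Tuple2<Tuple, Tuple>> input = env.createInput(fmt);
        input.first(10).print();
    }
}

So nothing custom about the splits themselves — they should just be the usual 
file-based splits that SequenceFileInputFormat generates.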

> The fix would be this issue: https://issues.apache.org/jira/browse/FLINK-4399
> 
> (2) TM early release
> 
> The 1.8 version had a fix that should work for regular cases without 
> fine-grained failure recovery.
> 1.9 should have a more general fix that also works for fine-grained recovery.
> 
> Are you trying to use the finer-grained failover with the batch job?

No, or at least I’m not doing anything special to enable it.

Is there something I need to do to explicitly _disable_ it?
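
In case it helps anyone else reading this: the only related knob I've found is 
the failover strategy in flink-conf.yaml. This is a guess on my part, since my 
understanding is that "full" is already the default in 1.8:

# flink-conf.yaml: restart the whole job on a task failure (the default),
# rather than using a fine-grained / region-based strategy.
jobmanager.execution.failover-strategy: full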

> The finer-grained failover does not work in batch for 1.8, which is why it is 
> not an advertised feature (it only works for streaming so far).
> 
> The goal is that this works in the 1.9 release (aka the batch fixup release).
> 
> (3) Hang in Processing
> 
> I think a thread dump (jstack) from the TMs would be helpful to diagnose that.
> There are known issues with the current batch shuffle implementation, which 
> is why 1.9 is getting a new bounded-blocking stream shuffle implementation.

Next time it happens, I’ll dump the threads.

I should have done it this time, but was in a hurry to kill the EMR cluster as 
it had been costing money all night long :(
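
Noting it here for next time: on each TM host, something like the following 
should capture the dump (assuming the TM process is the YarnTaskExecutorRunner 
that shows up in the logs below):

jstack $(pgrep -f YarnTaskExecutorRunner) > /tmp/tm-threads-$(hostname).txt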



> On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler <kkrugler_li...@transpac.com> wrote:
> Hi all,
> 
> I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink 
> 1.8.0, and it regularly fails, but for varying reasons.
> 
> Has anyone else achieved stability with 1.8.0 in batch mode and non-trivial 
> workflows?
> 
> Thanks,
> 
> — Ken
> 
> 1. TimeoutException getting input splits
> 
> The batch job starts by processing a lot of files that live in S3. During 
> this phase, I sometimes see:
> 
> 2019-06-20 01:20:22,659 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN 
> DataSource (at createInput(ExecutionEnvironment.java:549) 
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (ad 
> dailies) -> Filter (Filter at 
> createWorkflow(AdvertiserSimilarityWorkflow.java:34)) -> Filter (Filter at 
> createWorkflow(AdvertiserSimilarityWorkflow.java:36)) -> Filter (Filter at 
> createWorkflow(AdvertiserSimilarityWorkflow.java:38)) -> Map (Key Extractor) 
> -> Combine (Reduce at createWorkflow(AdvertiserSimilarityWorkflow.java:41)) 
> (31/32) (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Could not retrieve next input split.
>       at 
> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>       at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: 
> Requesting the next input split failed.
>       at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>       at 
> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>       ... 3 more
> Caused by: java.util.concurrent.TimeoutException
>       at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>       at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>       at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>       ... 4 more
> 2019-06-20 01:20:22,664 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink 
> Java Job at Thu Jun 20 01:11:28 UTC 2019 (5564b8980f40d788d7ef312318709e4d) 
> switched from state RUNNING to FAILING.
> java.lang.RuntimeException: Could not retrieve next input split.
>       at 
> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>       at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: 
> Requesting the next input split failed.
>       at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>       at 
> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>       ... 3 more
> Caused by: java.util.concurrent.TimeoutException
>       at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>       at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>       at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>       ... 4 more
> 
> I saw bjb...@gmail.com’s email recently about a similar issue:
> 
>> I figured this out myself. In my yarn container logs I saw this 
>> warning/error,
>> 
>> akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
>> Actor[akka.tcp://flink@HOST:43911/temp/$n]: max allowed size 10485760 
>> bytes, actual size of encoded class 
>> org.apache.flink.runtime.jobmaster.SerializedInputSplit was 15728643 bytes.
>> 
>> Looking into this, there is a max frame size for Akka, which in Flink can be 
>> set with akka.framesize and defaults to 10 MB. Increasing this past the 
>> size of my side input fixed the issue. I'm guessing this is due to 
>> creating the side input PCollection from memory using the Create.of APIs.
> 
> But no such akka.remote.OversizedPayloadException appears in any of my log 
> files.
> 
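For the record: if the oversized-payload error does eventually show up, my 
reading of that thread is that the fix is a one-line change in flink-conf.yaml 
(the value format takes a trailing "b" for bytes; 50 MB here is an arbitrary 
example):

akka.framesize: 52428800b
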
> 2. TM released too soon?
> 
> Sometimes it fails with "Connecting the channel failed: Connecting to remote 
> task manager xxx has failed. This might indicate that the remote task manager 
> has been lost”
> 
> I’d run into this previously with 1.7.2, but thought that 1.8.0 had the fix 
> for https://issues.apache.org/jira/browse/FLINK-10941, and thus I’d avoid the 
> problem, but it seems like there’s still an issue.
> 
> I’m running 3 TMs on three servers, each with 32 slots. When the job fails, 
> the servers are under heavy CPU load.
> 
> From the logs, I see the JobManager releasing two of the TMs, then requesting 
> two new containers. One of these requests gets filled, and that new TM starts 
> getting tasks for its slots.
> 
> But soon afterwards, that new TM and the one original TM still left around 
> start failing, because (I think) they aren’t getting data from the other TM 
> that was released.
> 
> Any thoughts on what’s going wrong? Is the bug not actually fully fixed? Or 
> is there some TM timeout value that I should bump?
> 
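If the answer turns out to be a timeout: the two settings that seem to match 
the "Releasing idle slot" / "exceeded the idle timeout" lines below are the 
following (my guess, not verified; values in milliseconds):

# flink-conf.yaml: keep idle slots and idle TaskExecutors around longer,
# e.g. across job phases with different parallelism.
slot.idle.timeout: 600000
resourcemanager.taskmanager-timeout: 600000
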
> In the job manager log file I see where the two TMs are getting released...
> 
> 2019-05-17 17:42:50,215 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - Releasing 
> idle slot [d947cd800b0ef2671259c7b048c3f7fc].
> 2019-05-17 17:43:38,942 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Stopping container container_1558074033518_0003_01_000002.
> 2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Closing TaskExecutor connection 
> container_1558074033518_0003_01_000002 because: TaskExecutor exceeded the 
> idle timeout.
> 2019-05-17 17:43:38,978 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Stopping container container_1558074033518_0003_01_000004.
> 2019-05-17 17:43:38,998 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Closing TaskExecutor connection 
> container_1558074033518_0003_01_000004 because: TaskExecutor exceeded the 
> idle timeout.
> 2019-05-17 17:43:39,005 WARN  org.apache.flink.yarn.YarnResourceManager       
>               - Discard registration from TaskExecutor 
> container_1558074033518_0003_01_000002 at 
> (akka.tcp://flink@ip-10-28-81-66.ec2.internal:36311/user/taskmanager_0) 
> because the framework did not recognize it
> 2019-05-17 17:43:39,006 WARN  org.apache.flink.yarn.YarnResourceManager       
>               - Discard registration from TaskExecutor 
> container_1558074033518_0003_01_000004 at 
> (akka.tcp://flink@ip-10-47-197-146.ec2.internal:44403/user/taskmanager_0) 
> because the framework did not recognize it
> 
> And then later on, the requests for the replacement TMs:
> 
> 2019-05-17 17:45:01,655 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Requesting new TaskExecutor container with resources 
> <memory:44000, vCores:32>. Number pending requests 1.
> 2019-05-17 17:45:01,662 INFO org.apache.flink.yarn.YarnResourceManager - 
> Requesting new TaskExecutor container with resources <memory:44000, 
> vCores:32>. Number pending requests 2.
> 
> And then one of the requests is satisfied:
> 
> 2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Received new container: 
> container_1558074033518_0003_01_000006 - Remaining pending container 
> requests: 2
> 2019-05-17 17:45:04,360 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Removing container request Capability[<memory:44000, 
> vCores:32>]Priority[1]. Pending container requests 1.
> 2019-05-17 17:45:04,836 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Creating container launch context for TaskManagers
> 2019-05-17 17:45:04,837 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Starting TaskManagers
> 
> So it seems like TMs are being allocated, but soon afterwards:
> 
> 2019-05-17 17:45:12,907 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN Map 
> (Map at createWorkflow(AdvertiserSimilarityWorkflow.java:127)) -> Map (Key 
> Extractor) (73/96) (1e16d6ca293330933b7cece67644635f) switched from RUNNING 
> to FAILED.
> java.io.IOException: Connecting the channel failed: Connecting to remote task 
> manager + 'ip-10-28-81-66.ec2.internal/10.28.81.66:40317' has failed. This 
> might indicate that the remote task manager has been lost.
>       at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>       at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:133)
>       at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:69)
>       at 
> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:166)
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:494)
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:525)
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
>       at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>       at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>       at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>       at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:101)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>       at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>       at java.lang.Thread.run(Thread.java:748)
> 
> On one of the TMs that was released, I see at the end of its log:
> 
> 2019-05-17 17:42:50,217 INFO  
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Free slot 
> TaskSlot(index:3, state:ACTIVE, resource profile: 
> ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, 
> directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, 
> networkMemoryInMB=2147483647}, allocationId: 
> e3e7b383fe2db6376c82e5f3be7e02cb, jobId: eff57179c5c0e7d475c3b69d1a063017).
> 2019-05-17 17:42:50,217 INFO  
> org.apache.flink.runtime.taskexecutor.JobLeaderService        - Remove job 
> eff57179c5c0e7d475c3b69d1a063017 from job leader monitoring.
> 2019-05-17 17:42:50,217 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close 
> JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
> 2019-05-17 17:42:50,222 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close 
> JobManager connection for job eff57179c5c0e7d475c3b69d1a063017.
> 2019-05-17 17:42:50,222 INFO  
> org.apache.flink.runtime.taskexecutor.JobLeaderService        - Cannot 
> reconnect to job eff57179c5c0e7d475c3b69d1a063017 because it is not 
> registered.
> 2019-05-17 17:43:38,982 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close 
> ResourceManager connection 2c23394abcc92fbc068529591fbf7ceb.
> 2019-05-17 17:43:38,982 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Connecting to 
> ResourceManager 
> akka.tcp://flink@ip-10-30-52-224.ec2.internal:35979/user/resourcemanager(00000000000000000000000000000000).
> 2019-05-17 17:43:38,988 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner    
>               - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
> 2019-05-17 17:43:38,988 INFO  
> org.apache.flink.runtime.blob.PermanentBlobCache              - Shutting down 
> BLOB cache
> 2019-05-17 17:43:38,989 INFO  
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  - 
> Shutting down TaskExecutorLocalStateStoresManager.
> 2019-05-17 17:43:38,990 INFO  
> org.apache.flink.runtime.blob.TransientBlobCache              - Shutting down 
> BLOB cache
> 2019-05-17 17:43:38,991 INFO  org.apache.flink.runtime.filecache.FileCache    
>               - removed file cache directory 
> /mnt/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-a5e9444f-6bb6-4da1-9067-8d2e7351cb61
> 2019-05-17 17:43:38,991 INFO  org.apache.flink.runtime.filecache.FileCache    
>               - removed file cache directory 
> /mnt1/yarn/usercache/hadoop/appcache/application_1558074033518_0003/flink-dist-cache-925eed9f-fcf5-4d48-8ac9-bce29e9116ef
> 2019-05-17 17:43:39,004 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Resolved 
> ResourceManager address, beginning registration
> 2019-05-17 17:43:39,004 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Registration 
> at ResourceManager attempt 1 (timeout=100ms)
> 2019-05-17 17:43:39,012 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Registration 
> at ResourceManager was declined: unrecognized TaskExecutor
> 2019-05-17 17:43:39,012 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Pausing and 
> re-attempting registration in 30000 ms
> 
> And in the replacement TM that was started, it fails with:
> 
> 2019-05-17 17:45:12,048 ERROR org.apache.flink.runtime.operators.BatchTask    
>               - Error in task code:  Map (Key Extractor) (34/96)
> java.io.IOException: Connecting the channel failed: Connecting to remote task 
> manager + 'ip-10-47-197-146.ec2.internal/10.47.197.146:39133' has failed. 
> This might indicate that the remote task manager has been lost.
>       at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
> 
> Where the TM it’s trying to connect to is the one that was released and 
> hasn’t been restarted yet.
> 
> 3. Hang in processing
> 
> Sometimes it finishes the long-running (10 hour) operator, and then the two 
> downstream operators get stuck (these have a different parallelism, so 
> there’s a rebalance).
> 
> In the most recent example of this, they processed about 20% of the data 
> emitted by the long-running operator. There are no errors in any of the logs. 
> The last real activity in the jobmanager.log shows that all of the downstream 
> operators were deployed...
> 
> 2019-06-22 14:58:36,648 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - CHAIN Map 
> (Packed features) -> Map (Key Extractor) (7/32) 
> (4a13a1d471c0ed5c2d9e66d2e4a98fd9) switched from DEPLOYING to RUNNING.
> 
> Then nothing anywhere, until this message starts appearing in the log file 
> every 5 seconds or so…
> 
> 2019-06-22 22:56:11,303 INFO  
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl         - Updating with 
> new AMRMToken
> 
> 
> 

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
