Hi Stephan,

Thanks for your response!

TaskManagers being lost or killed has been a recurring problem for me with
Flink over the last few months as I scale to larger and larger amounts of
data. I would be very grateful for some help figuring out how to avoid this.

The program is set up something like this:
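import static org.apache.flink.api.java.aggregation.Aggregations.SUM;

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple10;
import org.apache.flink.api.java.tuple.Tuple12;
import org.apache.flink.api.java.tuple.Tuple8;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Spread the file names across workers, read and parse each file, then filter
DataSet<CustomType> data = env.fromCollection(listOfFiles)
    .rebalance()
    .flatMap(new ReadFiles())
    .filter(new FilterData());

// Tuple type parameters are omitted for brevity
DataSet<Tuple8> computation1 = data
    .map(new Compute1())
    .distinct()
    .map(new Compute2())
    .groupBy(0, 1, 2)
    .aggregate(SUM, 3).and(SUM, 4).and(SUM, 5);

DataSet<Tuple10> computation2 = data
    .map(new Compute3())
    .distinct()
    .map(new Compute4())
    .groupBy(0, 1, 2)
    .aggregate(SUM, 3).and(SUM, 4).and(SUM, 5);

// Join on the first two fields, then sort the result in a single partition
DataSet<Tuple12> finalOP = computation1.join(computation2)
    .where(0, 1)
    .equalTo(0, 1)
    .with(new Join1())
    .sortPartition(0, Order.ASCENDING)
    .setParallelism(1);

finalOP.writeAsCsv("s3://myBucket/myKey.csv");

// DataSet sinks are lazy; nothing runs until execute() is called
env.execute();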

---

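import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public static final class ReadFiles implements FlatMapFunction<String, CustomType> {
    @Override
    public void flatMap(String fileName, Collector<CustomType> out) throws Exception {
        // Read and parse one file from S3; parseFiles() returns all of its records as a List
        S3FileReaderAndParser parser = new S3FileReaderAndParser(fileName);
        List<CustomType> dataList = parser.parseFiles();
        // Emit the parsed records one by one
        for (CustomType data : dataList) {
            out.collect(data);
        }
    }
}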

The TaskManager is killed/lost during the ReadFiles flatMap. ReadFiles is a
FlatMapFunction that reads each file from S3 using the AWS S3 Java SDK,
parses the protobuf records it contains, and emits each one.
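As written, ReadFiles asks parser.parseFiles() for a complete List<CustomType>
before emitting anything, so every record from a file is held in the
TaskManager's heap at the same time. In case memory pressure there is part of
the problem, one variant worth testing is to decode and emit one record at a
time. The sketch below is plain Java so it runs without Flink or S3: Emitter is
a hypothetical stand-in for Flink's Collector, and the 4-byte length-prefix
framing is an assumed format, not the real file layout. In the actual function
the input would be the S3 object's stream (S3Object.getObjectContent() in the
AWS SDK for Java 1.x), and each decoded CustomType would go straight to
out.collect(). If the files were written with protobuf's writeDelimitedTo, the
generated parseDelimitedFrom(InputStream) method, which returns null at end of
stream, would replace the manual framing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public class StreamingReadSketch {

    /** Hypothetical stand-in for Flink's Collector<T>; only collect() is modeled. */
    public interface Emitter<T> {
        void collect(T record);
    }

    /**
     * Reads records framed as a 4-byte big-endian length followed by that many
     * payload bytes (an assumed format, for illustration only) and hands each one
     * to the emitter as soon as it is read, so only the current record is held
     * in memory. Returns the number of records emitted.
     */
    public static int emitRecords(InputStream in, Emitter<byte[]> out) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int count = 0;
        while (true) {
            int length;
            try {
                length = data.readInt();
            } catch (EOFException endOfStream) {
                return count; // stream ended before a full length prefix: treat as end of input
            }
            if (length < 0) {
                throw new IOException("negative record length: " + length);
            }
            byte[] payload = new byte[length];
            data.readFully(payload); // throws EOFException if the record is truncated
            out.collect(payload);
            count++;
        }
    }

    public static void main(String[] args) throws IOException {
        // Two sample records in the assumed framing: [len=3][1,2,3][len=2][4,5]
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream writer = new DataOutputStream(buffer);
        writer.writeInt(3);
        writer.write(new byte[] {1, 2, 3});
        writer.writeInt(2);
        writer.write(new byte[] {4, 5});
        writer.flush();

        List<byte[]> received = new ArrayList<>();
        int count = emitRecords(new ByteArrayInputStream(buffer.toByteArray()), received::add);
        System.out.println("emitted " + count + " records"); // prints "emitted 2 records"
    }
}
```

The point of the design is that peak memory per file is bounded by the largest
single record rather than by the whole file.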

And yes, I can find a message like this about a "gated" address in the logs:

2017-10-12 20:46:00,355 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@ip-172-31-8-29:38763] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@ip-172-31-8-29:38763]] Caused by: [Connection refused: ip-172-31-8-29/172.31.8.29:38763]

Thank you!




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/