FWIW I had to do something similar in the past. My solution was to…

1. Create a custom reader that added the source directory to the input data, so 
I had a Tuple2<source dir name, actual data>.
2. Create a job that reads from all source directories, using HadoopInputFormat 
for text.
3. Constrain the parallelism of this initial part of the job, to avoid doing too 
many concurrent downloads from S3.
4. Partition on the source directory.
5. Write a custom mapPartition function that opens/writes to output files whose 
names are based on the source directory. (Rough sketch below.)
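
Here's a very rough, untested sketch of the shape I mean, using the DataSet API. 
I'm cutting a couple of corners: instead of a custom reader / HadoopInputFormat 
I'm tagging records with a plain map() after readTextFile(), and getOutputPath() 
is the same (hypothetical) helper from your code. Your distinct() step would slot 
in between the union and the partition, with the key selector adapted to the Tuple2.

import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Steps 1-3: tag every record with its source directory, and cap the read parallelism.
DataSet<Tuple2<String, String>> tagged = null;
for (final String dir : directoryList) {
    DataSet<Tuple2<String, String>> oneDir = env
        .readTextFile(dir)                       // add .withParameters() here if you need recursive enumeration
        .setParallelism(2)                       // step 3: limit concurrent S3 downloads
        .map(line -> Tuple2.of(dir, line))       // step 1: Tuple2<source dir name, actual data>
        .returns(Types.TUPLE(Types.STRING, Types.STRING));
    tagged = (tagged == null) ? oneDir : tagged.union(oneDir);   // step 2: one job over all directories
}

// Steps 4-5: partition on the source directory, then write one output per directory.
tagged
    .partitionByHash(0)
    .mapPartition(new RichMapPartitionFunction<Tuple2<String, String>, String>() {
        @Override
        public void mapPartition(Iterable<Tuple2<String, String>> records,
                                 Collector<String> out) throws Exception {
            // One writer per output directory; several directories can land in the same partition.
            Map<String, Writer> writers = new HashMap<>();
            for (Tuple2<String, String> record : records) {
                String outputDir = getOutputPath(record.f0);     // hypothetical helper, as in your code
                Writer writer = writers.get(outputDir);
                if (writer == null) {
                    Path file = new Path(outputDir,
                        "part-" + getRuntimeContext().getIndexOfThisSubtask());
                    FSDataOutputStream stream = FileSystem.get(file.toUri())
                        .create(file, FileSystem.WriteMode.OVERWRITE);
                    writer = new OutputStreamWriter(stream, StandardCharsets.UTF_8);
                    writers.put(outputDir, writer);
                }
                writer.write(record.f1);
                writer.write('\n');
            }
            for (Writer writer : writers.values()) {   // error handling elided
                writer.close();
            }
        }
    })
    .output(new DiscardingOutputFormat<>());           // sink only exists to drive execution

env.execute("per-directory dedup");

The important bits are that there's a single execute() for all directories, and 
that the actual writing happens inside mapPartition(), so the output file layout 
follows the source directory rather than the normal sink parallelism.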

— Ken

> On Jul 8, 2021, at 3:19 PM, Jason Liu <jasonli...@ucla.edu> wrote:
> 
> Hi all,
> 
>     We currently have a use case where we run a DataSet API job over a given 
> S3 directory to dedup the data and write the output to a new directory. We 
> need to run this job for roughly 1,000 S3 folders. I attempted to set up the 
> Flink executions so they run sequentially, like this: 
> 
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> 
> Configuration parameters = new Configuration();
> parameters.setBoolean("recursive.file.enumeration", true);
> 
> for (final String inputDirectory : directoryList) {
>   String inputPath = inputDirectory;
>   String outputPath = getOutputPath(inputPath);
> 
>   log.warn("using input path [{}] and output path [{}]", inputPath, outputPath);
> 
>   DataSet<String> lines = env.readTextFile(inputPath).withParameters(parameters);
>   DataSet<String> deduped = lines.distinct(new GetKey());
>   deduped.writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE);
> }
> env.execute();
> However, when I submit this job to the cluster, it generates a graph like 
> this 
> <image.png>
> And it seems Flink is running them in parallel. Is there a way to tell Flink 
> to run them sequentially? I tried moving the execution environment inside the 
> loop, but then it only seems to run the job for the first directory. I'm 
> running this on AWS Kinesis Data Analytics, so it's a bit hard for me to 
> submit new jobs. 
> 
> Wondering if there's any way I can accomplish this?
> 
> Thanks,
> Jason
> 

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch


