Hi Qi,

I’m guessing you’re calling createInput() for each input file.

If so, then instead you want to do something like:

        Job job = Job.getInstance();

        for each file…
                FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file path));

        env.createInput(HadoopInputs.createHadoopInput(…, job));

Flink/Hadoop will take care of parallelizing the reads from the files, given 
the parallelism that you’re specifying.
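If it helps, here’s a slightly fuller sketch of that approach. It assumes the flink-hadoop-compatibility dependency is on the classpath, uses Hadoop’s TextInputFormat purely as an example, and takes a hypothetical List<String> of input paths; swap in whatever input format and key/value types you’re actually reading, and whatever parallelism fits your cluster.

        import java.util.List;

        import org.apache.flink.api.java.DataSet;
        import org.apache.flink.api.java.ExecutionEnvironment;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.hadoopcompatibility.HadoopInputs;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
        import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

        public class MultiFileInput {

                // Read many HDFS files via one Hadoop Job and one Flink source.
                public static DataSet<Tuple2<LongWritable, Text>> readAll(
                                ExecutionEnvironment env, List<String> inputPaths) throws Exception {

                        // One Job collects all of the input paths…
                        Job job = Job.getInstance();
                        for (String path : inputPaths) {
                                FileInputFormat.addInputPath(job, new Path(path));
                        }

                        // …and a single createInput() reads them all; Flink splits the
                        // files across whatever parallelism you set on the source.
                        return env.createInput(HadoopInputs.createHadoopInput(
                                        new TextInputFormat(), LongWritable.class, Text.class, job))
                                .setParallelism(4); // example value, tune to your cluster
                }
        }
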

— Ken


> On Mar 11, 2019, at 5:42 AM, qi luo <luoqi...@gmail.com> wrote:
> 
> Hi,
> 
> We’re trying to distribute batch input data to (N) HDFS files, partitioned by 
> hash, using the DataSet API. What I’m doing is something like:
> 
> env.createInput(…)
>       .partitionByHash(0)
>       .setParallelism(N)
>       .output(…)
> 
> This works well for a small number of files. But when we need to distribute to 
> a large number of files (say 100K), the parallelism becomes too large and we 
> cannot afford that many TMs.
> 
> In Spark we can write something like ‘rdd.partitionBy(N)’ and control the 
> parallelism separately (using dynamic allocation). Is there anything similar 
> in Flink, or another way we can achieve a similar result? Thank you!
> 
> Qi

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
