Hi Qi,

I’m guessing you’re calling createInput() for each input file.
If so, then instead you want to do something like:

    Job job = Job.getInstance();

    for each file…
        FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file path));

    env.createInput(HadoopInputs.createHadoopInput(…, job));

Flink/Hadoop will take care of parallelizing the reads from the files, given the parallelism that you’re specifying.
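For concreteness, here’s a minimal, compilable version of that sketch. It assumes plain text input read via Hadoop’s TextInputFormat (so records arrive as (LongWritable, Text) pairs) and the flink-hadoop-compatibility module on the classpath; the class name and the count() at the end are just for illustration:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class ReadManyFiles {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // One Hadoop Job accumulates every input path, instead of one
            // createInput() per file.
            Job job = Job.getInstance();
            for (String file : args) {
                FileInputFormat.addInputPath(job, new Path(file));
            }

            // All of the files become splits of this single input, so Flink
            // parallelizes the reads at whatever parallelism you configure,
            // independent of the number of files.
            DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(
                    HadoopInputs.createHadoopInput(
                            new TextInputFormat(), LongWritable.class, Text.class, job));

            System.out.println("Read " + lines.count() + " lines");
        }
    }

The point being that you wind up with one DataSet covering all of the files, so downstream operations (partitionByHash(), output(), etc.) hang off a single source whose parallelism you control.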
— Ken

> On Mar 11, 2019, at 5:42 AM, qi luo <luoqi...@gmail.com> wrote:
> 
> Hi,
> 
> We’re trying to distribute batch input data to (N) HDFS files, partitioned by hash, using the DataSet API. What I’m doing is something like:
> 
> env.createInput(…)
>     .partitionByHash(0)
>     .setParallelism(N)
>     .output(…)
> 
> This works well for a small number of files. But when we need to distribute to a large number of files (say 100K), the parallelism becomes too large and we can’t afford that many TMs.
> 
> In Spark we can write something like ‘rdd.partitionBy(N)’ and control the parallelism separately (using dynamic allocation). Is there anything similar in Flink, or some other way we can achieve a similar result? Thank you!
> 
> Qi

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra