Hi Ken,

Thanks for your reply. I may not have made myself clear: our problem is not about 
reading but rather about writing. 

We need to write to N output files, partitioned by key. Right now we have to use 
setParallelism() to set the number of output partitions/files, but when the 
partition count is large (~100K), the resulting parallelism is far too high. Is 
there any other way to achieve this?
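
For context, one workaround we've been sketching is a custom OutputFormat that keeps the sink parallelism at some small P, but lets each task write to multiple bucket files chosen by key hash, so the file count N is decoupled from the parallelism. Below is a rough, untested sketch — the class name BucketingOutputFormat, the output path, NUM_BUCKETS and P are placeholders of ours, not an existing Flink API:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.io.RichOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FSDataOutputStream;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;

    // Hypothetical sketch: each parallel sink task writes to many bucket files,
    // so producing N output files doesn't require N parallel tasks.
    public class BucketingOutputFormat extends RichOutputFormat<Tuple2<String, String>> {

        private final String basePath;   // e.g. "hdfs:///output" (placeholder)
        private final int numBuckets;    // target file count N (~100K)

        private transient Map<Integer, FSDataOutputStream> openStreams;

        public BucketingOutputFormat(String basePath, int numBuckets) {
            this.basePath = basePath;
            this.numBuckets = numBuckets;
        }

        @Override
        public void configure(Configuration parameters) {}

        @Override
        public void open(int taskNumber, int numTasks) {
            openStreams = new HashMap<>();
        }

        @Override
        public void writeRecord(Tuple2<String, String> record) throws IOException {
            // Route each record to one of N bucket files by key hash,
            // independent of how many parallel tasks are running.
            int bucket = Math.abs(record.f0.hashCode() % numBuckets);
            FSDataOutputStream out = openStreams.get(bucket);
            if (out == null) {
                Path path = new Path(basePath, "bucket-" + bucket);
                out = FileSystem.get(path.toUri())
                        .create(path, FileSystem.WriteMode.NO_OVERWRITE);
                openStreams.put(bucket, out);
            }
            out.write((record.f1 + "\n").getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public void close() throws IOException {
            for (FSDataOutputStream out : openStreams.values()) {
                out.close();
            }
        }
    }

To make sure each bucket file is opened by exactly one task, we'd pair it with a custom partitioner (org.apache.flink.api.common.functions.Partitioner) that routes records by bucket id rather than by raw key hash, roughly:

    dataSet
        .partitionCustom(new Partitioner<String>() {
            @Override
            public int partition(String key, int numPartitions) {
                return Math.abs(key.hashCode() % NUM_BUCKETS) % numPartitions;
            }
        }, 0)
        .output(new BucketingOutputFormat("hdfs:///output", NUM_BUCKETS))
        .setParallelism(P);   // P small and independent of NUM_BUCKETS

But this feels like working around the API, so we'd prefer a built-in approach if one exists.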

Thanks,
Qi

> On Mar 11, 2019, at 11:22 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
> 
> Hi Qi,
> 
> I’m guessing you’re calling createInput() for each input file.
> 
> If so, then instead you want to do something like:
> 
>       Job job = Job.getInstance();
> 
>       for each file…
>               FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file path));
> 
>       env.createInput(HadoopInputs.createHadoopInput(…, job));
> 
> Flink/Hadoop will take care of parallelizing the reads from the files, given 
> the parallelism that you’re specifying.
> 
> — Ken
> 
> 
>> On Mar 11, 2019, at 5:42 AM, qi luo <luoqi...@gmail.com> wrote:
>> 
>> Hi,
>> 
>> We’re trying to distribute batch input data to N HDFS files, partitioned by 
>> hash, using the DataSet API. What I’m doing is something like:
>> 
>> env.createInput(…)
>>       .partitionByHash(0)
>>       .setParallelism(N)
>>       .output(…)
>> 
>> This works well for a small number of files. But when we need to distribute to 
>> a large number of files (say 100K), the parallelism becomes too large and we 
>> cannot afford that many TMs.
>> 
>> In Spark we can write something like ‘rdd.partitionBy(N)’ and control the 
>> parallelism separately (using dynamic allocation). Is there anything similar 
>> in Flink, or another way we can achieve a similar result? Thank you!
>> 
>> Qi
> 
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
> 
