Hi Qi,

If I understand what you’re trying to do, this sounds like a variation of a 
bucketing sink.

That typically uses a field value to create a directory path or a file name 
(though the file-name case is only viable when that field is also what’s used 
to partition the data).

But I don’t believe Flink has built-in support for that in batch mode (see 
BucketingSink 
<https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html>
 for the streaming equivalent).
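
For comparison, the streaming BucketingSink does this via a custom Bucketer that 
maps a field value to a directory. A rough (untested) sketch, where the 
Tuple2<String, String> element type, the base path, and “stream” (your DataStream) 
are just placeholders:

	// Uses org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink and Bucketer,
	// org.apache.flink.streaming.connectors.fs.Clock, and org.apache.hadoop.fs.Path.
	BucketingSink<Tuple2<String, String>> sink = new BucketingSink<>("hdfs:///output/base");
	sink.setBucketer(new Bucketer<Tuple2<String, String>>() {
		@Override
		public Path getBucketPath(Clock clock, Path basePath, Tuple2<String, String> element) {
			// One directory per distinct value of the first field.
			return new Path(basePath, element.f0);
		}
	});
	stream.addSink(sink);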

Maybe Blink has added that? Hoping someone who knows that codebase can chime in 
here.

Otherwise you’ll need to create a custom sink to implement the desired behavior 
- though abusing a MapPartitionFunction 
<https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/api/common/functions/MapPartitionFunction.html>
 would be easiest, I think.
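
To make that concrete, here’s a completely untested sketch of what I mean. The idea: 
derive a bucket id from the key up front, hash-partition on that bucket id (so each 
bucket is written by exactly one subtask), and then have a MapPartitionFunction write 
each bucket it sees to its own HDFS file. That decouples the number of output files 
from the parallelism. The Tuple2 element types, paths, numFiles, and the parallelism 
are all placeholders:

	import java.nio.charset.StandardCharsets;
	import java.util.HashMap;
	import java.util.Map;

	import org.apache.flink.api.common.functions.RichMapPartitionFunction;
	import org.apache.flink.api.java.tuple.Tuple2;
	import org.apache.flink.core.fs.FSDataOutputStream;
	import org.apache.flink.core.fs.FileSystem;
	import org.apache.flink.core.fs.Path;
	import org.apache.flink.util.Collector;

	public class BucketingWriter extends RichMapPartitionFunction<Tuple2<Integer, String>, Long> {

		private final String basePath;

		public BucketingWriter(String basePath) {
			this.basePath = basePath;
		}

		@Override
		public void mapPartition(Iterable<Tuple2<Integer, String>> records, Collector<Long> out) throws Exception {
			// One open stream per bucket id (f0) seen by this subtask.
			Map<Integer, FSDataOutputStream> streams = new HashMap<>();
			long count = 0;

			try {
				for (Tuple2<Integer, String> record : records) {
					FSDataOutputStream stream = streams.get(record.f0);
					if (stream == null) {
						Path path = new Path(basePath, "bucket-" + record.f0);
						stream = path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE);
						streams.put(record.f0, stream);
					}
					stream.write((record.f1 + "\n").getBytes(StandardCharsets.UTF_8));
					count++;
				}
			} finally {
				for (FSDataOutputStream stream : streams.values()) {
					stream.close();
				}
			}

			// Emit a count so there’s something to attach a (discarding) sink to.
			out.collect(count);
		}
	}

And the job wiring would look roughly like:

	// records: DataSet<Tuple2<String, String>>, f0 = key, f1 = line to write (placeholders).
	// Also uses org.apache.flink.api.common.typeinfo.Types and
	// org.apache.flink.api.java.io.DiscardingOutputFormat.
	final int numFiles = 100000;

	records
		.map(r -> Tuple2.of(Math.floorMod(r.f0.hashCode(), numFiles), r.f1))
		.returns(Types.TUPLE(Types.INT, Types.STRING))
		.partitionByHash(0)                                  // all records for a given bucket land on one subtask
		.mapPartition(new BucketingWriter("hdfs:///output"))
		.setParallelism(64)                                  // whatever you can afford, independent of numFiles
		.output(new DiscardingOutputFormat<Long>());

One caveat: each subtask would keep roughly numFiles / parallelism streams open at 
once, so you may want to sortPartition() on the bucket id first and only keep one 
file open at a time.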

— Ken



> On Mar 12, 2019, at 2:28 AM, qi luo <luoqi...@gmail.com> wrote:
> 
> Hi Ken,
> 
> Thanks for your reply. I may not have made myself clear: our problem is not 
> about reading, but rather writing. 
> 
> We need to write to N files based on key partitioning. We have to use 
> setParallelism() to set the output partition/file count, but when the 
> partition count is very large (~100K), the parallelism becomes too high. Is 
> there any other way to achieve this?
> 
> Thanks,
> Qi
> 
>> On Mar 11, 2019, at 11:22 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>> 
>> Hi Qi,
>> 
>> I’m guessing you’re calling createInput() for each input file.
>> 
>> If so, then instead you want to do something like:
>> 
>>      Job job = Job.getInstance();
>> 
>>      for each file…
>>              FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file path));
>> 
>>      env.createInput(HadoopInputs.createHadoopInput(…, job));
>> 
>> Flink/Hadoop will take care of parallelizing the reads from the files, given 
>> the parallelism that you’re specifying.
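>> 
>> In case it’s useful, a slightly more complete (untested) version of the above, 
>> assuming plain text files read via the mapreduce TextInputFormat - swap in 
>> whatever InputFormat and key/value types match your data:
>> 
>>      // Assumes org.apache.hadoop.mapreduce.Job,
>>      // org.apache.hadoop.mapreduce.lib.input.FileInputFormat and TextInputFormat,
>>      // org.apache.hadoop.io.LongWritable and Text, and
>>      // org.apache.flink.hadoopcompatibility.HadoopInputs.
>>      Job job = Job.getInstance();
>>      for (String file : inputFiles) {        // inputFiles = your list of paths
>>              FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file));
>>      }
>> 
>>      DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
>>              HadoopInputs.createHadoopInput(new TextInputFormat(), LongWritable.class, Text.class, job));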
>> 
>> — Ken
>> 
>> 
>>> On Mar 11, 2019, at 5:42 AM, qi luo <luoqi...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> We’re trying to distribute batch input data to (N) HDFS files partitioning 
>>> by hash using DataSet API. What I’m doing is like:
>>> 
>>> env.createInput(…)
>>>       .partitionByHash(0)
>>>       .setParallelism(N)
>>>       .output(…)
>>> 
>>> This works well for a small number of files. But when we need to distribute 
>>> to a large number of files (say 100K), the parallelism becomes too large and 
>>> we cannot afford that many TMs.
>>> 
>>> In Spark we can write something like ‘rdd.partitionBy(N)’ and control the 
>>> parallelism separately (using dynamic allocation). Is there anything 
>>> similar in Flink, or another way we can achieve a similar result? Thank you!
>>> 
>>> Qi
>> 
> 

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
