Hi Ken,

Do you mean that I can create a batch sink which writes to N files? That sounds 
viable, but since our data size is huge (billions of records & thousands of 
files), the performance may be unacceptable. I will check Blink and give it a 
try anyway.

Thank you,
Qi

> On Mar 12, 2019, at 11:58 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
> 
> Hi Qi,
> 
> If I understand what you’re trying to do, then this sounds like a variation 
> of a bucketing sink.
> 
> That typically uses a field value to create a directory path or a file name 
> (though the filename case is only viable when the field is also what’s used 
> to partition the data)
> 
> But I don’t believe Flink has built-in support for that, in batch mode (see 
> BucketingSink 
> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html>
>  for streaming).
> 
> Maybe Blink has added that? Hoping someone who knows that codebase can chime 
> in here.
> 
> Otherwise you’ll need to create a custom sink to implement the desired 
> behavior - though abusing a MapPartitionFunction 
> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/api/common/functions/MapPartitionFunction.html>
>  would be easiest, I think.
> 
> — Ken
> 
> 
> 
>> On Mar 12, 2019, at 2:28 AM, qi luo <luoqi...@gmail.com 
>> <mailto:luoqi...@gmail.com>> wrote:
>> 
>> Hi Ken,
>> 
>> Thanks for your reply. I may not make myself clear: our problem is not about 
>> reading but rather writing. 
>> 
>> We need to write to N files based on key partitioning. We have to use 
>> setParallelism() to set the output partition/file number, but when the 
>> partition number is too large (~100K), the parallelism would be too high. Is 
>> there any other way to achieve this?
>> 
>> Thanks,
>> Qi
>> 
>>> On Mar 11, 2019, at 11:22 PM, Ken Krugler <kkrugler_li...@transpac.com 
>>> <mailto:kkrugler_li...@transpac.com>> wrote:
>>> 
>>> Hi Qi,
>>> 
>>> I’m guessing you’re calling createInput() for each input file.
>>> 
>>> If so, then instead you want to do something like:
>>> 
>>>             Job job = Job.getInstance();
>>> 
>>>     for each file…
>>>             FileInputFormat.addInputPath(job, new 
>>> org.apache.hadoop.fs.Path(file path));
>>> 
>>>     env.createInput(HadoopInputs.createHadoopInput(…, job)
>>> 
>>> Flink/Hadoop will take care of parallelizing the reads from the files, 
>>> given the parallelism that you’re specifying.
>>> 
>>> — Ken
>>> 
>>> 
>>>> On Mar 11, 2019, at 5:42 AM, qi luo <luoqi...@gmail.com 
>>>> <mailto:luoqi...@gmail.com>> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> We’re trying to distribute batch input data to (N) HDFS files partitioning 
>>>> by hash using DataSet API. What I’m doing is like:
>>>> 
>>>> env.createInput(…)
>>>>       .partitionByHash(0)
>>>>       .setParallelism(N)
>>>>       .output(…)
>>>> 
>>>> This works well for small number of files. But when we need to distribute 
>>>> to large number of files (say 100K), the parallelism becomes too large and 
>>>> we could not afford that many TMs.
>>>> 
>>>> In spark we can write something like ‘rdd.partitionBy(N)’ and control the 
>>>> parallelism separately (using dynamic allocation). Is there anything 
>>>> similar in Flink or other way we can achieve similar result? Thank you!
>>>> 
>>>> Qi
>>> 
>>> --------------------------
>>> Ken Krugler
>>> +1 530-210-6378
>>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>>> Custom big data solutions & training
>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>> 
>> 
> 
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
> 

Reply via email to