Hi Ken,

Agreed. I will try partitionBy() to reduce the number of parallel sinks, and may 
also try sortPartition() so each sink can write its files one by one. Looking 
forward to your solution. :)
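
To make the idea concrete, here is a rough sketch of what I have in mind. It is 
plain Java rather than Flink code, so it only simulates the data flow; the class 
and method names (BucketedWriteSketch, sinkFor, filesOpenedPerSink) are made up 
for illustration. I am assuming that in the DataSet API the routing step would be 
partitionByHash() and the ordering step sortPartition(), with the sink 
parallelism set much lower than the number of files:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink dependency) of "hash-partition by bucket key,
// then sort each partition by bucket", so a sink holds one open file at a time.
// All names here are hypothetical and exist only for this sketch.
class BucketedWriteSketch {

    // Which of the `parallelism` sinks receives a given bucket id.
    static int sinkFor(int bucket, int parallelism) {
        return Math.floorMod(Integer.hashCode(bucket), parallelism);
    }

    // Routes records (represented only by their bucket ids) to sinks, sorts each
    // sink's input by bucket, and returns, per sink, the ordered list of files
    // that sink opens. A new file is opened only when the bucket id changes.
    static Map<Integer, List<Integer>> filesOpenedPerSink(List<Integer> buckets,
                                                          int parallelism) {
        Map<Integer, List<Integer>> partitions = new LinkedHashMap<>();
        for (int s = 0; s < parallelism; s++) {
            partitions.put(s, new ArrayList<>());
        }
        for (int b : buckets) {
            partitions.get(sinkFor(b, parallelism)).add(b);
        }

        Map<Integer, List<Integer>> opened = new LinkedHashMap<>();
        for (Map.Entry<Integer, List<Integer>> e : partitions.entrySet()) {
            List<Integer> sorted = new ArrayList<>(e.getValue());
            sorted.sort(Comparator.naturalOrder());

            List<Integer> files = new ArrayList<>();
            Integer current = null;
            for (Integer b : sorted) {
                if (!b.equals(current)) {
                    // Bucket changed: the previous file can be closed before
                    // the next one is opened.
                    files.add(b);
                    current = b;
                }
            }
            opened.put(e.getKey(), files);
        }
        return opened;
    }
}
```

Since every sink sees its buckets in sorted order, it only needs one open writer 
at a time, no matter how many files it is responsible for.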

Thanks,
Qi

> On Mar 14, 2019, at 2:54 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
> 
> Hi Qi,
> 
>> On Mar 13, 2019, at 1:26 AM, qi luo <luoqi...@gmail.com> wrote:
>> 
>> Hi Ken,
>> 
>> Do you mean that I can create a batch sink which writes to N files? 
> 
> Correct.
> 
>> That sounds viable, but since our data size is huge (billions of records & 
>> thousands of files), the performance may be unacceptable. 
> 
> The main issue with performance (actually memory usage) is how many 
> OutputFormats you need to have open at the same time.
> 
> If you partition by the same key that’s used to define buckets, then the 
> maximum number is lower, as each parallel instance of the sink only gets a 
> distinct subset of all possible bucket values.
> 
> I’m actually dealing with something similar now, so I might have a solution 
> to share soon.
> 
> — Ken
> 
> 
>> I will check Blink and give it a try anyway.
>> 
>> Thank you,
>> Qi
>> 
>>> On Mar 12, 2019, at 11:58 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>> 
>>> Hi Qi,
>>> 
>>> If I understand what you’re trying to do, then this sounds like a variation 
>>> of a bucketing sink.
>>> 
>>> That typically uses a field value to create a directory path or a file name 
>>> (though the filename case is only viable when the field is also what’s used 
>>> to partition the data).
>>> 
>>> But I don’t believe Flink has built-in support for that in batch mode (see 
>>> BucketingSink 
>>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html>
>>>  for streaming).
>>> 
>>> Maybe Blink has added that? Hoping someone who knows that codebase can 
>>> chime in here.
>>> 
>>> Otherwise you’ll need to create a custom sink to implement the desired 
>>> behavior - though abusing a MapPartitionFunction 
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/api/common/functions/MapPartitionFunction.html>
>>>  would be easiest, I think.
>>> 
>>> — Ken
>>> 
>>> 
>>> 
>>>> On Mar 12, 2019, at 2:28 AM, qi luo <luoqi...@gmail.com> wrote:
>>>> 
>>>> Hi Ken,
>>>> 
>>>> Thanks for your reply. I may not have made myself clear: our problem is not 
>>>> about reading but rather writing. 
>>>> 
>>>> We need to write to N files based on key partitioning. We have to use 
>>>> setParallelism() to set the output partition/file number, but when the 
>>>> partition number is too large (~100K), the parallelism would be too high. 
>>>> Is there any other way to achieve this?
>>>> 
>>>> Thanks,
>>>> Qi
>>>> 
>>>>> On Mar 11, 2019, at 11:22 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>>>> 
>>>>> Hi Qi,
>>>>> 
>>>>> I’m guessing you’re calling createInput() for each input file.
>>>>> 
>>>>> If so, then instead you want to do something like:
>>>>> 
>>>>>   Job job = Job.getInstance();
>>>>> 
>>>>>   for each file…
>>>>>           FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file path));
>>>>> 
>>>>>   env.createInput(HadoopInputs.createHadoopInput(…, job));
>>>>> 
>>>>> Flink/Hadoop will take care of parallelizing the reads from the files, 
>>>>> given the parallelism that you’re specifying.
>>>>> 
>>>>> — Ken
>>>>> 
>>>>> 
>>>>>> On Mar 11, 2019, at 5:42 AM, qi luo <luoqi...@gmail.com> wrote:
>>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> We’re trying to distribute batch input data to (N) HDFS files, 
>>>>>> partitioned by hash, using the DataSet API. What I’m doing is something like:
>>>>>> 
>>>>>> env.createInput(…)
>>>>>>       .partitionByHash(0)
>>>>>>       .setParallelism(N)
>>>>>>       .output(…)
>>>>>> 
>>>>>> This works well for a small number of files. But when we need to 
>>>>>> distribute to a large number of files (say 100K), the parallelism becomes 
>>>>>> too large and we cannot afford that many TMs.
>>>>>> 
>>>>>> In Spark we can write something like ‘rdd.partitionBy(N)’ and control 
>>>>>> the parallelism separately (using dynamic allocation). Is there anything 
>>>>>> similar in Flink, or another way to achieve a similar result? Thank you!
>>>>>> 
>>>>>> Qi
>>>>> 
>>>>> --------------------------
>>>>> Ken Krugler
>>>>> +1 530-210-6378
>>>>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>>>>> Custom big data solutions & training
>>>>> Flink, Solr, Hadoop, Cascading & Cassandra
