Hi Fabian,

I understand this is a by-design behavior, since Flink is firstly built for 
streaming. Supporting batch shuffle and custom partition number in Flink may be 
compelling in batch processing. 

Could you help explain a bit more on which works are needed to be done, so 
Flink can support custom partition numbers numbers? We would be willing to help 
improve this area.

Thanks,
Qi

> On Mar 15, 2019, at 4:25 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi,
> 
> Flink works a bit differently than Spark.
> By default, Flink uses pipelined shuffles which push results of the sender 
> immediately to the receivers (btw. this is one of the building blocks for 
> stream processing).
> However, pipelined shuffles require that all receivers are online. Hence, 
> there number of partitions determines the number of running tasks.
> There is also a batch shuffle mode, but it needs to be explicitly enabled and 
> AFAIK does not resolve the dependency of number of partitions and task 
> parallelism.
> 
> However, the community is currently working on many improvements for batch 
> processing, including scheduling and fault-tolerance. 
> Batched shuffles are an important building block for this and there might be 
> better support for your use case in the future.
> 
> Best, Fabian
> 
> Am Fr., 15. März 2019 um 03:56 Uhr schrieb qi luo <luoqi...@gmail.com 
> <mailto:luoqi...@gmail.com>>:
> Hi Ken,
> 
> That looks awesome! I’ve implemented something similar to your bucketing 
> sink, but using multiple internal writers rather than multiple internal 
> output.
> 
> Besides this, I’m also curious whether Flink can achieve this like Spark: 
> allow user to specify partition number in partitionBy() method (so no 
> multiple output formats are needed). But this seems to need non-trivial 
> changes in Flink core.
> 
> Thanks,
> Qi
> 
>> On Mar 15, 2019, at 2:36 AM, Ken Krugler <kkrugler_li...@transpac.com 
>> <mailto:kkrugler_li...@transpac.com>> wrote:
>> 
>> Hi Qi,
>> 
>> See https://github.com/ScaleUnlimited/flink-utils/ 
>> <https://github.com/ScaleUnlimited/flink-utils/>, for a rough but working 
>> version of a bucketing sink.
>> 
>> — Ken
>> 
>> 
>>> On Mar 13, 2019, at 7:46 PM, qi luo <luoqi...@gmail.com 
>>> <mailto:luoqi...@gmail.com>> wrote:
>>> 
>>> Hi Ken,
>>> 
>>> Agree. I will try partitonBy() to reducer the number of parallel sinks, and 
>>> may also try sortPartition() so each sink could write files one by one. 
>>> Looking forward to your solution. :)
>>> 
>>> Thanks,
>>> Qi
>>> 
>>>> On Mar 14, 2019, at 2:54 AM, Ken Krugler <kkrugler_li...@transpac.com 
>>>> <mailto:kkrugler_li...@transpac.com>> wrote:
>>>> 
>>>> Hi Qi,
>>>> 
>>>>> On Mar 13, 2019, at 1:26 AM, qi luo <luoqi...@gmail.com 
>>>>> <mailto:luoqi...@gmail.com>> wrote:
>>>>> 
>>>>> Hi Ken,
>>>>> 
>>>>> Do you mean that I can create a batch sink which writes to N files? 
>>>> 
>>>> Correct.
>>>> 
>>>>> That sounds viable, but since our data size is huge (billions of records 
>>>>> & thousands of files), the performance may be unacceptable. 
>>>> 
>>>> The main issue with performance (actually memory usage) is how many 
>>>> OutputFormats do you need to have open at the same time.
>>>> 
>>>> If you partition by the same key that’s used to define buckets, then the 
>>>> max number is less, as each parallel instance of the sink only gets a 
>>>> unique subset of all possible bucket values.
>>>> 
>>>> I’m actually dealing with something similar now, so I might have a 
>>>> solution to share soon.
>>>> 
>>>> — Ken
>>>> 
>>>> 
>>>>> I will check Blink and give it a try anyway.
>>>>> 
>>>>> Thank you,
>>>>> Qi
>>>>> 
>>>>>> On Mar 12, 2019, at 11:58 PM, Ken Krugler <kkrugler_li...@transpac.com 
>>>>>> <mailto:kkrugler_li...@transpac.com>> wrote:
>>>>>> 
>>>>>> Hi Qi,
>>>>>> 
>>>>>> If I understand what you’re trying to do, then this sounds like a 
>>>>>> variation of a bucketing sink.
>>>>>> 
>>>>>> That typically uses a field value to create a directory path or a file 
>>>>>> name (though the filename case is only viable when the field is also 
>>>>>> what’s used to partition the data)
>>>>>> 
>>>>>> But I don’t believe Flink has built-in support for that, in batch mode 
>>>>>> (see BucketingSink 
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html>
>>>>>>  for streaming).
>>>>>> 
>>>>>> Maybe Blink has added that? Hoping someone who knows that codebase can 
>>>>>> chime in here.
>>>>>> 
>>>>>> Otherwise you’ll need to create a custom sink to implement the desired 
>>>>>> behavior - though abusing a MapPartitionFunction 
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/api/common/functions/MapPartitionFunction.html>
>>>>>>  would be easiest, I think.
>>>>>> 
>>>>>> — Ken
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> On Mar 12, 2019, at 2:28 AM, qi luo <luoqi...@gmail.com 
>>>>>>> <mailto:luoqi...@gmail.com>> wrote:
>>>>>>> 
>>>>>>> Hi Ken,
>>>>>>> 
>>>>>>> Thanks for your reply. I may not make myself clear: our problem is not 
>>>>>>> about reading but rather writing. 
>>>>>>> 
>>>>>>> We need to write to N files based on key partitioning. We have to use 
>>>>>>> setParallelism() to set the output partition/file number, but when the 
>>>>>>> partition number is too large (~100K), the parallelism would be too 
>>>>>>> high. Is there any other way to achieve this?
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Qi
>>>>>>> 
>>>>>>>> On Mar 11, 2019, at 11:22 PM, Ken Krugler <kkrugler_li...@transpac.com 
>>>>>>>> <mailto:kkrugler_li...@transpac.com>> wrote:
>>>>>>>> 
>>>>>>>> Hi Qi,
>>>>>>>> 
>>>>>>>> I’m guessing you’re calling createInput() for each input file.
>>>>>>>> 
>>>>>>>> If so, then instead you want to do something like:
>>>>>>>> 
>>>>>>>>        Job job = Job.getInstance();
>>>>>>>> 
>>>>>>>>        for each file…
>>>>>>>>                FileInputFormat.addInputPath(job, new 
>>>>>>>> org.apache.hadoop.fs.Path(file path));
>>>>>>>> 
>>>>>>>>        env.createInput(HadoopInputs.createHadoopInput(…, job)
>>>>>>>> 
>>>>>>>> Flink/Hadoop will take care of parallelizing the reads from the files, 
>>>>>>>> given the parallelism that you’re specifying.
>>>>>>>> 
>>>>>>>> — Ken
>>>>>>>> 
>>>>>>>> 
>>>>>>>>> On Mar 11, 2019, at 5:42 AM, qi luo <luoqi...@gmail.com 
>>>>>>>>> <mailto:luoqi...@gmail.com>> wrote:
>>>>>>>>> 
>>>>>>>>> Hi,
>>>>>>>>> 
>>>>>>>>> We’re trying to distribute batch input data to (N) HDFS files 
>>>>>>>>> partitioning by hash using DataSet API. What I’m doing is like:
>>>>>>>>> 
>>>>>>>>> env.createInput(…)
>>>>>>>>>       .partitionByHash(0)
>>>>>>>>>       .setParallelism(N)
>>>>>>>>>       .output(…)
>>>>>>>>> 
>>>>>>>>> This works well for small number of files. But when we need to 
>>>>>>>>> distribute to large number of files (say 100K), the parallelism 
>>>>>>>>> becomes too large and we could not afford that many TMs.
>>>>>>>>> 
>>>>>>>>> In spark we can write something like ‘rdd.partitionBy(N)’ and control 
>>>>>>>>> the parallelism separately (using dynamic allocation). Is there 
>>>>>>>>> anything similar in Flink or other way we can achieve similar result? 
>>>>>>>>> Thank you!
>>>>>>>>> 
>>>>>>>>> Qi
>>>>>>>> 
>>>>>>>> --------------------------
>>>>>>>> Ken Krugler
>>>>>>>> +1 530-210-6378
>>>>>>>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>>>>>>>> Custom big data solutions & training
>>>>>>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> --------------------------
>>>>>> Ken Krugler
>>>>>> +1 530-210-6378
>>>>>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>>>>>> Custom big data solutions & training
>>>>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>>>>> 
>>>>> 
>>>> 
>>>> --------------------------
>>>> Ken Krugler
>>>> +1 530-210-6378
>>>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>>>> Custom big data solutions & training
>>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>> 
>> 
>> --------------------------
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>> 
> 

Reply via email to