Thank you Fabian! I will check these issues.

> On Mar 20, 2019, at 4:25 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi,
> 
> I'm sorry, but I'm only familiar with the high-level design, not with the 
> implementation details or the concrete roadmap for the involved components.
> I think that FLINK-10288 [1] and FLINK-10429 [2] are related to partition 
> handling.
> 
> Best,
> Fabian
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10288
> [2] https://issues.apache.org/jira/browse/FLINK-10429
> 
> 
> On Fri, Mar 15, 2019 at 12:13 PM, qi luo <luoqi...@gmail.com> wrote:
> Hi Fabian,
> 
> I understand this behavior is by design, since Flink was built primarily for 
> streaming. Still, supporting batch shuffles and a custom number of partitions 
> could be compelling for batch processing.
> 
> Could you explain a bit more about what work needs to be done so that Flink 
> can support a custom number of partitions? We would be willing to help 
> improve this area.
> 
> Thanks,
> Qi
> 
>> On Mar 15, 2019, at 4:25 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>> 
>> Hi,
>> 
>> Flink works a bit differently than Spark.
>> By default, Flink uses pipelined shuffles which push results of the sender 
>> immediately to the receivers (btw. this is one of the building blocks for 
>> stream processing).
>> However, pipelined shuffles require that all receivers are online. Hence, 
>> the number of partitions determines the number of running tasks.
>> There is also a batch shuffle mode, but it needs to be explicitly enabled 
>> and AFAIK does not remove the coupling between the number of partitions and 
>> task parallelism.
>> 
>> However, the community is currently working on many improvements for batch 
>> processing, including scheduling and fault-tolerance. 
>> Batched shuffles are an important building block for this and there might be 
>> better support for your use case in the future.
>> 
>> Best, Fabian
>> 
>> On Fri, Mar 15, 2019 at 3:56 AM, qi luo <luoqi...@gmail.com> wrote:
>> Hi Ken,
>> 
>> That looks awesome! I’ve implemented something similar to your bucketing 
>> sink, but using multiple internal writers rather than multiple internal 
>> outputs.
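For readers of the archive, the "multiple internal writers" idea could look roughly like the sketch below. It is plain Java with no Flink dependency, and it is not the code from either implementation mentioned in this thread. The class name MultiBucketWriter, the modulo bucketing rule and the opener callback are all assumptions made up for illustration; a real sink would open HDFS files rather than in-memory writers.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntFunction;

// Sketch only: hypothetical names, not a Flink API.
class MultiBucketWriter implements AutoCloseable {
    private final int numBuckets;
    private final IntFunction<Writer> opener;            // opens the writer for one bucket
    private final Map<Integer, Writer> open = new HashMap<>();

    MultiBucketWriter(int numBuckets, IntFunction<Writer> opener) {
        this.numBuckets = numBuckets;
        this.opener = opener;
    }

    // Route a record to its bucket, opening that bucket's writer on first use.
    void write(String key, String record) {
        int bucket = Math.floorMod(key.hashCode(), numBuckets); // assumed bucketing rule
        Writer writer = open.computeIfAbsent(bucket, b -> opener.apply(b));
        try {
            writer.write(record);
            writer.write('\n');
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Number of writers currently held open; this is what drives memory usage.
    int openWriters() {
        return open.size();
    }

    @Override
    public void close() {
        try {
            for (Writer writer : open.values()) {
                writer.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            open.clear();
        }
    }

    public static void main(String[] args) {
        Map<Integer, StringWriter> files = new HashMap<>();  // in-memory stand-in for files
        try (MultiBucketWriter sink =
                 new MultiBucketWriter(4, b -> files.computeIfAbsent(b, x -> new StringWriter()))) {
            for (int i = 0; i < 20; i++) {
                sink.write("key-" + i, "record-" + i);
            }
            System.out.println("open writers: " + sink.openWriters());
        }
        files.forEach((bucket, content) ->
            System.out.println("bucket " + bucket + ": "
                + content.toString().split("\n").length + " records"));
    }
}
```

The point of the design is that one task can serve many buckets, so the number of output files no longer has to equal the parallelism. The cost is that every bucket seen by a task keeps a writer open until close().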
>> 
>> Besides this, I’m also curious whether Flink can do what Spark does: let the 
>> user specify the number of partitions in the partitionBy() method (so that 
>> multiple output formats are not needed). But this seems to need non-trivial 
>> changes in Flink core.
>> 
>> Thanks,
>> Qi
>> 
>>> On Mar 15, 2019, at 2:36 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>> 
>>> Hi Qi,
>>> 
>>> See https://github.com/ScaleUnlimited/flink-utils/ for a rough but working 
>>> version of a bucketing sink.
>>> 
>>> — Ken
>>> 
>>> 
>>>> On Mar 13, 2019, at 7:46 PM, qi luo <luoqi...@gmail.com> wrote:
>>>> 
>>>> Hi Ken,
>>>> 
>>>> Agreed. I will try partitionBy() to reduce the number of parallel sinks, 
>>>> and may also try sortPartition() so each sink can write its files one by 
>>>> one. Looking forward to your solution. :)
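A rough sketch of why sorting helps, for readers of the archive: if the records reaching a sink are already sorted by bucket, the sink only ever needs one file open at a time. This is plain Java with no Flink dependency; SortedBucketWriter and writeSorted are hypothetical names, and a StringBuilder stands in for the single open file.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only: hypothetical names, not a Flink API.
class SortedBucketWriter {

    // Writes (bucket, record) pairs that are already sorted by bucket.
    // Returns the finished "files" keyed by bucket id.
    static Map<Integer, String> writeSorted(List<Map.Entry<Integer, String>> sortedRecords) {
        Map<Integer, String> closedFiles = new LinkedHashMap<>();
        Integer currentBucket = null;
        StringBuilder current = null;                    // the only "file" that is open

        for (Map.Entry<Integer, String> rec : sortedRecords) {
            if (!rec.getKey().equals(currentBucket)) {
                if (currentBucket != null) {
                    // Bucket changed: close the previous file before opening the next.
                    closedFiles.put(currentBucket, current.toString());
                }
                if (closedFiles.containsKey(rec.getKey())) {
                    throw new IllegalStateException(
                        "input is not sorted by bucket: " + rec.getKey());
                }
                currentBucket = rec.getKey();
                current = new StringBuilder();
            }
            current.append(rec.getValue()).append('\n');
        }
        if (currentBucket != null) {
            closedFiles.put(currentBucket, current.toString());
        }
        return closedFiles;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> records = new ArrayList<>();
        records.add(new SimpleEntry<>(0, "a"));
        records.add(new SimpleEntry<>(0, "b"));
        records.add(new SimpleEntry<>(3, "c"));
        writeSorted(records).forEach((bucket, content) ->
            System.out.print("bucket " + bucket + ":\n" + content));
    }
}
```

The trade-off is the cost of the sort itself, in exchange for a constant number of open files per sink instance.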
>>>> 
>>>> Thanks,
>>>> Qi
>>>> 
>>>>> On Mar 14, 2019, at 2:54 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>>>> 
>>>>> Hi Qi,
>>>>> 
>>>>>> On Mar 13, 2019, at 1:26 AM, qi luo <luoqi...@gmail.com> wrote:
>>>>>> 
>>>>>> Hi Ken,
>>>>>> 
>>>>>> Do you mean that I can create a batch sink which writes to N files? 
>>>>> 
>>>>> Correct.
>>>>> 
>>>>>> That sounds viable, but since our data size is huge (billions of records 
>>>>>> & thousands of files), the performance may be unacceptable. 
>>>>> 
>>>>> The main issue with performance (actually memory usage) is how many 
>>>>> OutputFormats you need to have open at the same time.
>>>>> 
>>>>> If you partition by the same key that’s used to define buckets, then the 
>>>>> maximum number is lower, as each parallel instance of the sink only gets a 
>>>>> unique subset of all possible bucket values.
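To make the "unique subset" point concrete for readers of the archive, here is a small sketch in plain Java (no Flink dependency). The names BucketAssignment, bucketOf and instanceOf are hypothetical, and the modulo rules are an assumption standing in for whatever hash partitioning Flink actually applies. What matters is only that the instance is derived from the bucket, so no bucket is shared between two instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Sketch only: hypothetical names, not a Flink API.
class BucketAssignment {

    // Map a key to one of numBuckets output files (assumed rule).
    static int bucketOf(String key, int numBuckets) {
        return Math.floorMod(key.hashCode(), numBuckets);
    }

    // Map a bucket to one of `parallelism` sink instances (assumed rule).
    static int instanceOf(int bucket, int parallelism) {
        return bucket % parallelism;
    }

    // For each sink instance, collect the set of buckets it would have to write.
    static Map<Integer, Set<Integer>> bucketsPerInstance(
            List<String> keys, int numBuckets, int parallelism) {
        Map<Integer, Set<Integer>> result = new TreeMap<>();
        for (String key : keys) {
            int bucket = bucketOf(key, numBuckets);
            result.computeIfAbsent(instanceOf(bucket, parallelism), i -> new TreeSet<>())
                  .add(bucket);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            keys.add("key-" + i);
        }
        // 1,000 buckets spread over 8 sink instances.
        bucketsPerInstance(keys, 1_000, 8).forEach((instance, buckets) ->
            System.out.println("instance " + instance + " -> " + buckets.size() + " buckets"));
    }
}
```

With N buckets and P instances, each instance sees at most ceil(N / P) distinct buckets under this assignment, which bounds the number of OutputFormats it needs open at once.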
>>>>> 
>>>>> I’m actually dealing with something similar now, so I might have a 
>>>>> solution to share soon.
>>>>> 
>>>>> — Ken
>>>>> 
>>>>> 
>>>>>> I will check Blink and give it a try anyway.
>>>>>> 
>>>>>> Thank you,
>>>>>> Qi
>>>>>> 
>>>>>>> On Mar 12, 2019, at 11:58 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>>>>>> 
>>>>>>> Hi Qi,
>>>>>>> 
>>>>>>> If I understand what you’re trying to do, then this sounds like a 
>>>>>>> variation of a bucketing sink.
>>>>>>> 
>>>>>>> That typically uses a field value to create a directory path or a file 
>>>>>>> name (though the filename case is only viable when the field is also 
>>>>>>> what’s used to partition the data).
>>>>>>> 
>>>>>>> But I don’t believe Flink has built-in support for that, in batch mode 
>>>>>>> (see BucketingSink 
>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html>
>>>>>>>  for streaming).
>>>>>>> 
>>>>>>> Maybe Blink has added that? Hoping someone who knows that codebase can 
>>>>>>> chime in here.
>>>>>>> 
>>>>>>> Otherwise you’ll need to create a custom sink to implement the desired 
>>>>>>> behavior - though abusing a MapPartitionFunction 
>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/api/common/functions/MapPartitionFunction.html>
>>>>>>>  would be easiest, I think.
>>>>>>> 
>>>>>>> — Ken
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>>> On Mar 12, 2019, at 2:28 AM, qi luo <luoqi...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>> Hi Ken,
>>>>>>>> 
>>>>>>>> Thanks for your reply. I may not have made myself clear: our problem is 
>>>>>>>> not about reading but rather writing.
>>>>>>>> 
>>>>>>>> We need to write to N files based on key partitioning. We have to use 
>>>>>>>> setParallelism() to set the number of output partitions/files, but when 
>>>>>>>> that number is very large (~100K), the parallelism becomes too high. Is 
>>>>>>>> there any other way to achieve this?
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Qi
>>>>>>>> 
>>>>>>>>> On Mar 11, 2019, at 11:22 PM, Ken Krugler 
>>>>>>>>> <kkrugler_li...@transpac.com> wrote:
>>>>>>>>> 
>>>>>>>>> Hi Qi,
>>>>>>>>> 
>>>>>>>>> I’m guessing you’re calling createInput() for each input file.
>>>>>>>>> 
>>>>>>>>> If so, then instead you want to do something like:
>>>>>>>>> 
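>>>>>>>>>       Job job = Job.getInstance();
>>>>>>>>> 
>>>>>>>>>       // Register every input file on the same Hadoop job.
>>>>>>>>>       // filePaths is a hypothetical name for your collection of input paths.
>>>>>>>>>       for (String filePath : filePaths) {
>>>>>>>>>               FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(filePath));
>>>>>>>>>       }
>>>>>>>>> 
>>>>>>>>>       // Create a single Flink input that covers all registered paths.
>>>>>>>>>       env.createInput(HadoopInputs.createHadoopInput(…, job));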
>>>>>>>>> 
>>>>>>>>> Flink/Hadoop will take care of parallelizing the reads from the 
>>>>>>>>> files, given the parallelism that you’re specifying.
>>>>>>>>> 
>>>>>>>>> — Ken
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> On Mar 11, 2019, at 5:42 AM, qi luo <luoqi...@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi,
>>>>>>>>>> 
>>>>>>>>>> We’re trying to distribute batch input data into N HDFS files, 
>>>>>>>>>> partitioned by hash, using the DataSet API. What I’m doing is something like:
>>>>>>>>>> 
>>>>>>>>>> env.createInput(…)
>>>>>>>>>>       .partitionByHash(0)
>>>>>>>>>>       .setParallelism(N)
>>>>>>>>>>       .output(…)
>>>>>>>>>> 
>>>>>>>>>> This works well for a small number of files. But when we need to 
>>>>>>>>>> distribute to a large number of files (say 100K), the parallelism 
>>>>>>>>>> becomes too large and we cannot afford that many TMs.
>>>>>>>>>> 
>>>>>>>>>> In Spark we can write something like ‘rdd.partitionBy(N)’ and 
>>>>>>>>>> control the parallelism separately (using dynamic allocation). Is 
>>>>>>>>>> there anything similar in Flink, or some other way we can achieve a 
>>>>>>>>>> similar result? Thank you!
>>>>>>>>>> 
>>>>>>>>>> Qi
>>>>>>>>> 
>>>>>>>>> --------------------------
>>>>>>>>> Ken Krugler
>>>>>>>>> +1 530-210-6378
>>>>>>>>> http://www.scaleunlimited.com
>>>>>>>>> Custom big data solutions & training
>>>>>>>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
