Hi Fabian,

I understand this is by-design behavior, since Flink was built first and foremost for streaming. Supporting batch shuffle and a custom partition number in Flink could be compelling for batch processing.
Could you help explain a bit more which pieces of work need to be done, so Flink can support custom partition numbers? We would be willing to help improve this area.

Thanks,
Qi

> On Mar 15, 2019, at 4:25 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi,
>
> Flink works a bit differently than Spark.
> By default, Flink uses pipelined shuffles, which push results of the sender
> immediately to the receivers (btw. this is one of the building blocks for
> stream processing).
> However, pipelined shuffles require that all receivers are online. Hence,
> the number of partitions determines the number of running tasks.
> There is also a batch shuffle mode, but it needs to be explicitly enabled and
> AFAIK does not remove the dependency between the number of partitions and
> task parallelism.
>
> However, the community is currently working on many improvements for batch
> processing, including scheduling and fault tolerance.
> Batched shuffles are an important building block for this, and there might be
> better support for your use case in the future.
>
> Best, Fabian
>
> Am Fr., 15. März 2019 um 03:56 Uhr schrieb qi luo <luoqi...@gmail.com>:
> Hi Ken,
>
> That looks awesome! I’ve implemented something similar to your bucketing
> sink, but using multiple internal writers rather than multiple internal
> outputs.
>
> Besides this, I’m also curious whether Flink can achieve this like Spark:
> allow the user to specify the partition number in the partitionBy() method
> (so no multiple output formats are needed). But this seems to need
> non-trivial changes in Flink core.
>
> Thanks,
> Qi
>
>> On Mar 15, 2019, at 2:36 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>
>> Hi Qi,
>>
>> See https://github.com/ScaleUnlimited/flink-utils/ for a rough but working
>> version of a bucketing sink.
>>
>> — Ken
>>
>>
>>> On Mar 13, 2019, at 7:46 PM, qi luo <luoqi...@gmail.com> wrote:
>>>
>>> Hi Ken,
>>>
>>> Agreed. I will try partitionBy() to reduce the number of parallel sinks,
>>> and may also try sortPartition() so each sink could write files one by
>>> one. Looking forward to your solution. :)
>>>
>>> Thanks,
>>> Qi
>>>
>>>> On Mar 14, 2019, at 2:54 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>>>
>>>> Hi Qi,
>>>>
>>>>> On Mar 13, 2019, at 1:26 AM, qi luo <luoqi...@gmail.com> wrote:
>>>>>
>>>>> Hi Ken,
>>>>>
>>>>> Do you mean that I can create a batch sink which writes to N files?
>>>>
>>>> Correct.
>>>>
>>>>> That sounds viable, but since our data size is huge (billions of records
>>>>> & thousands of files), the performance may be unacceptable.
>>>>
>>>> The main issue with performance (actually memory usage) is how many
>>>> OutputFormats you need to have open at the same time.
>>>>
>>>> If you partition by the same key that’s used to define buckets, then the
>>>> max number is lower, as each parallel instance of the sink only gets a
>>>> unique subset of all possible bucket values.
>>>>
>>>> I’m actually dealing with something similar now, so I might have a
>>>> solution to share soon.
>>>>
>>>> — Ken
>>>>
>>>>
>>>>> I will check Blink and give it a try anyway.
>>>>>
>>>>> Thank you,
>>>>> Qi
>>>>>
>>>>>> On Mar 12, 2019, at 11:58 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>>>>>
>>>>>> Hi Qi,
>>>>>>
>>>>>> If I understand what you’re trying to do, then this sounds like a
>>>>>> variation of a bucketing sink.
>>>>>>
>>>>>> That typically uses a field value to create a directory path or a file
>>>>>> name (though the filename case is only viable when the field is also
>>>>>> what’s used to partition the data).
>>>>>>
>>>>>> But I don’t believe Flink has built-in support for that in batch mode
>>>>>> (see BucketingSink
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html>
>>>>>> for streaming).
>>>>>>
>>>>>> Maybe Blink has added that? Hoping someone who knows that codebase can
>>>>>> chime in here.
>>>>>>
>>>>>> Otherwise you’ll need to create a custom sink to implement the desired
>>>>>> behavior - though abusing a MapPartitionFunction
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/api/common/functions/MapPartitionFunction.html>
>>>>>> would be easiest, I think.
>>>>>>
>>>>>> — Ken
>>>>>>
>>>>>>
>>>>>>
>>>>>>> On Mar 12, 2019, at 2:28 AM, qi luo <luoqi...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Ken,
>>>>>>>
>>>>>>> Thanks for your reply. I may not have made myself clear: our problem is
>>>>>>> not about reading but rather about writing.
>>>>>>>
>>>>>>> We need to write to N files based on key partitioning. We have to use
>>>>>>> setParallelism() to set the output partition/file number, but when the
>>>>>>> partition number is too large (~100K), the parallelism would be too
>>>>>>> high. Is there any other way to achieve this?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Qi
>>>>>>>
>>>>>>>> On Mar 11, 2019, at 11:22 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>>>>>>>
>>>>>>>> Hi Qi,
>>>>>>>>
>>>>>>>> I’m guessing you’re calling createInput() for each input file.
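The custom-sink approach Ken describes hinges on one piece of arithmetic: if records are partitioned by the same key that defines the buckets, each parallel sink instance owns a disjoint subset of buckets, which bounds how many OutputFormats any single task must keep open. A minimal plain-Java sketch of that routing (class and method names here are illustrative, not Flink API):

```java
public class BucketRouting {
    /** Which of the N bucket files a record belongs to (stable hash). */
    static int bucketFor(Object key, int numBuckets) {
        return Math.floorMod(key.hashCode(), numBuckets);
    }

    /** Which of the P parallel sink tasks owns a given bucket. */
    static int taskFor(int bucket, int parallelism) {
        return bucket % parallelism;
    }

    /** Upper bound on OutputFormats one task must keep open: ceil(N / P). */
    static int maxOpenPerTask(int numBuckets, int parallelism) {
        return (numBuckets + parallelism - 1) / parallelism;
    }

    public static void main(String[] args) {
        int numBuckets = 100_000;   // N desired output files
        int parallelism = 100;      // P sink tasks actually running
        int bucket = bucketFor("some-key", numBuckets);
        System.out.println("bucket " + bucket + " -> task "
                + taskFor(bucket, parallelism) + ", at most "
                + maxOpenPerTask(numBuckets, parallelism)
                + " open files per task");
    }
}
```

With N = 100,000 buckets and P = 100 sink tasks, each task keeps at most 1,000 output files open instead of all 100,000, which is the memory-usage point Ken raises above.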
>>>>>>>>
>>>>>>>> If so, then instead you want to do something like:
>>>>>>>>
>>>>>>>> Job job = Job.getInstance();
>>>>>>>>
>>>>>>>> for each file…
>>>>>>>>     FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file path));
>>>>>>>>
>>>>>>>> env.createInput(HadoopInputs.createHadoopInput(…, job));
>>>>>>>>
>>>>>>>> Flink/Hadoop will take care of parallelizing the reads from the files,
>>>>>>>> given the parallelism that you’re specifying.
>>>>>>>>
>>>>>>>> — Ken
>>>>>>>>
>>>>>>>>
>>>>>>>>> On Mar 11, 2019, at 5:42 AM, qi luo <luoqi...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> We’re trying to distribute batch input data to (N) HDFS files,
>>>>>>>>> partitioning by hash, using the DataSet API. What I’m doing is like:
>>>>>>>>>
>>>>>>>>> env.createInput(…)
>>>>>>>>>    .partitionByHash(0)
>>>>>>>>>    .setParallelism(N)
>>>>>>>>>    .output(…)
>>>>>>>>>
>>>>>>>>> This works well for a small number of files. But when we need to
>>>>>>>>> distribute to a large number of files (say 100K), the parallelism
>>>>>>>>> becomes too large and we cannot afford that many TMs.
>>>>>>>>>
>>>>>>>>> In Spark we can write something like ‘rdd.partitionBy(N)’ and control
>>>>>>>>> the parallelism separately (using dynamic allocation). Is there
>>>>>>>>> anything similar in Flink, or another way we can achieve a similar
>>>>>>>>> result? Thank you!
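The pipeline Qi quotes above ties the number of output files to setParallelism(N). The decoupling being asked for amounts to letting a single task write several bucket files. A small, self-contained plain-Java mock of that fan-out, assuming a hash-bucket scheme (FanOutWriter and the part-bucket-* file naming are hypothetical, not a Flink or Spark convention):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FanOutWriter {
    /** Group one task's records by bucket; bucket count is independent of task count. */
    static Map<Integer, List<String>> groupByBucket(List<String> records, int numBuckets) {
        Map<Integer, List<String>> buckets = new TreeMap<>();
        for (String record : records) {
            int bucket = Math.floorMod(record.hashCode(), numBuckets);
            buckets.computeIfAbsent(bucket, b -> new ArrayList<>()).add(record);
        }
        return buckets;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("buckets");
        List<String> records = Arrays.asList("apple", "banana", "cherry", "date");
        // One "task" writes up to 4 bucket files, regardless of parallelism.
        groupByBucket(records, 4).forEach((bucket, lines) -> {
            try {
                Files.write(dir.resolve("part-bucket-" + bucket), lines);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        System.out.println("files written by one task: " + dir.toFile().list().length);
    }
}
```

In a real Flink job this grouping-and-writing would live inside a custom sink or a MapPartitionFunction, as discussed earlier in the thread; the point of the sketch is only that file count and task count are separate knobs.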
>>>>>>>>>
>>>>>>>>> Qi
>>>>>>>>
>>>>>>>> --------------------------
>>>>>>>> Ken Krugler
>>>>>>>> +1 530-210-6378
>>>>>>>> http://www.scaleunlimited.com
>>>>>>>> Custom big data solutions & training
>>>>>>>> Flink, Solr, Hadoop, Cascading & Cassandra