Hi,

I'm sorry, but I'm only familiar with the high-level design, not with the implementation details or the concrete roadmap of the involved components.
I think that FLINK-10288 [1] and FLINK-10429 [2] are related to partition handling.
Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-10288
[2] https://issues.apache.org/jira/browse/FLINK-10429

On Fri, Mar 15, 2019 at 12:13 PM qi luo <luoqi...@gmail.com> wrote:

> Hi Fabian,
>
> I understand this is by-design behavior, since Flink was first built
> for streaming. Supporting batch shuffle and custom partition numbers in
> Flink may be compelling for batch processing.
>
> Could you explain a bit more about what work needs to be done so that
> Flink can support custom partition numbers? We would be willing to
> help improve this area.
>
> Thanks,
> Qi
>
> On Mar 15, 2019, at 4:25 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi,
>
> Flink works a bit differently than Spark.
> By default, Flink uses pipelined shuffles, which push the sender's results
> immediately to the receivers (btw. this is one of the building blocks for
> stream processing).
> However, pipelined shuffles require that all receivers are online. Hence,
> the number of partitions determines the number of running tasks.
> There is also a batch shuffle mode, but it needs to be explicitly enabled
> and AFAIK does not remove the dependency between the number of partitions
> and the task parallelism.
>
> However, the community is currently working on many improvements for batch
> processing, including scheduling and fault tolerance.
> Batched shuffles are an important building block for this, and there might
> be better support for your use case in the future.
>
> Best, Fabian
>
> On Fri, Mar 15, 2019 at 3:56 AM qi luo <luoqi...@gmail.com> wrote:
>
>> Hi Ken,
>>
>> That looks awesome! I’ve implemented something similar to your bucketing
>> sink, but using multiple internal writers rather than multiple internal
>> outputs.
>>
>> Besides this, I’m also curious whether Flink can achieve this the way Spark
>> does: allow the user to specify the partition number in the partitionBy()
>> method (so that multiple output formats are not needed).
>> But this seems to require non-trivial changes in Flink core.
>>
>> Thanks,
>> Qi
>>
>> On Mar 15, 2019, at 2:36 AM, Ken Krugler <kkrugler_li...@transpac.com>
>> wrote:
>>
>> Hi Qi,
>>
>> See https://github.com/ScaleUnlimited/flink-utils/ for a rough but
>> working version of a bucketing sink.
>>
>> — Ken
>>
>>
>> On Mar 13, 2019, at 7:46 PM, qi luo <luoqi...@gmail.com> wrote:
>>
>> Hi Ken,
>>
>> Agreed. I will try partitionBy() to reduce the number of parallel sinks,
>> and may also try sortPartition() so that each sink can write its files one by one.
>> Looking forward to your solution. :)
>>
>> Thanks,
>> Qi
>>
>> On Mar 14, 2019, at 2:54 AM, Ken Krugler <kkrugler_li...@transpac.com>
>> wrote:
>>
>> Hi Qi,
>>
>> On Mar 13, 2019, at 1:26 AM, qi luo <luoqi...@gmail.com> wrote:
>>
>> Hi Ken,
>>
>> Do you mean that I can create a batch sink which writes to N files?
>>
>>
>> Correct.
>>
>> That sounds viable, but since our data size is huge (billions of records
>> & thousands of files), the performance may be unacceptable.
>>
>>
>> The main issue with performance (actually memory usage) is how many
>> OutputFormats you need to have open at the same time.
>>
>> If you partition by the same key that’s used to define buckets, then the
>> maximum number is lower, as each parallel instance of the sink only gets a
>> unique subset of all possible bucket values.
>>
>> I’m actually dealing with something similar now, so I might have a
>> solution to share soon.
>>
>> — Ken
>>
>>
>> I will check Blink and give it a try anyway.
>>
>> Thank you,
>> Qi
>>
>> On Mar 12, 2019, at 11:58 PM, Ken Krugler <kkrugler_li...@transpac.com>
>> wrote:
>>
>> Hi Qi,
>>
>> If I understand what you’re trying to do, then this sounds like a
>> variation of a bucketing sink.
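Ken's point about memory usage (how many OutputFormats each sink instance must keep open) can be checked with a small simulation. This is a plain-Java sketch with no Flink dependency; the bucketing rule (key mod number of buckets), the round-robin baseline, the class and method names are all assumptions made for illustration. It counts, per parallel sink instance, how many distinct buckets (and therefore open per-bucket writers) that instance would receive.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

public class OpenWritersPerInstance {

    /**
     * Sends numRecords records with random keys to `parallelism` sink instances and
     * returns, per instance, how many distinct buckets it received, i.e. how many
     * per-bucket writers it would have to keep open.
     *
     * partitionByBucket == true:  a record goes to instance (bucket % parallelism),
     *                             so each bucket is handled by exactly one instance.
     * partitionByBucket == false: records are spread round-robin, ignoring the bucket.
     */
    public static int[] distinctBucketsPerInstance(int numRecords, int numBuckets,
            int parallelism, boolean partitionByBucket, long seed) {
        Random random = new Random(seed);
        List<Set<Integer>> bucketsSeen = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            bucketsSeen.add(new HashSet<>());
        }
        for (int i = 0; i < numRecords; i++) {
            long key = random.nextLong();
            // Hypothetical bucketing rule: bucket = key mod numBuckets.
            int bucket = (int) Math.floorMod(key, (long) numBuckets);
            int instance = partitionByBucket
                    ? Math.floorMod(bucket, parallelism)
                    : i % parallelism;
            bucketsSeen.get(instance).add(bucket);
        }
        int[] counts = new int[parallelism];
        for (int i = 0; i < parallelism; i++) {
            counts[i] = bucketsSeen.get(i).size();
        }
        return counts;
    }

    public static int max(int[] values) {
        int result = 0;
        for (int v : values) {
            result = Math.max(result, v);
        }
        return result;
    }

    public static void main(String[] args) {
        int records = 100_000, buckets = 1_000, parallelism = 10;
        int[] byBucket = distinctBucketsPerInstance(records, buckets, parallelism, true, 42L);
        int[] roundRobin = distinctBucketsPerInstance(records, buckets, parallelism, false, 42L);
        System.out.println("max open writers per instance, partitioned by bucket: " + max(byBucket));
        System.out.println("max open writers per instance, round-robin: " + max(roundRobin));
    }
}
```

With 1,000 buckets and a parallelism of 10, routing by bucket caps every instance at 1,000 / 10 = 100 buckets by construction, whereas round-robin routing leaves each instance seeing nearly all 1,000 buckets.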
>>
>> That typically uses a field value to create a directory path or a file
>> name (though the filename case is only viable when the field is also what’s
>> used to partition the data).
>>
>> But I don’t believe Flink has built-in support for that in batch mode
>> (see BucketingSink
>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html>
>> for streaming).
>>
>> Maybe Blink has added that? Hoping someone who knows that codebase can
>> chime in here.
>>
>> Otherwise you’ll need to create a custom sink to implement the desired
>> behavior, though abusing a MapPartitionFunction
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/api/common/functions/MapPartitionFunction.html>
>> would be easiest, I think.
>>
>> — Ken
>>
>>
>>
>> On Mar 12, 2019, at 2:28 AM, qi luo <luoqi...@gmail.com> wrote:
>>
>> Hi Ken,
>>
>> Thanks for your reply. I may not have made myself clear: our problem is not
>> about reading but rather about writing.
>>
>> We need to write to N files based on key partitioning. We have to use
>> *setParallelism()* to set the output partition/file number, but when the
>> partition number is too large (~100K), the parallelism becomes too high.
>> Is there any other way to achieve this?
>>
>> Thanks,
>> Qi
>>
>> On Mar 11, 2019, at 11:22 PM, Ken Krugler <kkrugler_li...@transpac.com>
>> wrote:
>>
>> Hi Qi,
>>
>> I’m guessing you’re calling createInput() for each input file.
>>
>> If so, then instead you want to do something like:
>>
>>     Job job = Job.getInstance();
>>
>>     for each file…
>>         FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file path));
>>
>>     env.createInput(HadoopInputs.createHadoopInput(…, job));
>>
>> Flink/Hadoop will take care of parallelizing the reads from the files,
>> given the parallelism that you’re specifying.
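Ken's suggestion of "abusing" a MapPartitionFunction means that each parallel instance receives an iterator over its whole partition and writes every record into a per-bucket file itself. The plain-Java sketch below shows only that per-partition logic, without the Flink API; the Rec type, the part-<bucket>.txt naming and the method names are hypothetical. Because both methods take an Iterable, they could presumably be called from inside mapPartition(Iterable, Collector), but that wiring is an untested assumption. The second method covers Qi's sortPartition() idea: if the partition is sorted by bucket first, only one writer needs to be open at a time.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionBucketWriter {

    /** Hypothetical record: a bucket id derived from the partitioning key, plus a text line. */
    public static final class Rec {
        public final int bucket;
        public final String line;
        public Rec(int bucket, String line) { this.bucket = bucket; this.line = line; }
    }

    /** Hypothetical naming scheme: one file "part-<bucket>.txt" per bucket. */
    private static BufferedWriter openBucketFile(Path outDir, int bucket) throws IOException {
        return Files.newBufferedWriter(outDir.resolve("part-" + bucket + ".txt"), StandardCharsets.UTF_8);
    }

    /** Unsorted partition: keeps one writer per bucket open until the end. Returns the peak open count. */
    public static int writeUnsorted(Iterable<Rec> records, Path outDir) throws IOException {
        Map<Integer, BufferedWriter> writers = new HashMap<>();
        try {
            for (Rec r : records) {
                BufferedWriter w = writers.get(r.bucket);
                if (w == null) {
                    w = openBucketFile(outDir, r.bucket);
                    writers.put(r.bucket, w);
                }
                w.write(r.line);
                w.newLine();
            }
            return writers.size(); // nothing is closed early, so peak == buckets seen
        } finally {
            IOException failure = null;
            for (BufferedWriter w : writers.values()) {
                try { w.close(); } catch (IOException e) { failure = e; } // keep closing the rest
            }
            if (failure != null) throw failure;
        }
    }

    /**
     * Partition sorted by bucket: the current file is closed when the bucket changes,
     * so at most one writer is open. Unsorted input would reopen and truncate files.
     */
    public static int writeSorted(Iterable<Rec> sortedRecords, Path outDir) throws IOException {
        BufferedWriter current = null;
        int currentBucket = 0;
        int openCount = 0;
        int peak = 0;
        try {
            for (Rec r : sortedRecords) {
                if (current == null || r.bucket != currentBucket) {
                    if (current != null) {
                        current.close();
                        openCount--;
                    }
                    current = openBucketFile(outDir, r.bucket);
                    currentBucket = r.bucket;
                    openCount++;
                    peak = Math.max(peak, openCount);
                }
                current.write(r.line);
                current.newLine();
            }
        } finally {
            if (current != null) {
                current.close();
            }
        }
        return peak;
    }

    public static void main(String[] args) throws IOException {
        List<Rec> records = new ArrayList<>(Arrays.asList(
                new Rec(2, "c"), new Rec(1, "a"), new Rec(2, "d"), new Rec(1, "b")));
        System.out.println("peak open writers, unsorted: "
                + writeUnsorted(records, Files.createTempDirectory("unsorted")));
        records.sort(Comparator.comparingInt(r -> r.bucket)); // stable sort keeps a before b
        System.out.println("peak open writers, sorted: "
                + writeSorted(records, Files.createTempDirectory("sorted")));
    }
}
```

The trade-off is the one discussed in the thread: the unsorted variant needs no sort but holds one open writer per bucket in the partition, while the sorted variant pays for a sort to keep a single writer open.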
>>
>> — Ken
>>
>>
>> On Mar 11, 2019, at 5:42 AM, qi luo <luoqi...@gmail.com> wrote:
>>
>> Hi,
>>
>> We’re trying to distribute batch input data to N HDFS files,
>> partitioned by hash, using the DataSet API. What I’m doing looks like:
>>
>>     env.createInput(…)
>>        .partitionByHash(0)
>>        .setParallelism(N)
>>        .output(…)
>>
>> This works well for a small number of files. But when we need to distribute
>> to a *large number of files (say 100K)*, the parallelism becomes too large
>> and we cannot afford that many TaskManagers (TMs).
>>
>> In Spark we can write something like ‘rdd.partitionBy(N)’ and control the
>> parallelism separately (using dynamic allocation). Is there anything
>> similar in Flink, or another way we can achieve a similar result? Thank you!
>>
>> Qi
>>
>>
>> --------------------------
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
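Qi's original question is how to produce N output files without running N parallel tasks. One reading of Ken's remark that the filename case "is only viable when the field is also what’s used to partition the data" is sketched below in plain Java (no Flink API; the class, the method names, the key-mod-N file rule and the mix() stand-in hash are all hypothetical, and mix() is not Flink's partitioner). The idea: first derive a file id in [0, N) from the key, then route records to P tasks (P much smaller than N) by that file id, so every file is written by exactly one task. Routing by the raw key with an unrelated hash lets several tasks end up writing the same file.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

public class LogicalFilePartitioning {

    /** Hypothetical rule: maps a key to one of numFiles output files (N in the question). */
    public static int fileIdOf(long key, int numFiles) {
        return (int) Math.floorMod(key, (long) numFiles);
    }

    /** Arbitrary stand-in for a routing hash that is unrelated to the file id. */
    public static int mix(long key) {
        long h = key * 0x9E3779B97F4A7C15L;
        return (int) (h ^ (h >>> 32));
    }

    /**
     * Routes numRecords random keys to `parallelism` tasks and returns, for each file id,
     * the set of tasks that would write to that file.
     * routeByFileId == true:  task = fileId % parallelism (depends on the file id only)
     * routeByFileId == false: task = mix(key) % parallelism (depends on the raw key)
     */
    public static Map<Integer, Set<Integer>> writersPerFile(int numRecords, int numFiles,
            int parallelism, boolean routeByFileId, long seed) {
        Random random = new Random(seed);
        Map<Integer, Set<Integer>> tasksByFile = new HashMap<>();
        for (int i = 0; i < numRecords; i++) {
            long key = random.nextLong();
            int fileId = fileIdOf(key, numFiles);
            int task = routeByFileId
                    ? Math.floorMod(fileId, parallelism)
                    : Math.floorMod(mix(key), parallelism);
            tasksByFile.computeIfAbsent(fileId, f -> new HashSet<>()).add(task);
        }
        return tasksByFile;
    }

    /** Largest number of distinct tasks writing any single file (1 means no conflicts). */
    public static int maxWritersPerFile(Map<Integer, Set<Integer>> tasksByFile) {
        int max = 0;
        for (Set<Integer> tasks : tasksByFile.values()) {
            max = Math.max(max, tasks.size());
        }
        return max;
    }

    public static void main(String[] args) {
        int records = 200_000, files = 10_000, parallelism = 20;
        System.out.println("max writer tasks per file, routed by file id: "
                + maxWritersPerFile(writersPerFile(records, files, parallelism, true, 7L)));
        System.out.println("max writer tasks per file, routed by raw key: "
                + maxWritersPerFile(writersPerFile(records, files, parallelism, false, 7L)));
    }
}
```

With N = 10,000 files and P = 20 tasks, each task owns 10,000 / 20 = 500 files, which it could then write one at a time using the sortPartition() idea Qi mentioned. Any deterministic function of the file id works for routing; fileId % P is just the simplest. In DataSet terms this would presumably mean adding the file id as a field (for example in a map()) and calling partitionByHash() on that field with a parallelism of P; that mapping is an assumption, not something confirmed in this thread.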