Hi Ken,

Agreed. I will try partitionByHash() to reduce the number of parallel sinks, and may also try sortPartition() so each sink can write its files one by one. Looking forward to your solution. :)
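Roughly what I have in mind (just a sketch, not tested: the Integer bucket key, the output directory, and the WriteBuckets helper are all made up, and M is the reduced sink parallelism):

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;

public class BucketedBatchWrite {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final int M = 100;  // parallel writers, far fewer than the N output files

        // Stand-in for env.createInput(…): f0 is the bucket key, f1 the payload.
        DataSet<Tuple2<Integer, String>> records =
                env.fromElements(Tuple2.of(1, "first"), Tuple2.of(2, "second"));

        records
                .partitionByHash(0).setParallelism(M)                 // each bucket lands on exactly one task
                .sortPartition(0, Order.ASCENDING).setParallelism(M)  // buckets become contiguous runs
                .mapPartition(new WriteBuckets("hdfs:///output")).setParallelism(M)
                .output(new DiscardingOutputFormat<Integer>()).setParallelism(M);

        env.execute("write N bucket files with M sinks");
    }

    // Writes each bucket to its own file; the sorted input means at most one file is open per task.
    private static class WriteBuckets implements MapPartitionFunction<Tuple2<Integer, String>, Integer> {
        private final String baseDir;

        WriteBuckets(String baseDir) {
            this.baseDir = baseDir;
        }

        @Override
        public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Integer> out)
                throws Exception {
            Integer currentKey = null;
            FSDataOutputStream stream = null;
            for (Tuple2<Integer, String> record : values) {
                if (!record.f0.equals(currentKey)) {
                    // Key changed: finish the previous file and open the next one.
                    if (stream != null) {
                        stream.close();
                        out.collect(currentKey);
                    }
                    currentKey = record.f0;
                    Path path = new Path(baseDir, "part-" + currentKey);
                    stream = path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE);
                }
                stream.write((record.f1 + "\n").getBytes(StandardCharsets.UTF_8));
            }
            if (stream != null) {
                stream.close();
                out.collect(currentKey);
            }
        }
    }
}

Since each bucket is hashed to exactly one task and each partition is sorted, every task holds at most one open file at a time and writes its ~N/M buckets one by one.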
Thanks,
Qi

> On Mar 14, 2019, at 2:54 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>
> Hi Qi,
>
>> On Mar 13, 2019, at 1:26 AM, qi luo <luoqi...@gmail.com> wrote:
>>
>> Hi Ken,
>>
>> Do you mean that I can create a batch sink which writes to N files?
>
> Correct.
>
>> That sounds viable, but since our data size is huge (billions of records & thousands of files), the performance may be unacceptable.
>
> The main issue with performance (actually memory usage) is how many OutputFormats you need to have open at the same time.
>
> If you partition by the same key that’s used to define the buckets, then the max number is smaller, as each parallel instance of the sink only gets a unique subset of all possible bucket values.
>
> I’m actually dealing with something similar now, so I might have a solution to share soon.
>
> — Ken
>
>> I will check Blink and give it a try anyway.
>>
>> Thank you,
>> Qi
>>
>>> On Mar 12, 2019, at 11:58 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>>
>>> Hi Qi,
>>>
>>> If I understand what you’re trying to do, then this sounds like a variation of a bucketing sink.
>>>
>>> That typically uses a field value to create a directory path or a file name (though the filename case is only viable when the field is also what’s used to partition the data).
>>>
>>> But I don’t believe Flink has built-in support for that in batch mode (see BucketingSink <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html> for streaming).
>>>
>>> Maybe Blink has added that? Hoping someone who knows that codebase can chime in here.
>>>
>>> Otherwise you’ll need to create a custom sink to implement the desired behavior - though abusing a MapPartitionFunction <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/api/common/functions/MapPartitionFunction.html> would be easiest, I think.
>>>
>>> — Ken
>>>
>>>> On Mar 12, 2019, at 2:28 AM, qi luo <luoqi...@gmail.com> wrote:
>>>>
>>>> Hi Ken,
>>>>
>>>> Thanks for your reply. I may not have made myself clear: our problem is not about reading, but rather writing.
>>>>
>>>> We need to write to N files based on key partitioning. We have to use setParallelism() to set the output partition/file number, but when the partition number is too large (~100K), the parallelism would be too high. Is there any other way to achieve this?
>>>>
>>>> Thanks,
>>>> Qi
>>>>
>>>>> On Mar 11, 2019, at 11:22 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>>>>
>>>>> Hi Qi,
>>>>>
>>>>> I’m guessing you’re calling createInput() for each input file.
>>>>>
>>>>> If so, then instead you want to do something like:
>>>>>
>>>>>     Job job = Job.getInstance();
>>>>>
>>>>>     for each file…
>>>>>         FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file path));
>>>>>
>>>>>     env.createInput(HadoopInputs.createHadoopInput(…, job));
>>>>>
>>>>> Flink/Hadoop will take care of parallelizing the reads from the files, given the parallelism that you’re specifying.
>>>>>
>>>>> — Ken
>>>>>
>>>>>> On Mar 11, 2019, at 5:42 AM, qi luo <luoqi...@gmail.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We’re trying to distribute batch input data to (N) HDFS files, partitioning by hash, using the DataSet API.
>>>>>> What I’m doing is like:
>>>>>>
>>>>>>     env.createInput(…)
>>>>>>         .partitionByHash(0)
>>>>>>         .setParallelism(N)
>>>>>>         .output(…)
>>>>>>
>>>>>> This works well for a small number of files. But when we need to distribute to a large number of files (say 100K), the parallelism becomes too large and we cannot afford that many TMs.
>>>>>>
>>>>>> In Spark we can write something like ‘rdd.partitionBy(N)’ and control the parallelism separately (using dynamic allocation). Is there anything similar in Flink, or another way we can achieve a similar result? Thank you!
>>>>>>
>>>>>> Qi
>>>>>
>>>>> --------------------------
>>>>> Ken Krugler
>>>>> +1 530-210-6378
>>>>> http://www.scaleunlimited.com
>>>>> Custom big data solutions & training
>>>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>>>
>>>
>>> --------------------------
>>> Ken Krugler
>>> +1 530-210-6378
>>> http://www.scaleunlimited.com
>>> Custom big data solutions & training
>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
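P.S. For anyone who finds this thread in the archives: here is Ken’s multi-path read snippet from above, filled out so it compiles. The input paths and the choice of TextInputFormat are just examples; swap in whatever matches your data.

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class MultiFileRead {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Example paths; replace with your real HDFS files.
        List<String> files = Arrays.asList("hdfs:///input/a", "hdfs:///input/b");

        // One Hadoop Job carries all input paths, instead of one createInput() per file.
        Job job = Job.getInstance();
        for (String file : files) {
            FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file));
        }

        // A single input with many paths; Flink/Hadoop split and parallelize the reads.
        DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
                HadoopInputs.createHadoopInput(
                        new TextInputFormat(), LongWritable.class, Text.class, job));

        input.print();
    }
}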