Hi,

I'm sorry, but I'm only familiar with the high-level design, not with the
implementation details or the concrete roadmap for the components involved.
I think FLINK-10288 [1] and FLINK-10429 [2] are related to partition
handling.

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-10288
[2] https://issues.apache.org/jira/browse/FLINK-10429
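
For readers following the thread: the idea Ken describes further down
(partition by the same key that defines the buckets, so each parallel sink
instance only sees a disjoint subset of buckets) can be sketched in plain
Java. This is only an illustration under stated assumptions, not Flink code
and not Ken's implementation. The class BucketAssignment and its methods are
hypothetical names, and the modulo mapping from bucket to sink instance is an
assumption; in a real DataSet job such a mapping would presumably have to be
supplied through a custom partitioner rather than partitionByHash().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: shows how N output buckets (files)
// can be spread over P parallel sink instances with P much smaller than N.
public class BucketAssignment {

    // Maps a record key to one of numBuckets output files.
    static int bucketFor(Object key, int numBuckets) {
        // floorMod keeps the result non-negative for negative hash codes.
        return Math.floorMod(key.hashCode(), numBuckets);
    }

    // Assumed mapping from a bucket to the sink instance that writes it.
    static int sinkInstanceFor(int bucket, int parallelism) {
        return bucket % parallelism;
    }

    // Lists the buckets a single sink instance is responsible for.
    static List<Integer> bucketsOwnedBy(int instance, int numBuckets, int parallelism) {
        List<Integer> owned = new ArrayList<>();
        for (int b = instance; b < numBuckets; b += parallelism) {
            owned.add(b);
        }
        return owned;
    }

    public static void main(String[] args) {
        int numBuckets = 100_000; // number of output files, as in the thread
        int parallelism = 200;    // illustrative value, much smaller than numBuckets
        int bucket = bucketFor("some-key", numBuckets);
        System.out.println("bucket=" + bucket
                + " instance=" + sinkInstanceFor(bucket, parallelism)
                + " bucketsPerInstance=" + bucketsOwnedBy(0, numBuckets, parallelism).size());
    }
}
```

With a mapping like this, the number of writers a sink instance may need is
bounded by roughly N / P instead of N. If the records of each instance are
additionally sorted by bucket (as Qi suggests with sortPartition()), only one
file needs to be open at a time.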


On Mar 15, 2019, at 12:13 PM, qi luo <luoqi...@gmail.com> wrote:

> Hi Fabian,
>
> I understand this behavior is by design, since Flink was built first for
> streaming. Supporting batch shuffle and a custom number of partitions in
> Flink could be compelling for batch processing.
>
> Could you explain a bit more about what work would be needed for Flink to
> support custom partition numbers? We would be willing to help improve this
> area.
>
> Thanks,
> Qi
>
> On Mar 15, 2019, at 4:25 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi,
>
> Flink works a bit differently than Spark.
> By default, Flink uses pipelined shuffles which push results of the sender
> immediately to the receivers (btw. this is one of the building blocks for
> stream processing).
> However, pipelined shuffles require that all receivers are online. Hence,
> the number of partitions determines the number of running tasks.
> There is also a batch shuffle mode, but it needs to be explicitly enabled
> and AFAIK does not remove the coupling between the number of partitions and
> task parallelism.
>
> However, the community is currently working on many improvements for batch
> processing, including scheduling and fault-tolerance.
> Batched shuffles are an important building block for this and there might
> be better support for your use case in the future.
>
> Best, Fabian
>
> On Mar 15, 2019, at 3:56 AM, qi luo <luoqi...@gmail.com> wrote:
>
>> Hi Ken,
>>
>> That looks awesome! I’ve implemented something similar to your bucketing
>> sink, but using multiple internal writers rather than multiple internal
>> outputs.
>>
>> Besides this, I’m also curious whether Flink can do this the way Spark
>> does: allow the user to specify the number of partitions in the
>> partitionBy() method (so no multiple output formats are needed). But this
>> seems to need non-trivial changes in Flink core.
>>
>> Thanks,
>> Qi
>>
>> On Mar 15, 2019, at 2:36 AM, Ken Krugler <kkrugler_li...@transpac.com>
>> wrote:
>>
>> Hi Qi,
>>
>> See https://github.com/ScaleUnlimited/flink-utils/ for a rough but
>> working version of a bucketing sink.
>>
>> — Ken
>>
>>
>> On Mar 13, 2019, at 7:46 PM, qi luo <luoqi...@gmail.com> wrote:
>>
>> Hi Ken,
>>
>> Agreed. I will try partitionBy() to reduce the number of parallel sinks,
>> and may also try sortPartition() so each sink can write files one by one.
>> Looking forward to your solution. :)
>>
>> Thanks,
>> Qi
>>
>> On Mar 14, 2019, at 2:54 AM, Ken Krugler <kkrugler_li...@transpac.com>
>> wrote:
>>
>> Hi Qi,
>>
>> On Mar 13, 2019, at 1:26 AM, qi luo <luoqi...@gmail.com> wrote:
>>
>> Hi Ken,
>>
>> Do you mean that I can create a batch sink which writes to N files?
>>
>>
>> Correct.
>>
>> That sounds viable, but since our data size is huge (billions of records
>> & thousands of files), the performance may be unacceptable.
>>
>>
>> The main issue with performance (actually memory usage) is how many
>> OutputFormats you need to have open at the same time.
>>
>> If you partition by the same key that’s used to define buckets, then the
>> maximum is lower, as each parallel instance of the sink only gets a
>> unique subset of all possible bucket values.
>>
>> I’m actually dealing with something similar now, so I might have a
>> solution to share soon.
>>
>> — Ken
>>
>>
>> I will check Blink and give it a try anyway.
>>
>> Thank you,
>> Qi
>>
>> On Mar 12, 2019, at 11:58 PM, Ken Krugler <kkrugler_li...@transpac.com>
>> wrote:
>>
>> Hi Qi,
>>
>> If I understand what you’re trying to do, then this sounds like a
>> variation of a bucketing sink.
>>
>> That typically uses a field value to create a directory path or a file
>> name (though the filename case is only viable when the field is also what’s
>> used to partition the data)
>>
>> But I don’t believe Flink has built-in support for that in batch mode
>> (see BucketingSink
>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html>
>> for streaming).
>>
>> Maybe Blink has added that? Hoping someone who knows that codebase can
>> chime in here.
>>
>> Otherwise you’ll need to create a custom sink to implement the desired
>> behavior, though abusing a MapPartitionFunction
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/api/common/functions/MapPartitionFunction.html>
>> would be easiest, I think.
>>
>> — Ken
>>
>>
>>
>> On Mar 12, 2019, at 2:28 AM, qi luo <luoqi...@gmail.com> wrote:
>>
>> Hi Ken,
>>
>> Thanks for your reply. I may not have made myself clear: our problem is not
>> about reading but about writing.
>>
>> We need to write to N files based on key partitioning. We have to use
>> setParallelism() to set the number of output partitions/files, but when that
>> number is too large (~100K), the parallelism would be too high.
>> Is there any other way to achieve this?
>>
>> Thanks,
>> Qi
>>
>> On Mar 11, 2019, at 11:22 PM, Ken Krugler <kkrugler_li...@transpac.com>
>> wrote:
>>
>> Hi Qi,
>>
>> I’m guessing you’re calling createInput() for each input file.
>>
>> If so, then instead you want to do something like:
>>
>>      Job job = Job.getInstance();
>>
>>      // for each file…
>>      FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file path));
>>
>>      env.createInput(HadoopInputs.createHadoopInput(…, job));
>>
>> Flink/Hadoop will take care of parallelizing the reads from the files,
>> given the parallelism that you’re specifying.
>>
>> — Ken
>>
>>
>> On Mar 11, 2019, at 5:42 AM, qi luo <luoqi...@gmail.com> wrote:
>>
>> Hi,
>>
>> We’re trying to distribute batch input data to N HDFS files,
>> partitioned by hash, using the DataSet API. What I’m doing is something like:
>>
>> env.createInput(…)
>>       .partitionByHash(0)
>>       .setParallelism(N)
>>       .output(…)
>>
>> This works well for a small number of files. But when we need to distribute
>> to a large number of files (say 100K), the parallelism becomes too large
>> and we cannot afford that many TMs.
>>
>> In Spark we can write something like ‘rdd.partitionBy(N)’ and control the
>> parallelism separately (using dynamic allocation). Is there anything
>> similar in Flink, or another way to achieve a similar result? Thank you!
>>
>> Qi
>>
>>
>> --------------------------
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>>
>>
>>
>>
>
