Re: Set partition number of Flink DataSet

2019-03-21 Thread qi luo
Thank you Fabian! I will check these issues.

> On Mar 20, 2019, at 4:25 PM, Fabian Hueske  wrote:
> 
> Hi,
> 
> I'm sorry but I'm only familiar with the high-level design but not with the 
> implementation details and concrete roadmap for the involved components.
> I think that FLINK-10288 [1] and FLINK-10429 [2] are related to partition 
> handling.
> 
> Best,
> Fabian
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10288 
> 
> [2] https://issues.apache.org/jira/browse/FLINK-10429 
> 
> 
> 
> On Fri, Mar 15, 2019 at 12:13, qi luo wrote:
> Hi Fabian,
> 
> I understand this is by-design behavior, since Flink was built first for 
> streaming. Supporting batch shuffles and custom partition numbers in Flink 
> would be compelling for batch processing. 
> 
> Could you explain a bit more about what work needs to be done so that 
> Flink can support custom partition numbers? We would be willing to 
> help improve this area.
> 
> Thanks,
> Qi
> 
>> On Mar 15, 2019, at 4:25 PM, Fabian Hueske wrote:
>> 
>> Hi,
>> 
>> Flink works a bit differently than Spark.
>> By default, Flink uses pipelined shuffles which push results of the sender 
>> immediately to the receivers (btw. this is one of the building blocks for 
>> stream processing).
>> However, pipelined shuffles require that all receivers are online. Hence, 
>> the number of partitions determines the number of running tasks.
>> There is also a batch shuffle mode, but it needs to be explicitly enabled 
>> and AFAIK does not resolve the dependency of number of partitions and task 
>> parallelism.
>> 
>> However, the community is currently working on many improvements for batch 
>> processing, including scheduling and fault-tolerance. 
>> Batched shuffles are an important building block for this and there might be 
>> better support for your use case in the future.
>> 
>> Best, Fabian
>> 
>> On Fri, Mar 15, 2019 at 03:56, qi luo wrote:
>> Hi Ken,
>> 
>> That looks awesome! I’ve implemented something similar to your bucketing 
>> sink, but using multiple internal writers rather than multiple internal 
>> output.
>> 
>> Besides this, I’m also curious whether Flink can achieve this like Spark: 
>> allow user to specify partition number in partitionBy() method (so no 
>> multiple output formats are needed). But this seems to need non-trivial 
>> changes in Flink core.
>> 
>> Thanks,
>> Qi
>> 
>>> On Mar 15, 2019, at 2:36 AM, Ken Krugler wrote:
>>> 
>>> Hi Qi,
>>> 
>>> See https://github.com/ScaleUnlimited/flink-utils/ for a rough but working 
>>> version of a bucketing sink.
>>> 
>>> — Ken
>>> 
>>> 
 On Mar 13, 2019, at 7:46 PM, qi luo wrote:
 
 Hi Ken,
 
 Agree. I will try partitionBy() to reduce the number of parallel sinks, 
 and may also try sortPartition() so each sink could write files one by 
 one. Looking forward to your solution. :)
 
 Thanks,
 Qi
 
> On Mar 14, 2019, at 2:54 AM, Ken Krugler wrote:
> 
> Hi Qi,
> 
>> On Mar 13, 2019, at 1:26 AM, qi luo wrote:
>> 
>> Hi Ken,
>> 
>> Do you mean that I can create a batch sink which writes to N files? 
> 
> Correct.
> 
>> That sounds viable, but since our data size is huge (billions of records 
>> & thousands of files), the performance may be unacceptable. 
> 
> The main issue with performance (actually memory usage) is how many 
> OutputFormats do you need to have open at the same time.
> 
> If you partition by the same key that’s used to define buckets, then the 
> max number is less, as each parallel instance of the sink only gets a 
> unique subset of all possible bucket values.
> 
> I’m actually dealing with something similar now, so I might have a 
> solution to share soon.
> 
> — Ken
> 
> 
>> I will check Blink and give it a try anyway.
>> 
>> Thank you,
>> Qi
>> 
>>> On Mar 12, 2019, at 11:58 PM, Ken Krugler wrote:
>>> 
>>> Hi Qi,
>>> 
>>> If I understand what you’re trying to do, then this sounds like a 
>>> variation of a bucketing sink.
>>> 
>>> That typically uses a field value to create a directory path or a file 
>>> name (though the filename case is only viable when the field is also 
>>> what’s used to partition the data)
>>> 
>>> But I don’t believe Flink has built-in support for that, in batch mode 
>>> (see BucketingSink 
>>> 

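The decoupling Qi is asking for (Spark-style partitionBy(N), with the number of output partitions independent of sink parallelism) can be illustrated with a dependency-free sketch. This is plain JDK Java, not the Flink API, and every name in it is hypothetical: each of P sink instances owns the buckets b with b % P == instanceId, so N output files cost at most ceil(N/P) open output formats per instance — the "unique subset of bucket values" point Ken makes later in the thread.

```java
import java.util.ArrayList;
import java.util.List;

public class BucketingSketch {
    // Bucket assignment: plays the role of partitionByHash(0), except the
    // bucket count N is chosen by the user rather than by sink parallelism.
    static int bucketFor(String key, int numBuckets) {
        return Math.floorMod(key.hashCode(), numBuckets);
    }

    // The buckets owned by one of P parallel sink instances: instance i
    // handles every bucket b with b % P == i, i.e. a unique subset.
    static List<Integer> bucketsFor(int instance, int parallelism, int numBuckets) {
        List<Integer> owned = new ArrayList<>();
        for (int b = 0; b < numBuckets; b++) {
            if (b % parallelism == instance) {
                owned.add(b);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        int n = 100_000; // ~100K output files, as in Qi's case
        int p = 100;     // but only 100 parallel sink tasks
        // Each instance owns at most N / P = 1000 buckets, not 100K.
        System.out.println(bucketsFor(0, p, n).size()); // prints 1000
    }
}
```

With this split, parallelism can be sized to the cluster while N stays a purely logical choice, which is essentially what Spark's partitionBy(N) provides.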
Re: Set partition number of Flink DataSet

2019-03-20 Thread Fabian Hueske
Hi,

I'm sorry but I'm only familiar with the high-level design but not with the
implementation details and concrete roadmap for the involved components.
I think that FLINK-10288 [1] and FLINK-10429 [2] are related to partition
handling.

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-10288
[2] https://issues.apache.org/jira/browse/FLINK-10429


On Fri, Mar 15, 2019 at 12:13, qi luo wrote:

> Hi Fabian,
>
> I understand this is by-design behavior, since Flink was built first for
> streaming. Supporting batch shuffles and custom partition numbers in
> Flink would be compelling for batch processing.
>
> Could you explain a bit more about what work needs to be done so that
> Flink can support custom partition numbers? We would be willing to
> help improve this area.
>
> Thanks,
> Qi
>
> On Mar 15, 2019, at 4:25 PM, Fabian Hueske  wrote:
>
> Hi,
>
> Flink works a bit differently than Spark.
> By default, Flink uses pipelined shuffles which push results of the sender
> immediately to the receivers (btw. this is one of the building blocks for
> stream processing).
> However, pipelined shuffles require that all receivers are online. Hence,
> the number of partitions determines the number of running tasks.
> There is also a batch shuffle mode, but it needs to be explicitly enabled
> and AFAIK does not resolve the dependency of number of partitions and task
> parallelism.
>
> However, the community is currently working on many improvements for batch
> processing, including scheduling and fault-tolerance.
> Batched shuffles are an important building block for this and there might
> be better support for your use case in the future.
>
> Best, Fabian
>
> On Fri, Mar 15, 2019 at 03:56, qi luo wrote:
>
>> Hi Ken,
>>
>> That looks awesome! I’ve implemented something similar to your bucketing
>> sink, but using multiple internal writers rather than multiple internal
>> output.
>>
>> Besides this, I’m also curious whether Flink can achieve this like Spark:
>> allow user to specify partition number in partitionBy() method (so no
>> multiple output formats are needed). But this seems to need non-trivial
>> changes in Flink core.
>>
>> Thanks,
>> Qi
>>
>> On Mar 15, 2019, at 2:36 AM, Ken Krugler 
>> wrote:
>>
>> Hi Qi,
>>
>> See https://github.com/ScaleUnlimited/flink-utils/, for a rough but
>> working version of a bucketing sink.
>>
>> — Ken
>>
>>
>> On Mar 13, 2019, at 7:46 PM, qi luo  wrote:
>>
>> Hi Ken,
>>
>> Agree. I will try partitionBy() to reduce the number of parallel sinks,
>> and may also try sortPartition() so each sink could write files one by one.
>> Looking forward to your solution. :)
>>
>> Thanks,
>> Qi
>>
>> On Mar 14, 2019, at 2:54 AM, Ken Krugler 
>> wrote:
>>
>> Hi Qi,
>>
>> On Mar 13, 2019, at 1:26 AM, qi luo  wrote:
>>
>> Hi Ken,
>>
>> Do you mean that I can create a batch sink which writes to N files?
>>
>>
>> Correct.
>>
>> That sounds viable, but since our data size is huge (billions of records
>> & thousands of files), the performance may be unacceptable.
>>
>>
>> The main issue with performance (actually memory usage) is how many
>> OutputFormats do you need to have open at the same time.
>>
>> If you partition by the same key that’s used to define buckets, then the
>> max number is less, as each parallel instance of the sink only gets a
>> unique subset of all possible bucket values.
>>
>> I’m actually dealing with something similar now, so I might have a
>> solution to share soon.
>>
>> — Ken
>>
>>
>> I will check Blink and give it a try anyway.
>>
>> Thank you,
>> Qi
>>
>> On Mar 12, 2019, at 11:58 PM, Ken Krugler 
>> wrote:
>>
>> Hi Qi,
>>
>> If I understand what you’re trying to do, then this sounds like a
>> variation of a bucketing sink.
>>
>> That typically uses a field value to create a directory path or a file
>> name (though the filename case is only viable when the field is also what’s
>> used to partition the data)
>>
>> But I don’t believe Flink has built-in support for that, in batch mode
>> (see BucketingSink
>> 
>>  for
>> streaming).
>>
>> Maybe Blink has added that? Hoping someone who knows that codebase can
>> chime in here.
>>
>> Otherwise you’ll need to create a custom sink to implement the desired
>> behavior - though abusing a MapPartitionFunction
>> 
>>  would
>> be easiest, I think.
>>
>> — Ken
>>
>>
>>
>> On Mar 12, 2019, at 2:28 AM, qi luo  wrote:
>>
>> Hi Ken,
>>
>> Thanks for your reply. I may not have made myself clear: our problem is not
>> about reading but rather writing.
>>
>> We need to write to N files based on key partitioning. We have to use
>> *setParallelism() *to set the output partition/file number, but when the
>> partition number is too large (~100K), 

Re: Set partition number of Flink DataSet

2019-03-15 Thread qi luo
Hi Fabian,

I understand this is by-design behavior, since Flink was built first for 
streaming. Supporting batch shuffles and custom partition numbers in Flink 
would be compelling for batch processing. 

Could you explain a bit more about what work needs to be done so that 
Flink can support custom partition numbers? We would be willing to help 
improve this area.

Thanks,
Qi

> On Mar 15, 2019, at 4:25 PM, Fabian Hueske  wrote:
> 
> Hi,
> 
> Flink works a bit differently than Spark.
> By default, Flink uses pipelined shuffles which push results of the sender 
> immediately to the receivers (btw. this is one of the building blocks for 
> stream processing).
> However, pipelined shuffles require that all receivers are online. Hence, 
> the number of partitions determines the number of running tasks.
> There is also a batch shuffle mode, but it needs to be explicitly enabled and 
> AFAIK does not resolve the dependency of number of partitions and task 
> parallelism.
> 
> However, the community is currently working on many improvements for batch 
> processing, including scheduling and fault-tolerance. 
> Batched shuffles are an important building block for this and there might be 
> better support for your use case in the future.
> 
> Best, Fabian
> 
> On Fri, Mar 15, 2019 at 03:56, qi luo wrote:
> Hi Ken,
> 
> That looks awesome! I’ve implemented something similar to your bucketing 
> sink, but using multiple internal writers rather than multiple internal 
> output.
> 
> Besides this, I’m also curious whether Flink can achieve this like Spark: 
> allow user to specify partition number in partitionBy() method (so no 
> multiple output formats are needed). But this seems to need non-trivial 
> changes in Flink core.
> 
> Thanks,
> Qi
> 
>> On Mar 15, 2019, at 2:36 AM, Ken Krugler > > wrote:
>> 
>> Hi Qi,
>> 
>> See https://github.com/ScaleUnlimited/flink-utils/ for a rough but working 
>> version of a bucketing sink.
>> 
>> — Ken
>> 
>> 
>>> On Mar 13, 2019, at 7:46 PM, qi luo >> > wrote:
>>> 
>>> Hi Ken,
>>> 
>>> Agree. I will try partitionBy() to reduce the number of parallel sinks, and 
>>> may also try sortPartition() so each sink could write files one by one. 
>>> Looking forward to your solution. :)
>>> 
>>> Thanks,
>>> Qi
>>> 
 On Mar 14, 2019, at 2:54 AM, Ken Krugler >>> > wrote:
 
 Hi Qi,
 
> On Mar 13, 2019, at 1:26 AM, qi luo  > wrote:
> 
> Hi Ken,
> 
> Do you mean that I can create a batch sink which writes to N files? 
 
 Correct.
 
> That sounds viable, but since our data size is huge (billions of records 
> & thousands of files), the performance may be unacceptable. 
 
 The main issue with performance (actually memory usage) is how many 
 OutputFormats do you need to have open at the same time.
 
 If you partition by the same key that’s used to define buckets, then the 
 max number is less, as each parallel instance of the sink only gets a 
 unique subset of all possible bucket values.
 
 I’m actually dealing with something similar now, so I might have a 
 solution to share soon.
 
 — Ken
 
 
> I will check Blink and give it a try anyway.
> 
> Thank you,
> Qi
> 
>> On Mar 12, 2019, at 11:58 PM, Ken Krugler > > wrote:
>> 
>> Hi Qi,
>> 
>> If I understand what you’re trying to do, then this sounds like a 
>> variation of a bucketing sink.
>> 
>> That typically uses a field value to create a directory path or a file 
>> name (though the filename case is only viable when the field is also 
>> what’s used to partition the data)
>> 
>> But I don’t believe Flink has built-in support for that, in batch mode 
>> (see BucketingSink 
>> 
>>  for streaming).
>> 
>> Maybe Blink has added that? Hoping someone who knows that codebase can 
>> chime in here.
>> 
>> Otherwise you’ll need to create a custom sink to implement the desired 
>> behavior - though abusing a MapPartitionFunction 
>> 
>>  would be easiest, I think.
>> 
>> — Ken
>> 
>> 
>> 
>>> On Mar 12, 2019, at 2:28 AM, qi luo >> > wrote:
>>> 
>>> Hi Ken,
>>> 
>>> Thanks for your reply. I may not have made myself clear: our problem is not 
>>> about reading but rather writing. 
>>> 
>>> We need to write to N files based 

Re: Set partition number of Flink DataSet

2019-03-15 Thread Fabian Hueske
Hi,

Flink works a bit differently than Spark.
By default, Flink uses pipelined shuffles which push results of the sender
immediately to the receivers (btw. this is one of the building blocks for
stream processing).
However, pipelined shuffles require that all receivers are online. Hence,
the number of partitions determines the number of running tasks.
There is also a batch shuffle mode, but it needs to be explicitly enabled
and AFAIK does not resolve the dependency of number of partitions and task
parallelism.

However, the community is currently working on many improvements for batch
processing, including scheduling and fault-tolerance.
Batched shuffles are an important building block for this and there might
be better support for your use case in the future.

Best, Fabian

On Fri, Mar 15, 2019 at 03:56, qi luo wrote:

> Hi Ken,
>
> That looks awesome! I’ve implemented something similar to your bucketing
> sink, but using multiple internal writers rather than multiple internal
> output.
>
> Besides this, I’m also curious whether Flink can achieve this like Spark:
> allow user to specify partition number in partitionBy() method (so no
> multiple output formats are needed). But this seems to need non-trivial
> changes in Flink core.
>
> Thanks,
> Qi
>
> On Mar 15, 2019, at 2:36 AM, Ken Krugler 
> wrote:
>
> Hi Qi,
>
> See https://github.com/ScaleUnlimited/flink-utils/, for a rough but
> working version of a bucketing sink.
>
> — Ken
>
>
> On Mar 13, 2019, at 7:46 PM, qi luo  wrote:
>
> Hi Ken,
>
> Agree. I will try partitionBy() to reduce the number of parallel sinks,
> and may also try sortPartition() so each sink could write files one by one.
> Looking forward to your solution. :)
>
> Thanks,
> Qi
>
> On Mar 14, 2019, at 2:54 AM, Ken Krugler 
> wrote:
>
> Hi Qi,
>
> On Mar 13, 2019, at 1:26 AM, qi luo  wrote:
>
> Hi Ken,
>
> Do you mean that I can create a batch sink which writes to N files?
>
>
> Correct.
>
> That sounds viable, but since our data size is huge (billions of records &
> thousands of files), the performance may be unacceptable.
>
>
> The main issue with performance (actually memory usage) is how many
> OutputFormats do you need to have open at the same time.
>
> If you partition by the same key that’s used to define buckets, then the
> max number is less, as each parallel instance of the sink only gets a
> unique subset of all possible bucket values.
>
> I’m actually dealing with something similar now, so I might have a
> solution to share soon.
>
> — Ken
>
>
> I will check Blink and give it a try anyway.
>
> Thank you,
> Qi
>
> On Mar 12, 2019, at 11:58 PM, Ken Krugler 
> wrote:
>
> Hi Qi,
>
> If I understand what you’re trying to do, then this sounds like a
> variation of a bucketing sink.
>
> That typically uses a field value to create a directory path or a file
> name (though the filename case is only viable when the field is also what’s
> used to partition the data)
>
> But I don’t believe Flink has built-in support for that, in batch mode
> (see BucketingSink
> 
>  for
> streaming).
>
> Maybe Blink has added that? Hoping someone who knows that codebase can
> chime in here.
>
> Otherwise you’ll need to create a custom sink to implement the desired
> behavior - though abusing a MapPartitionFunction
> 
>  would
> be easiest, I think.
>
> — Ken
>
>
>
> On Mar 12, 2019, at 2:28 AM, qi luo  wrote:
>
> Hi Ken,
>
> Thanks for your reply. I may not have made myself clear: our problem is not
> about reading but rather writing.
>
> We need to write to N files based on key partitioning. We have to use
> *setParallelism() *to set the output partition/file number, but when the
> partition number is too large (~100K), the parallelism would be too high.
> Is there any other way to achieve this?
>
> Thanks,
> Qi
>
> On Mar 11, 2019, at 11:22 PM, Ken Krugler 
> wrote:
>
> Hi Qi,
>
> I’m guessing you’re calling createInput() for each input file.
>
> If so, then instead you want to do something like:
>
>  Job job = Job.getInstance();
>
> for each file…
> FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(file
> path));
>
> env.createInput(HadoopInputs.createHadoopInput(…, job)
>
> Flink/Hadoop will take care of parallelizing the reads from the files,
> given the parallelism that you’re specifying.
>
> — Ken
>
>
> On Mar 11, 2019, at 5:42 AM, qi luo  wrote:
>
> Hi,
>
> We’re trying to distribute batch input data to (N) HDFS files partitioning
> by hash using DataSet API. What I’m doing is like:
>
> *env.createInput(…)*
> *  .partitionByHash(0)*
> *  .setParallelism(N)*
> *  .output(…)*
>
> This works well for small number of files. But when we need to distribute
> to* large number of files (say 100K)*, the 

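Ken's reading pattern above — registering every file on one Hadoop Job and creating a single input — works because one source with parallelism P fans all of its splits out across its P subtasks, instead of one source (and one task chain) per file. A dependency-free sketch of that split assignment, in plain JDK Java with hypothetical names (this is an illustration of the idea, not Flink's actual scheduler):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SplitAssignmentSketch {
    // Round-robin many input files across a fixed number of parallel
    // readers, the way a single input source with parallelism P
    // distributes its splits -- rather than creating a source per file.
    static Map<Integer, List<String>> assignSplits(List<String> files, int parallelism) {
        Map<Integer, List<String>> byReader = new HashMap<>();
        for (int i = 0; i < files.size(); i++) {
            byReader.computeIfAbsent(i % parallelism, k -> new ArrayList<>())
                    .add(files.get(i));
        }
        return byReader;
    }

    public static void main(String[] args) {
        List<String> files = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            files.add("hdfs:///input/part-" + i); // hypothetical paths
        }
        // 10 files over 3 readers: reader 0 gets files 0, 3, 6 and 9.
        System.out.println(assignSplits(files, 3).get(0));
    }
}
```

The number of files and the read parallelism are independent here, which is exactly why the combined-input approach scales where per-file sources do not.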
Re: Set partition number of Flink DataSet

2019-03-14 Thread qi luo
Hi Ken,

That looks awesome! I’ve implemented something similar to your bucketing sink, 
but using multiple internal writers rather than multiple internal output.

Besides this, I’m also curious whether Flink can achieve this like Spark: allow 
user to specify partition number in partitionBy() method (so no multiple output 
formats are needed). But this seems to need non-trivial changes in Flink core.

Thanks,
Qi

> On Mar 15, 2019, at 2:36 AM, Ken Krugler  wrote:
> 
> Hi Qi,
> 
> See https://github.com/ScaleUnlimited/flink-utils/ for a rough but working 
> version of a bucketing sink.
> 
> — Ken
> 
> 
>> On Mar 13, 2019, at 7:46 PM, qi luo > > wrote:
>> 
>> Hi Ken,
>> 
>> Agree. I will try partitionBy() to reduce the number of parallel sinks, and 
>> may also try sortPartition() so each sink could write files one by one. 
>> Looking forward to your solution. :)
>> 
>> Thanks,
>> Qi
>> 
>>> On Mar 14, 2019, at 2:54 AM, Ken Krugler >> > wrote:
>>> 
>>> Hi Qi,
>>> 
 On Mar 13, 2019, at 1:26 AM, qi luo >>> > wrote:
 
 Hi Ken,
 
 Do you mean that I can create a batch sink which writes to N files? 
>>> 
>>> Correct.
>>> 
 That sounds viable, but since our data size is huge (billions of records & 
 thousands of files), the performance may be unacceptable. 
>>> 
>>> The main issue with performance (actually memory usage) is how many 
>>> OutputFormats do you need to have open at the same time.
>>> 
>>> If you partition by the same key that’s used to define buckets, then the 
>>> max number is less, as each parallel instance of the sink only gets a 
>>> unique subset of all possible bucket values.
>>> 
>>> I’m actually dealing with something similar now, so I might have a solution 
>>> to share soon.
>>> 
>>> — Ken
>>> 
>>> 
 I will check Blink and give it a try anyway.
 
 Thank you,
 Qi
 
> On Mar 12, 2019, at 11:58 PM, Ken Krugler  > wrote:
> 
> Hi Qi,
> 
> If I understand what you’re trying to do, then this sounds like a 
> variation of a bucketing sink.
> 
> That typically uses a field value to create a directory path or a file 
> name (though the filename case is only viable when the field is also 
> what’s used to partition the data)
> 
> But I don’t believe Flink has built-in support for that, in batch mode 
> (see BucketingSink 
> 
>  for streaming).
> 
> Maybe Blink has added that? Hoping someone who knows that codebase can 
> chime in here.
> 
> Otherwise you’ll need to create a custom sink to implement the desired 
> behavior - though abusing a MapPartitionFunction 
> 
>  would be easiest, I think.
> 
> — Ken
> 
> 
> 
>> On Mar 12, 2019, at 2:28 AM, qi luo > > wrote:
>> 
>> Hi Ken,
>> 
>> Thanks for your reply. I may not have made myself clear: our problem is not 
>> about reading but rather writing. 
>> 
>> We need to write to N files based on key partitioning. We have to use 
>> setParallelism() to set the output partition/file number, but when the 
>> partition number is too large (~100K), the parallelism would be too 
>> high. Is there any other way to achieve this?
>> 
>> Thanks,
>> Qi
>> 
>>> On Mar 11, 2019, at 11:22 PM, Ken Krugler >> > wrote:
>>> 
>>> Hi Qi,
>>> 
>>> I’m guessing you’re calling createInput() for each input file.
>>> 
>>> If so, then instead you want to do something like:
>>> 
>>> Job job = Job.getInstance();
>>> 
>>> for each file…
>>> FileInputFormat.addInputPath(job, new 
>>> org.apache.hadoop.fs.Path(file path));
>>> 
>>> env.createInput(HadoopInputs.createHadoopInput(…, job)
>>> 
>>> Flink/Hadoop will take care of parallelizing the reads from the files, 
>>> given the parallelism that you’re specifying.
>>> 
>>> — Ken
>>> 
>>> 
 On Mar 11, 2019, at 5:42 AM, qi luo >>> > wrote:
 
 Hi,
 
 We’re trying to distribute batch input data to (N) HDFS files 
 partitioning by hash using DataSet API. What I’m doing is like:
 
 env.createInput(…)
   .partitionByHash(0)
   .setParallelism(N)
   .output(…)
 
 This works well for small number of files. But when we need to 
 distribute to 

Re: Set partition number of Flink DataSet

2019-03-14 Thread Ken Krugler
Hi Qi,

See https://github.com/ScaleUnlimited/flink-utils/ for a rough but working 
version of a bucketing sink.

— Ken


> On Mar 13, 2019, at 7:46 PM, qi luo  wrote:
> 
> Hi Ken,
> 
> Agree. I will try partitionBy() to reduce the number of parallel sinks, and 
> may also try sortPartition() so each sink could write files one by one. 
> Looking forward to your solution. :)
> 
> Thanks,
> Qi
> 
>> On Mar 14, 2019, at 2:54 AM, Ken Krugler > > wrote:
>> 
>> Hi Qi,
>> 
>>> On Mar 13, 2019, at 1:26 AM, qi luo >> > wrote:
>>> 
>>> Hi Ken,
>>> 
>>> Do you mean that I can create a batch sink which writes to N files? 
>> 
>> Correct.
>> 
>>> That sounds viable, but since our data size is huge (billions of records & 
>>> thousands of files), the performance may be unacceptable. 
>> 
>> The main issue with performance (actually memory usage) is how many 
>> OutputFormats do you need to have open at the same time.
>> 
>> If you partition by the same key that’s used to define buckets, then the max 
>> number is less, as each parallel instance of the sink only gets a unique 
>> subset of all possible bucket values.
>> 
>> I’m actually dealing with something similar now, so I might have a solution 
>> to share soon.
>> 
>> — Ken
>> 
>> 
>>> I will check Blink and give it a try anyway.
>>> 
>>> Thank you,
>>> Qi
>>> 
 On Mar 12, 2019, at 11:58 PM, Ken Krugler >>> > wrote:
 
 Hi Qi,
 
 If I understand what you’re trying to do, then this sounds like a 
 variation of a bucketing sink.
 
 That typically uses a field value to create a directory path or a file 
 name (though the filename case is only viable when the field is also 
 what’s used to partition the data)
 
 But I don’t believe Flink has built-in support for that, in batch mode 
 (see BucketingSink 
 
  for streaming).
 
 Maybe Blink has added that? Hoping someone who knows that codebase can 
 chime in here.
 
 Otherwise you’ll need to create a custom sink to implement the desired 
 behavior - though abusing a MapPartitionFunction 
 
  would be easiest, I think.
 
 — Ken
 
 
 
> On Mar 12, 2019, at 2:28 AM, qi luo  > wrote:
> 
> Hi Ken,
> 
> Thanks for your reply. I may not have made myself clear: our problem is not 
> about reading but rather writing. 
> 
> We need to write to N files based on key partitioning. We have to use 
> setParallelism() to set the output partition/file number, but when the 
> partition number is too large (~100K), the parallelism would be too high. 
> Is there any other way to achieve this?
> 
> Thanks,
> Qi
> 
>> On Mar 11, 2019, at 11:22 PM, Ken Krugler > > wrote:
>> 
>> Hi Qi,
>> 
>> I’m guessing you’re calling createInput() for each input file.
>> 
>> If so, then instead you want to do something like:
>> 
>>  Job job = Job.getInstance();
>> 
>>  for each file…
>>  FileInputFormat.addInputPath(job, new 
>> org.apache.hadoop.fs.Path(file path));
>> 
>>  env.createInput(HadoopInputs.createHadoopInput(…, job)
>> 
>> Flink/Hadoop will take care of parallelizing the reads from the files, 
>> given the parallelism that you’re specifying.
>> 
>> — Ken
>> 
>> 
>>> On Mar 11, 2019, at 5:42 AM, qi luo >> > wrote:
>>> 
>>> Hi,
>>> 
>>> We’re trying to distribute batch input data to (N) HDFS files 
>>> partitioning by hash using DataSet API. What I’m doing is like:
>>> 
>>> env.createInput(…)
>>>   .partitionByHash(0)
>>>   .setParallelism(N)
>>>   .output(…)
>>> 
>>> This works well for small number of files. But when we need to 
>>> distribute to large number of files (say 100K), the parallelism becomes 
>>> too large and we could not afford that many TMs.
>>> 
>>> In spark we can write something like ‘rdd.partitionBy(N)’ and control 
>>> the parallelism separately (using dynamic allocation). Is there 
>>> anything similar in Flink or other way we can achieve similar result? 
>>> Thank you!
>>> 
>>> Qi
>> 
>> --
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com 
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>> 
> 
 
 

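What "abusing a MapPartitionFunction" as a sink could look like, sketched with plain java.io rather than Flink's OutputFormat (the class and helper names here are hypothetical, not the actual flink-utils code): keep one writer per bucket value actually seen by this instance and close them all when the partition ends. As Ken notes, memory usage tracks the number of simultaneously open writers, which is bounded by the distinct bucket values routed to this instance.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class PartitionWriterSketch {
    // Writes one partition's records into per-bucket files, lazily opening
    // a writer for each bucket value seen and closing all of them at the
    // end. Returns the number of files this instance produced.
    static int writePartition(Iterable<String> records, Path dir, int numBuckets)
            throws IOException {
        Map<Integer, BufferedWriter> open = new HashMap<>();
        try {
            for (String record : records) {
                int bucket = Math.floorMod(record.hashCode(), numBuckets);
                BufferedWriter w = open.get(bucket);
                if (w == null) {
                    w = Files.newBufferedWriter(dir.resolve("part-" + bucket));
                    open.put(bucket, w);
                }
                w.write(record);
                w.newLine();
            }
        } finally {
            for (BufferedWriter w : open.values()) {
                w.close();
            }
        }
        return open.size();
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("parts");
        int files = writePartition(Arrays.asList("x", "y", "z"), dir, 4);
        System.out.println("wrote " + files + " file(s)");
    }
}
```

Inside a real mapPartition() the same loop would run over the partition's iterator, with the output directory pointing at HDFS; the lazy-open/close-at-end shape is the part that matters.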
Re: Set partition number of Flink DataSet

2019-03-13 Thread qi luo
Hi Ken,

Agree. I will try partitionBy() to reduce the number of parallel sinks, and may 
also try sortPartition() so each sink could write files one by one. Looking 
forward to your solution. :)

Thanks,
Qi

> On Mar 14, 2019, at 2:54 AM, Ken Krugler  wrote:
> 
> Hi Qi,
> 
>> On Mar 13, 2019, at 1:26 AM, qi luo > > wrote:
>> 
>> Hi Ken,
>> 
>> Do you mean that I can create a batch sink which writes to N files? 
> 
> Correct.
> 
>> That sounds viable, but since our data size is huge (billions of records & 
>> thousands of files), the performance may be unacceptable. 
> 
> The main issue with performance (actually memory usage) is how many 
> OutputFormats do you need to have open at the same time.
> 
> If you partition by the same key that’s used to define buckets, then the max 
> number is less, as each parallel instance of the sink only gets a unique 
> subset of all possible bucket values.
> 
> I’m actually dealing with something similar now, so I might have a solution 
> to share soon.
> 
> — Ken
> 
> 
>> I will check Blink and give it a try anyway.
>> 
>> Thank you,
>> Qi
>> 
>>> On Mar 12, 2019, at 11:58 PM, Ken Krugler >> > wrote:
>>> 
>>> Hi Qi,
>>> 
>>> If I understand what you’re trying to do, then this sounds like a variation 
>>> of a bucketing sink.
>>> 
>>> That typically uses a field value to create a directory path or a file name 
>>> (though the filename case is only viable when the field is also what’s used 
>>> to partition the data)
>>> 
>>> But I don’t believe Flink has built-in support for that, in batch mode (see 
>>> BucketingSink 
>>> 
>>>  for streaming).
>>> 
>>> Maybe Blink has added that? Hoping someone who knows that codebase can 
>>> chime in here.
>>> 
>>> Otherwise you’ll need to create a custom sink to implement the desired 
>>> behavior - though abusing a MapPartitionFunction 
>>> 
>>>  would be easiest, I think.
>>> 
>>> — Ken
>>> 
>>> 
>>> 
 On Mar 12, 2019, at 2:28 AM, qi luo >>> > wrote:
 
 Hi Ken,
 
 Thanks for your reply. I may not have made myself clear: our problem is not 
 about reading but rather writing. 
 
 We need to write to N files based on key partitioning. We have to use 
 setParallelism() to set the output partition/file number, but when the 
 partition number is too large (~100K), the parallelism would be too high. 
 Is there any other way to achieve this?
 
 Thanks,
 Qi
 
> On Mar 11, 2019, at 11:22 PM, Ken Krugler  > wrote:
> 
> Hi Qi,
> 
> I’m guessing you’re calling createInput() for each input file.
> 
> If so, then instead you want to do something like:
> 
>   Job job = Job.getInstance();
> 
>   for each file…
>   FileInputFormat.addInputPath(job, new 
> org.apache.hadoop.fs.Path(file path));
> 
>   env.createInput(HadoopInputs.createHadoopInput(…, job)
> 
> Flink/Hadoop will take care of parallelizing the reads from the files, 
> given the parallelism that you’re specifying.
> 
> — Ken
> 
> 
>> On Mar 11, 2019, at 5:42 AM, qi luo > > wrote:
>> 
>> Hi,
>> 
>> We’re trying to distribute batch input data to (N) HDFS files 
>> partitioning by hash using DataSet API. What I’m doing is like:
>> 
>> env.createInput(…)
>>   .partitionByHash(0)
>>   .setParallelism(N)
>>   .output(…)
>> 
>> This works well for small number of files. But when we need to 
>> distribute to large number of files (say 100K), the parallelism becomes 
>> too large and we could not afford that many TMs.
>> 
>> In spark we can write something like ‘rdd.partitionBy(N)’ and control 
>> the parallelism separately (using dynamic allocation). Is there anything 
>> similar in Flink or other way we can achieve similar result? Thank you!
>> 
>> Qi
> 
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com 
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
> 
 
>>> 
>>> --
>>> Ken Krugler
>>> +1 530-210-6378
>>> http://www.scaleunlimited.com 
>>> Custom big data solutions & training
>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>> 
>> 
> 
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com 
> Custom big data solutions & training
> Flink, Solr, Hadoop, 

Re: Set partition number of Flink DataSet

2019-03-13 Thread Ken Krugler
Hi Qi,

> On Mar 13, 2019, at 1:26 AM, qi luo  wrote:
> 
> Hi Ken,
> 
> Do you mean that I can create a batch sink which writes to N files?

Correct.

> That sounds viable, but since our data size is huge (billions of records & 
> thousands of files), the performance may be unacceptable.

The main issue with performance (actually memory usage) is how many 
OutputFormats do you need to have open at the same time.

If you partition by the same key that’s used to define buckets, then the max 
number is less, as each parallel instance of the sink only gets a unique subset 
of all possible bucket values.

I’m actually dealing with something similar now, so I might have a solution to 
share soon.

— Ken


> I will check Blink and give it a try anyway.
> 
> Thank you,
> Qi
> 
>> On Mar 12, 2019, at 11:58 PM, Ken Krugler > > wrote:
>> 
>> Hi Qi,
>> 
>> If I understand what you’re trying to do, then this sounds like a variation 
>> of a bucketing sink.
>> 
>> That typically uses a field value to create a directory path or a file name 
>> (though the filename case is only viable when the field is also what’s used 
>> to partition the data)
>> 
>> But I don’t believe Flink has built-in support for that, in batch mode (see 
>> BucketingSink 
>> 
>>  for streaming).
>> 
>> Maybe Blink has added that? Hoping someone who knows that codebase can chime 
>> in here.
>> 
>> Otherwise you’ll need to create a custom sink to implement the desired 
>> behavior - though abusing a MapPartitionFunction would be easiest, I think.
>> 
>> — Ken
>> 
>> 
>> 
>>> On Mar 12, 2019, at 2:28 AM, qi luo wrote:
>>> 
>>> Hi Ken,
>>> 
>>> Thanks for your reply. I may not make myself clear: our problem is not 
>>> about reading but rather writing. 
>>> 
>>> We need to write to N files based on key partitioning. We have to use 
>>> setParallelism() to set the output partition/file number, but when the 
>>> partition number is too large (~100K), the parallelism would be too high. 
>>> Is there any other way to achieve this?
>>> 
>>> Thanks,
>>> Qi
>>> 
 On Mar 11, 2019, at 11:22 PM, Ken Krugler wrote:
 
 Hi Qi,
 
 I’m guessing you’re calling createInput() for each input file.
 
 If so, then instead you want to do something like:
 
Job job = Job.getInstance();
 
for each file…
FileInputFormat.addInputPath(job, new 
 org.apache.hadoop.fs.Path(file path));
 
env.createInput(HadoopInputs.createHadoopInput(…, job)
 
 Flink/Hadoop will take care of parallelizing the reads from the files, 
 given the parallelism that you’re specifying.
 
 — Ken
 
 
> On Mar 11, 2019, at 5:42 AM, qi luo wrote:
> 
> Hi,
> 
> We’re trying to distribute batch input data to (N) HDFS files 
> partitioning by hash using DataSet API. What I’m doing is like:
> 
> env.createInput(…)
>   .partitionByHash(0)
>   .setParallelism(N)
>   .output(…)
> 
> This works well for small number of files. But when we need to distribute 
> to large number of files (say 100K), the parallelism becomes too large 
> and we could not afford that many TMs.
> 
> In spark we can write something like ‘rdd.partitionBy(N)’ and control the 
> parallelism separately (using dynamic allocation). Is there anything 
> similar in Flink or other way we can achieve similar result? Thank you!
> 
> Qi
 
 --
 Ken Krugler
 +1 530-210-6378
 http://www.scaleunlimited.com 
 Custom big data solutions & training
 Flink, Solr, Hadoop, Cascading & Cassandra
 
>>> 
>> 
>> --
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com 
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>> 
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Set partition number of Flink DataSet

2019-03-13 Thread qi luo
Hi Ken,

Do you mean that I can create a batch sink which writes to N files? That sounds 
viable, but since our data size is huge (billions of records & thousands of 
files), the performance may be unacceptable. I will check Blink and give it a 
try anyway.

Thank you,
Qi

> On Mar 12, 2019, at 11:58 PM, Ken Krugler  wrote:
> 
> Hi Qi,
> 
> If I understand what you’re trying to do, then this sounds like a variation 
> of a bucketing sink.
> 
> That typically uses a field value to create a directory path or a file name 
> (though the filename case is only viable when the field is also what’s used 
> to partition the data)
> 
> But I don’t believe Flink has built-in support for that, in batch mode (see 
> BucketingSink for streaming).
> 
> Maybe Blink has added that? Hoping someone who knows that codebase can chime 
> in here.
> 
> Otherwise you’ll need to create a custom sink to implement the desired 
> behavior - though abusing a MapPartitionFunction would be easiest, I think.
> 
> — Ken
> 
> 
> 
>> On Mar 12, 2019, at 2:28 AM, qi luo wrote:
>> 
>> Hi Ken,
>> 
>> Thanks for your reply. I may not make myself clear: our problem is not about 
>> reading but rather writing. 
>> 
>> We need to write to N files based on key partitioning. We have to use 
>> setParallelism() to set the output partition/file number, but when the 
>> partition number is too large (~100K), the parallelism would be too high. Is 
>> there any other way to achieve this?
>> 
>> Thanks,
>> Qi
>> 
>>> On Mar 11, 2019, at 11:22 PM, Ken Krugler wrote:
>>> 
>>> Hi Qi,
>>> 
>>> I’m guessing you’re calling createInput() for each input file.
>>> 
>>> If so, then instead you want to do something like:
>>> 
>>> Job job = Job.getInstance();
>>> 
>>> for each file…
>>> FileInputFormat.addInputPath(job, new 
>>> org.apache.hadoop.fs.Path(file path));
>>> 
>>> env.createInput(HadoopInputs.createHadoopInput(…, job)
>>> 
>>> Flink/Hadoop will take care of parallelizing the reads from the files, 
>>> given the parallelism that you’re specifying.
>>> 
>>> — Ken
>>> 
>>> 
 On Mar 11, 2019, at 5:42 AM, qi luo wrote:
 
 Hi,
 
 We’re trying to distribute batch input data to (N) HDFS files partitioning 
 by hash using DataSet API. What I’m doing is like:
 
 env.createInput(…)
   .partitionByHash(0)
   .setParallelism(N)
   .output(…)
 
 This works well for small number of files. But when we need to distribute 
 to large number of files (say 100K), the parallelism becomes too large and 
 we could not afford that many TMs.
 
 In spark we can write something like ‘rdd.partitionBy(N)’ and control the 
 parallelism separately (using dynamic allocation). Is there anything 
 similar in Flink or other way we can achieve similar result? Thank you!
 
 Qi
>>> 
>>> --
>>> Ken Krugler
>>> +1 530-210-6378
>>> http://www.scaleunlimited.com 
>>> Custom big data solutions & training
>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>> 
>> 
> 
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com 
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
> 



Re: Set partition number of Flink DataSet

2019-03-12 Thread Ken Krugler
Hi Qi,

If I understand what you’re trying to do, then this sounds like a variation of 
a bucketing sink.

That typically uses a field value to create a directory path or a file name 
(though the filename case is only viable when the field is also what’s used to 
partition the data).

But I don’t believe Flink has built-in support for that, in batch mode (see 
BucketingSink for streaming).

Maybe Blink has added that? Hoping someone who knows that codebase can chime in 
here.

Otherwise you’ll need to create a custom sink to implement the desired behavior 
- though abusing a MapPartitionFunction would be easiest, I think.
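As a rough, Flink-free illustration of what such a partition-wise bucketed 
writer could look like (plain Java only — the record shape, the numBuckets 
parameter, and hash-based bucket assignment are all assumptions for the 
sketch, not Flink API): inside each partition you lazily open one writer per 
bucket value you actually see, append records, and close everything when the 
partition ends.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BucketedPartitionWriter {
    // Writes one partition's (key, value) records into per-bucket files under outDir.
    // bucket = hash(key) % numBuckets; only buckets seen in this partition get a file.
    static void writePartition(Iterable<String[]> records, Path outDir, int numBuckets)
            throws IOException {
        Map<Integer, Writer> open = new HashMap<>();
        try {
            for (String[] kv : records) {
                int bucket = Math.floorMod(kv[0].hashCode(), numBuckets);
                Writer w = open.get(bucket);
                if (w == null) {  // lazily open a writer the first time a bucket appears
                    w = Files.newBufferedWriter(outDir.resolve("part-" + bucket));
                    open.put(bucket, w);
                }
                w.write(kv[0] + "\t" + kv[1] + "\n");
            }
        } finally {
            // End of partition: flush and close every writer we opened.
            for (Writer w : open.values()) w.close();
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("buckets");
        writePartition(
                List.of(new String[]{"a", "1"}, new String[]{"b", "2"}, new String[]{"a", "3"}),
                dir, 4);
        System.out.println(Files.list(dir).count()); // number of distinct buckets seen
    }
}
```

In a real mapPartition-based sink the iterable would be the partition's records 
and outDir would be an HDFS path via the appropriate OutputFormat, but the 
open-lazily/close-at-end lifecycle is the core of the approach.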

— Ken



> On Mar 12, 2019, at 2:28 AM, qi luo  wrote:
> 
> Hi Ken,
> 
> Thanks for your reply. I may not make myself clear: our problem is not about 
> reading but rather writing. 
> 
> We need to write to N files based on key partitioning. We have to use 
> setParallelism() to set the output partition/file number, but when the 
> partition number is too large (~100K), the parallelism would be too high. Is 
> there any other way to achieve this?
> 
> Thanks,
> Qi
> 
>> On Mar 11, 2019, at 11:22 PM, Ken Krugler wrote:
>> 
>> Hi Qi,
>> 
>> I’m guessing you’re calling createInput() for each input file.
>> 
>> If so, then instead you want to do something like:
>> 
>>  Job job = Job.getInstance();
>> 
>>  for each file…
>>  FileInputFormat.addInputPath(job, new 
>> org.apache.hadoop.fs.Path(file path));
>> 
>>  env.createInput(HadoopInputs.createHadoopInput(…, job)
>> 
>> Flink/Hadoop will take care of parallelizing the reads from the files, given 
>> the parallelism that you’re specifying.
>> 
>> — Ken
>> 
>> 
>>> On Mar 11, 2019, at 5:42 AM, qi luo wrote:
>>> 
>>> Hi,
>>> 
>>> We’re trying to distribute batch input data to (N) HDFS files partitioning 
>>> by hash using DataSet API. What I’m doing is like:
>>> 
>>> env.createInput(…)
>>>   .partitionByHash(0)
>>>   .setParallelism(N)
>>>   .output(…)
>>> 
>>> This works well for small number of files. But when we need to distribute 
>>> to large number of files (say 100K), the parallelism becomes too large and 
>>> we could not afford that many TMs.
>>> 
>>> In spark we can write something like ‘rdd.partitionBy(N)’ and control the 
>>> parallelism separately (using dynamic allocation). Is there anything 
>>> similar in Flink or other way we can achieve similar result? Thank you!
>>> 
>>> Qi
>> 
>> --
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com 
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>> 
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Set partition number of Flink DataSet

2019-03-12 Thread qi luo
Hi Ken,

Thanks for your reply. I may not have made myself clear: our problem is not 
about reading but rather writing.

We need to write to N files based on key partitioning. We have to use 
setParallelism() to set the output partition/file number, but when the 
partition number is too large (~100K), the parallelism would be too high. Is 
there any other way to achieve this?

Thanks,
Qi

> On Mar 11, 2019, at 11:22 PM, Ken Krugler  wrote:
> 
> Hi Qi,
> 
> I’m guessing you’re calling createInput() for each input file.
> 
> If so, then instead you want to do something like:
> 
>   Job job = Job.getInstance();
> 
>   for each file…
>   FileInputFormat.addInputPath(job, new 
> org.apache.hadoop.fs.Path(file path));
> 
>   env.createInput(HadoopInputs.createHadoopInput(…, job)
> 
> Flink/Hadoop will take care of parallelizing the reads from the files, given 
> the parallelism that you’re specifying.
> 
> — Ken
> 
> 
>> On Mar 11, 2019, at 5:42 AM, qi luo wrote:
>> 
>> Hi,
>> 
>> We’re trying to distribute batch input data to (N) HDFS files partitioning 
>> by hash using DataSet API. What I’m doing is like:
>> 
>> env.createInput(…)
>>   .partitionByHash(0)
>>   .setParallelism(N)
>>   .output(…)
>> 
>> This works well for small number of files. But when we need to distribute to 
>> large number of files (say 100K), the parallelism becomes too large and we 
>> could not afford that many TMs.
>> 
>> In spark we can write something like ‘rdd.partitionBy(N)’ and control the 
>> parallelism separately (using dynamic allocation). Is there anything similar 
>> in Flink or other way we can achieve similar result? Thank you!
>> 
>> Qi
> 
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com 
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
> 



Re: Set partition number of Flink DataSet

2019-03-11 Thread Ken Krugler
Hi Qi,

I’m guessing you’re calling createInput() for each input file.

If so, then instead you want to do something like:

	Job job = Job.getInstance();

	// inputPaths: your collection of input file paths
	for (String path : inputPaths) {
		FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(path));
	}

	env.createInput(HadoopInputs.createHadoopInput(…, job));

Flink/Hadoop will take care of parallelizing the reads from the files, given 
the parallelism that you’re specifying.

— Ken


> On Mar 11, 2019, at 5:42 AM, qi luo  wrote:
> 
> Hi,
> 
> We’re trying to distribute batch input data to (N) HDFS files partitioning by 
> hash using DataSet API. What I’m doing is like:
> 
> env.createInput(…)
>   .partitionByHash(0)
>   .setParallelism(N)
>   .output(…)
> 
> This works well for small number of files. But when we need to distribute to 
> large number of files (say 100K), the parallelism becomes too large and we 
> could not afford that many TMs.
> 
> In spark we can write something like ‘rdd.partitionBy(N)’ and control the 
> parallelism separately (using dynamic allocation). Is there anything similar 
> in Flink or other way we can achieve similar result? Thank you!
> 
> Qi

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Set partition number of Flink DataSet

2019-03-11 Thread qi luo
Hi,

We’re trying to distribute batch input data to (N) HDFS files partitioning by 
hash using DataSet API. What I’m doing is like:

env.createInput(…)
  .partitionByHash(0)
  .setParallelism(N)
  .output(…)

This works well for a small number of files. But when we need to distribute to 
a large number of files (say 100K), the parallelism becomes too large and we 
cannot afford that many TMs.

In Spark we can write something like ‘rdd.partitionBy(N)’ and control the 
parallelism separately (using dynamic allocation). Is there anything similar in 
Flink, or another way we can achieve a similar result? Thank you!

Qi