Re: Run a ParDo after a Partition

2017-10-03 Thread Tim
Thanks everyone. Very informative answers and Redistribute / Reshuffle is what 
I missed. 

Might it help to add a table to the docs summarising the Beam transforms and
their equivalents in Spark, Apex, Flink, etc.? I'd be happy to contribute.

Answering "why a repartition?". Two uses cases I've had in Hadoop and Spark are 
1) inputformats on hadoop that aren't splittable (e.g. Zip files) and 2) 
partitioning for data locality for a fast ingest into e.g HBase (see 
http://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase-using-spark/).
 In this case I'm reading zip files residing on HDFS which is inherently a 
single threaded op but then need to run expensive operations on the resulting 
dataset in parallel.

Thanks,
Tim
Sent from my iPhone 


Re: Run a ParDo after a Partition

2017-10-03 Thread Thomas Groh
Just to make sure I'm understanding the question:

You have some input which cannot be split, and you want to read it all in,
redistribute all the records across your available workers, and perform
some processing on those records in parallel, yes?

I think there's some confusion here about the Beam concept of `Partition`
and the Spark concept. In Beam, the Partition transform relates to the
*logical* concept of a PCollection, which is unrelated to the *physical*
materialization of that PCollection. In Spark, from my high-level reading,
it looks like a
partition relates to some physical distribution of the data. Beam doesn't
provide direct access to controlling the partitioning of the data -
generally, runners should try to distribute the data evenly among all
available workers; processing will be performed in parallel across those
workers.

If my understanding of the question is accurate, you can apply
`Reshuffle.viaRandomKey()` to your input (`verbatimRecords`). This should
redistribute the data arbitrarily across all of your workers/partitions.
Downstream processing should then be performed in parallel across your data
without the need for any additional explicit configuration.
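
For concreteness, a minimal sketch of that suggestion, reusing the names from
Tim's snippets (assuming, as the rest of the thread implies, that
`ParseToSOLRDoc` emits SolrInputDocument and that `conn` is the SolrIO
connection configuration; the element type of `verbatimRecords` doesn't
matter here):

// Break out of the single-partition read before the expensive DoFn.
// Reshuffle.viaRandomKey() keeps the elements unchanged but lets the runner
// redistribute them across workers.
PCollection<SolrInputDocument> inputDocs = verbatimRecords
    .apply("Redistribute", Reshuffle.viaRandomKey())
    .apply("Convert to SOLR", ParDo.of(new ParseToSOLRDoc()));

inputDocs.apply(SolrIO.write().to("solr-load").withConnectionConfiguration(conn));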


Re: Run a ParDo after a Partition

2017-10-03 Thread Robert Bradshaw
Beam's Partition concerns itself only with the logical splitting of a
PCollection into a number of subsets, unlike Spark's repartition, which
controls the physical distribution of the data. What you probably want
instead is Redistribute, which redistributes the work (randomly) across
all available workers. If you care about which values are colocated on
which workers, you could key by the desired locality, do a GroupByKey,
and then drop the key (noting that things would also be grouped by
window in this case).
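
A rough sketch of that keyed approach, purely for illustration (the String
locality key and the regionFor() helper are hypothetical, not from this
thread; SolrInputDocument is borrowed from Tim's snippets):

// Hypothetical: co-locate documents sharing a locality key (e.g. an HBase
// region prefix computed by a user-supplied regionFor() helper), group them,
// then drop the key again. Note values are also grouped per window.
PCollection<SolrInputDocument> colocated = inputDocs
    .apply("Key by locality", WithKeys.of((SolrInputDocument doc) -> regionFor(doc))
        .withKeyType(TypeDescriptors.strings()))
    .apply(GroupByKey.<String, SolrInputDocument>create())
    .apply(Values.<Iterable<SolrInputDocument>>create())
    .apply(Flatten.<SolrInputDocument>iterables());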


Re: Run a ParDo after a Partition

2017-10-03 Thread Eugene Kirpichov
Hmm, partition + flatten is semantically a no-op (at most, it may or may not
cause the intermediate dataset to be materialized; there are no guarantees) -
why do you need the Partition in the first place?
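
Put another way (with `partitionFn` standing in for Tim's RecordPartitioner
and an assumed String element type, both only for illustration):

// Semantically these two produce the same PCollection; the Partition/Flatten
// pair at most *may* materialize an intermediate dataset, with no guarantee
// about how the data ends up distributed.
PCollection<String> viaPartitionAndFlatten = input
    .apply(Partition.of(10, partitionFn))
    .apply(Flatten.pCollections());

PCollection<String> direct = input;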


Re: Run a ParDo after a Partition

2017-10-03 Thread Tim Robertson
Answering my own question.

After a partition you "flatten" it like so (in Spark this then runs on the
partitions in parallel tasks):

PCollection<SolrInputDocument> inputDocs =
    partitioned.apply(Flatten.pCollections())
               .apply("Convert to SOLR", ParDo.of(new ParseToSOLRDoc()));


Run a ParDo after a Partition

2017-10-03 Thread Tim Robertson
Hi folks,

I feel a little daft asking this, and suspect I am missing the obvious...

Can someone please tell me how I can do a ParDo following a Partition?

In Spark I'd just repartition(...) and then a map(), but I don't spot in the
Beam API how to run a ParDo on each partition in parallel. Do I need to
multithread manually?

I tried this:

PCollectionList partitioned =
    verbatimRecords.apply(Partition.of(10, new RecordPartitioner()));

// does not run in parallel on spark...

for (PCollection untyped : partitioned.getAll()) {
  PCollection<SolrInputDocument> inputDocs = untyped.apply(
      "convert-to-solr-format", ParDo.of(new ParseToSOLRDoc()));

  inputDocs.apply(SolrIO.write().to("solr-load")
      .withConnectionConfiguration(conn));
}


[For background: I'm using a non-splittable custom Hadoop InputFormat, which
means I end up with an RDD as a single partition, and need to split it to
run an expensive op in parallel]

Thanks,
Tim