Re: Run a ParDo after a Partition
Thanks everyone. Very informative answers, and Redistribute / Reshuffle is what I missed. Might it help to add a table in the docs which summarises the Beam transformations and their equivalents in Spark, Apex, Flink etc.? I'd be happy to contribute.

Answering "why a repartition?": two use cases I've had in Hadoop and Spark are 1) InputFormats on Hadoop that aren't splittable (e.g. zip files) and 2) partitioning for data locality for a fast ingest into e.g. HBase (see http://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase-using-spark/). In this case I'm reading zip files residing on HDFS, which is inherently a single-threaded operation, but I then need to run expensive operations on the resulting dataset in parallel.

Thanks,
Tim

Sent from my iPhone

> On 3 Oct 2017, at 18:16, Thomas Groh wrote:
> If my understanding of the question is accurate, you can apply `Reshuffle.viaRandomKey()` to your input (`verbatimRecords`). This should redistribute the data arbitrarily across all of your workers/partitions. [...]
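A minimal, self-contained sketch (not from the thread) of the pattern Tim describes: read the non-splittable zips as whole files, then Reshuffle before the expensive step so it runs on all workers. The FileIO-based read, the paths, the one-record-per-line assumption and the class names are all illustrative.

  import java.io.BufferedReader;
  import java.io.InputStreamReader;
  import java.nio.channels.Channels;
  import java.nio.charset.StandardCharsets;
  import java.util.zip.ZipEntry;
  import java.util.zip.ZipInputStream;

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.io.FileIO;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.Reshuffle;
  import org.apache.beam.sdk.values.PCollection;

  public class ZipThenReshuffle {

    // Emits every line of every entry in a zip file; the whole zip is read by
    // a single worker because the format is not splittable.
    static class ReadZipEntriesFn extends DoFn<FileIO.ReadableFile, String> {
      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        try (ZipInputStream zip =
            new ZipInputStream(Channels.newInputStream(c.element().open()))) {
          ZipEntry entry;
          while ((entry = zip.getNextEntry()) != null) {
            if (entry.isDirectory()) {
              continue;
            }
            BufferedReader reader =
                new BufferedReader(new InputStreamReader(zip, StandardCharsets.UTF_8));
            String line;
            while ((line = reader.readLine()) != null) {
              c.output(line);
            }
          }
        }
      }
    }

    // Stand-in for the expensive per-record work (e.g. parsing to a SOLR document).
    static class ExpensiveFn extends DoFn<String, String> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(c.element().toUpperCase());
      }
    }

    public static void main(String[] args) {
      Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

      PCollection<String> records =
          p.apply(FileIO.match().filepattern("hdfs:///data/*.zip"))
           .apply(FileIO.readMatches())
           .apply("read-zip-entries", ParDo.of(new ReadZipEntriesFn()))
           // Redistribute so the step below runs on all workers, not only
           // wherever the zips happened to be read.
           .apply("redistribute", Reshuffle.viaRandomKey());

      records.apply("expensive-op", ParDo.of(new ExpensiveFn()));

      p.run().waitUntilFinish();
    }
  }

On the Spark runner the Reshuffle forces a shuffle, roughly analogous to an RDD repartition, so the expensive step is no longer tied to the partitioning of the read.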
Re: Run a ParDo after a Partition
Just to make sure I'm understanding the question: you have some input which cannot be split, and you want to read it all in, redistribute all the records across your available workers, and perform some processing on those records in parallel, yes?

I think there's some confusion here between the Beam concept of `Partition` and the Spark concept. In Beam, the Partition transform relates to the *logical* concept of a PCollection, which is unrelated to the *physical* materialization of that PCollection. In Spark, from my high-level reading, it looks like a partition relates to some physical distribution of the data. Beam doesn't provide direct access to controlling the partitioning of the data - generally, runners should try to distribute the data evenly among all available workers; processing will be performed in parallel across those workers.

If my understanding of the question is accurate, you can apply `Reshuffle.viaRandomKey()` to your input (`verbatimRecords`). This should redistribute the data arbitrarily across all of your workers/partitions. Downstream processing should then be performed in parallel across your data without the need for any additional explicit configuration.

On Tue, Oct 3, 2017 at 8:52 AM, Eugene Kirpichov wrote:
> Hmm, partition + flatten is semantically a no-op - why do you need the Partition in the first place? [...]
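As a concrete (hypothetical) version of that suggestion, reusing verbatimRecords, conn and ParseToSOLRDoc from the snippets quoted below in the thread; the element types are guesses, since the generics were stripped out of the mail:

  import org.apache.beam.sdk.io.solr.SolrIO;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.Reshuffle;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.solr.common.SolrInputDocument;

  // No Partition needed: reshuffle, then convert and write as before.
  PCollection<SolrInputDocument> inputDocs =
      verbatimRecords
          .apply("redistribute", Reshuffle.viaRandomKey())
          .apply("convert-to-solr-format", ParDo.of(new ParseToSOLRDoc()));

  inputDocs.apply(SolrIO.write().to("solr-load").withConnectionConfiguration(conn));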
Re: Run a ParDo after a Partition
Beam's Partition concerns itself only with the logical splitting of a PCollection into a number of subsets, unlike Spark's repartition, which controls the physical distribution of the data.

What you probably want instead is Redistribute, which redistributes the work (randomly) across all available workers. If you care about which values are colocated on which workers, you could key by the desired locality, do a GroupByKey, and then drop the key (noting that things would also be grouped by window in this case).

On Tue, Oct 3, 2017 at 8:52 AM, Eugene Kirpichov wrote:
> Hmm, partition + flatten is semantically a no-op - why do you need the Partition in the first place? [...]
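A rough sketch of that last suggestion, with records standing in for any PCollection<String>: key by whatever should be colocated, group, then drop the key. The substring-based locality key and the uppercase "work" are placeholders, not from the thread.

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.GroupByKey;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.WithKeys;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.TypeDescriptors;

  PCollection<KV<String, Iterable<String>>> grouped =
      records
          // Stand-in locality key, e.g. an HBase region prefix of the row key.
          .apply("key-by-locality", WithKeys.of((String r) -> r.substring(0, 1))
              .withKeyType(TypeDescriptors.strings()))
          .apply("group", GroupByKey.create());

  grouped.apply("process-group", ParDo.of(
      new DoFn<KV<String, Iterable<String>>, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          // The key is dropped here; all values for one key arrive together.
          for (String record : c.element().getValue()) {
            c.output(record.toUpperCase()); // stand-in for the real work
          }
        }
      }));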
Re: Run a ParDo after a Partition
Hmm, partition + flatten is semantically a no-op (it, at most, may or may not cause the intermediate dataset to be materialized; there are no guarantees) - why do you need the Partition in the first place?

On Tue, Oct 3, 2017 at 6:18 AM Tim Robertson wrote:
> Answering my own question. After a partition you "flatten" it, like so (in Spark this then runs on the partitions in parallel tasks): [...]
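Eugene's point as a (hypothetical) snippet: partitioning and immediately flattening gives back a PCollection with the same elements, so on its own the Partition buys nothing. The hash-based PartitionFn and the records input are illustrative.

  import org.apache.beam.sdk.transforms.Flatten;
  import org.apache.beam.sdk.transforms.Partition;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.PCollectionList;

  // Split into 10 logical subsets...
  PCollectionList<String> parts =
      records.apply(Partition.of(10,
          (String elem, int numPartitions) -> Math.floorMod(elem.hashCode(), numPartitions)));

  // ...and glue them straight back together: same elements, and no guarantee
  // of any physical redistribution.
  PCollection<String> sameElements = parts.apply(Flatten.pCollections());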
Re: Run a ParDo after a Partition
Answering my own question.

After a partition you "flatten" it, like so (in Spark this then runs on the partitions in parallel tasks):

  PCollection inputDocs = partitioned
      .apply(Flatten.pCollections())
      .apply("Convert to SOLR", ParDo.of(new ParseToSOLRDoc()));

On Tue, Oct 3, 2017 at 2:48 PM, Tim Robertson wrote:
> Hi folks, I feel a little daft asking this, and suspect I am missing the obvious... Can someone please tell me how I can do a ParDo following a Partition? [...]
Run a ParDo after a Partition
Hi folks,

I feel a little daft asking this, and suspect I am missing the obvious...

Can someone please tell me how I can do a ParDo following a Partition?

In Spark I'd just repartition(...) and then map(), but I don't spot in the Beam API how to run a ParDo on each partition in parallel. Do I need to multithread manually?

I tried this:

  PCollectionList partitioned =
      verbatimRecords.apply(Partition.of(10, new RecordPartitioner()));

  // does not run in parallel on spark...
  for (PCollection untyped : partitioned.getAll()) {
    PCollection inputDocs = partitioned.get(untyped).apply(
        "convert-to-solr-format", ParDo.of(new ParseToSOLRDoc()));

    inputDocs.apply(SolrIO.write().to("solr-load").withConnectionConfiguration(conn));
  }

[For background: I'm using a non-splittable custom Hadoop InputFormat, which means I end up with an RDD as a single partition, and need to split it to run an expensive op in parallel.]

Thanks,
Tim
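For what it's worth, a compilable form of that loop (with guessed generic types, since they were stripped from the mail) would apply the ParDo to each partition directly - PCollectionList.get takes an int index - and give each application a unique name. As the replies above explain, this still doesn't spread a single input partition across Spark workers; the Reshuffle / Redistribute approach does.

  PCollectionList<String> partitioned =
      verbatimRecords.apply(Partition.of(10, new RecordPartitioner()));

  int i = 0;
  for (PCollection<String> part : partitioned.getAll()) {
    PCollection<SolrInputDocument> inputDocs =
        part.apply("convert-to-solr-format-" + i, ParDo.of(new ParseToSOLRDoc()));
    inputDocs.apply("write-to-solr-" + i,
        SolrIO.write().to("solr-load").withConnectionConfiguration(conn));
    i++;
  }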