The question of displaying the contents of a (small) PCollection has been asked several times. There are various reasons why this is challenging, primarily the risk of leading users toward a sub-optimal outcome or experience. On the other hand, the usefulness and educational value are certainly there.

This is probably something we should rethink -- but, in the meantime, writing a debugging DoFn is the way to go.
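For example, a minimal sketch of such a pass-through logging DoFn (the name DebugFn is illustrative, not an official Beam transform):

    import org.apache.beam.sdk.transforms.DoFn;

    /** Pass-through DoFn that logs every element it sees. */
    class DebugFn<T> extends DoFn<T, T> {
      private final String tag;

      DebugFn(String tag) {
        this.tag = tag;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        // On a distributed runner this prints on whichever worker runs
        // the DoFn, not in the submitting process.
        System.out.println(tag + ": " + c.element());
        c.output(c.element());
      }
    }

You apply it right after the step you want to inspect, e.g. counts.apply(ParDo.of(new DebugFn<Long>("count"))). With the direct runner the output shows up in your console; with distributed runners it lands in the worker logs, as discussed below.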
On Fri, May 20, 2016 at 7:37 AM, Ismaël Mejía <[email protected]> wrote:

> This reminds me of my first email to this list, when I created the Log.Print transform to print intermediate steps. Jesse, maybe you can use something like that:
>
> https://github.com/iemejia/beam-playground/blob/master/src/test/java/org/apache/beam/contrib/transforms/DebugTest.java
>
> Frances is right for a real-world scenario, but the case Jesse mentions (similar to mine) is the development-time use, where we commonly play with a local sample and need to see what happened after the PTransforms. This could also be useful for integration with notebooks (Zeppelin, Jupyter, etc.).
>
> Ismael
>
> On Fri, May 20, 2016 at 3:55 PM, Jesse Anderson <[email protected]> wrote:
>
>> @amit, yes, that's the reason I was looking for one. Students often want a quick sanity check of their processing. The workaround now is to write the collection out to a file and then open it.
>>
>> On Fri, May 20, 2016, 12:15 AM Amit Sela <[email protected]> wrote:
>>
>>> For the Spark runner this will print to the driver's stdout, which in some cases is the console you're running the job from (depending on execution mode - standalone/yarn-client), and in others is the driver's stdout log (yarn-cluster; not sure about Mesos).
>>>
>>> Generally, if such a PTransform is considered for the SDK, I think it should print to stdout in a centralized location (in Spark that's the driver node), and it should be capped at N elements to avoid OOM. I can definitely see this being useful for development, even if mostly for IDE use, and I think it's what Jesse was looking for, because debugging a PCollection's content in the IDE is useless unless you apply the runner's (implementation of a) "collect" method to see what it contains.
>>>
>>> If we're looking for something to print the contents of a PCollection's partitions on the nodes, this could simply be a composite transform that iterates over the first N elements in each partition and logs their values.
>>>
>>> On Fri, May 20, 2016 at 8:56 AM Aljoscha Krettek <[email protected]> wrote:
>>>
>>>> I removed ConsoleIO in my latest PR since it was specific to Flink. It was just a wrapper for a DoFn that prints to System.out, so it would be super easy to implement in core Beam. I don't know, however, if this is useful, since for most runners it would print to some file on the worker where the DoFn is running. It is only useful when running inside an IDE.
>>>>
>>>> On Fri, 20 May 2016 at 06:06 Frances Perry <[email protected]> wrote:
>>>>
>>>>> Looks like Flink and Spark have their own implementations of ConsoleIO, but there's no SDK-level version. That generally goes against the goal of having generally useful IO connectors with potential runner-specific overrides. Is there a generalized version we could add?
>>>>>
>>>>> On Thu, May 19, 2016 at 12:51 PM, Amit Sela <[email protected]> wrote:
>>>>>
>>>>>> I'll join Frances and add that even Spark warns about using collect(), as the result might not fit into driver memory, and like you said it's only for small datasets - but small is relative...
>>>>>> If you want some "printout" during development you could use ConsoleIO (for streaming) - both the Flink and Spark runners support this (though Beam itself doesn't, AFAIK).
>>>>>>
>>>>>> On Thu, May 19, 2016 at 9:49 PM Frances Perry <[email protected]> wrote:
>>>>>>
>>>>>>> Oh, and I forgot the unified model! For unbounded collections, you can't ask to access the whole collection, given that you'd be waiting forever ;-) Thomas has some work in progress on making PAssert support per-window assertions, though, so it will be able to handle that case in the future.
>>>>>>>
>>>>>>> On Thu, May 19, 2016 at 11:26 AM, Frances Perry <[email protected]> wrote:
>>>>>>>
>>>>>>>> Nope. A Beam pipeline goes through three different phases:
>>>>>>>> (1) Graph construction -- from Pipeline.create() until Pipeline.run(). During this phase PCollections are just placeholders -- their contents don't exist.
>>>>>>>> (2) Submission -- Pipeline.run() submits the pipeline to a runner for execution.
>>>>>>>> (3) Execution -- the runner runs the pipeline. This may continue after Pipeline.run() returns, depending on the runner. Some PCollections may get materialized during execution, but others may not (for example, if the runner chooses to fuse or optimize them away).
>>>>>>>>
>>>>>>>> In other words, there is no point in your main program at which a PCollection is guaranteed to have its contents available.
>>>>>>>>
>>>>>>>> If you are attempting to debug a job, you might look into PAssert [1], which gives you a place to inspect the entire contents of a PCollection and do something with it. In the DirectPipelineRunner that code will run locally, so you'll see the output; in other runners, of course, it may "helpfully" print on some random worker somewhere. But you can also write tests that assert on a PCollection's contents, which will work on any runner. (This is how the @RunnableOnService tests we are setting up for all runners work.)
>>>>>>>>
>>>>>>>> We should get some documentation on testing put together on the Beam site, but in the meantime the info on the Dataflow site [2] roughly applies (DataflowAssert was renamed PAssert in Beam).
>>>>>>>>
>>>>>>>> Hope that helps!
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/incubator-beam/blob/eb682a80c4091dce5242ebc8272f43f3461a6fc5/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
>>>>>>>> [2] https://cloud.google.com/dataflow/pipelines/testing-your-pipeline
>>>>>>>>
>>>>>>>> On Wed, May 18, 2016 at 10:48 AM, Jesse Anderson <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Is there a way to get a PCollection's results in the executing process? In Spark this is done with the collect method on an RDD.
>>>>>>>>>
>>>>>>>>> This would be for small amounts of data stored in a PCollection, like:
>>>>>>>>>
>>>>>>>>> PCollection<Long> count = partition.apply(Count.globally());
>>>>>>>>> System.out.println(valuesincount);
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Jesse
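To make Frances' PAssert suggestion concrete, here is a minimal sketch against a recent Beam Java SDK (the test-class name and element values are illustrative; details differ slightly in the incubator-era code linked above):

    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;
    import org.junit.Rule;
    import org.junit.Test;

    public class CountTest {
      @Rule public final transient TestPipeline p = TestPipeline.create();

      @Test
      public void testCount() {
        PCollection<Long> count =
            p.apply(Create.of("a", "b", "c")).apply(Count.globally());

        // The assertion executes inside the pipeline itself, so it
        // works on any runner, not just the local one.
        PAssert.that(count).containsInAnyOrder(3L);
        p.run();
      }
    }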

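And a sketch of the capped composite transform Amit describes: sample at most N elements and log them, passing the input through unchanged. The name PrintSample is illustrative; Sample.any is a real SDK transform, but note the output still prints on whichever worker runs the logging ParDo, and in older incubator-era SDKs the expand override was still named apply:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Sample;
    import org.apache.beam.sdk.values.PCollection;

    /** Logs at most {@code limit} elements, then hands the input back unchanged. */
    class PrintSample<T> extends PTransform<PCollection<T>, PCollection<T>> {
      private final long limit;

      PrintSample(long limit) {
        this.limit = limit;
      }

      @Override
      public PCollection<T> expand(PCollection<T> input) {
        input
            .apply(Sample.any(limit)) // cap at N elements to avoid OOM
            .apply(ParDo.of(new DoFn<T, Void>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                System.out.println(c.element());
              }
            }));
        return input; // pass the original collection through untouched
      }
    }

Usage would be something like collection.apply(new PrintSample<Long>(20)), inserted after the step you want to peek at.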