This reminds me of my first email to this list, when I created the Log.Print transform to print intermediate steps. Jesse, maybe you can use something like that:
https://github.com/iemejia/beam-playground/blob/master/src/test/java/org/apache/beam/contrib/transforms/DebugTest.java

Frances is right for a real case scenario, but the case that Jesse mentions (similar to mine) is the development time use where commonly we play with a local sample and we need to see what happened after the PTransforms. This also can be useful for the integration with notebooks (Zeppelin, Jupyter, etc).

Ismael

On Fri, May 20, 2016 at 3:55 PM, Jesse Anderson <[email protected]> wrote:

> @amit, yes that's the reason I was looking for one. Students often want a
> quick sanity check for their processing. The workaround now is to write out
> the collection to a file and then open it.
>
> On Fri, May 20, 2016, 12:15 AM Amit Sela <[email protected]> wrote:
>
>> For the Spark runner this will print to the driver stdout, which in some
>> cases is the console you're running the job from (depends on execution
>> modes - standalone/yarn-client), and in others it's the driver's stdout log
>> (yarn-cluster, not sure about Mesos).
>>
>> Generally, if such PTransform is considered for the SDK, I think it
>> should print to stdout in a centralized location (in Spark it's the driver
>> node) and it should be capped by N to avoid OOM. I can definitely see this
>> useful for development, even if it's mostly for IDE use, and I think this
>> is what Jesse was looking for because debugging a PCollection's content in
>> the IDE is useless unless you apply the runner's (implementation of)
>> "collect" method to see what it contains.
>>
>> If we're looking for something to print the content of PCollection's
>> partitions on the nodes, this could simply be a composite transform that
>> iterates over the first N elements in the partition and logs their values.
>>
>> On Fri, May 20, 2016 at 8:56 AM Aljoscha Krettek <[email protected]>
>> wrote:
>>
>>> I removed ConsoleIO in my latest PR since it was specific to Flink.
>>> It was just a wrapper for a DoFn that prints to System.out, so it would be
>>> super easy to implement in core Beam. I don't know, however, if this is
>>> useful since for most runners this would print to some file on the worker
>>> where the DoFn is running. This is only useful when running inside an IDE.
>>>
>>> On Fri, 20 May 2016 at 06:06 Frances Perry <[email protected]> wrote:
>>>
>>>> Looks like Flink and Spark have their own implementations of ConsoleIO
>>>> but there's no SDK level version. Generally that goes against the goal of
>>>> having generally useful IO connectors, with potential runner-specific
>>>> overrides. Is there a generalized version we could add?
>>>>
>>>> On Thu, May 19, 2016 at 12:51 PM, Amit Sela <[email protected]>
>>>> wrote:
>>>>
>>>>> I'll join Frances and add that even Spark warns about using collect()
>>>>> as it might not fit into Driver memory, and like you said it's only for
>>>>> small datasets - but small is relative...
>>>>> If you want some "printout" during development you could use ConsoleIO
>>>>> (for streaming) - both the Flink and Spark runners support this (and Beam
>>>>> doesn't AFAIK).
>>>>>
>>>>> On Thu, May 19, 2016 at 9:49 PM Frances Perry <[email protected]> wrote:
>>>>>
>>>>>> Oh, and I forgot the unified model! For unbounded collections, you
>>>>>> can't ask to access the whole collection, given you'll be waiting forever
>>>>>> ;-) Thomas has some work in progress on making PAssert support per-window
>>>>>> assertions though, so it'll be able to handle that case in the future.
>>>>>>
>>>>>> On Thu, May 19, 2016 at 11:26 AM, Frances Perry <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Nope. A Beam pipeline goes through three different phases:
>>>>>>> (1) Graph construction -- from Pipeline.create() until
>>>>>>> Pipeline.run(). During this phase PCollections are just placeholders --
>>>>>>> their contents don't exist.
>>>>>>> (2) Submission -- Pipeline.run() submits the pipeline to a runner
>>>>>>> for execution.
>>>>>>> (3) Execution -- The runner runs the pipeline. May continue after
>>>>>>> Pipeline.run() returns, depending on the runner. Some PCollections may
>>>>>>> get materialized during execution, but others may not (for example, if
>>>>>>> the runner chooses to fuse or optimize them away).
>>>>>>>
>>>>>>> So in other words, there's no place in your main program when a
>>>>>>> PCollection is guaranteed to have its contents available.
>>>>>>>
>>>>>>> If you are attempting to debug a job, you might look into PAssert
>>>>>>> [1], which gives you a place to inspect the entire contents of a
>>>>>>> PCollection and do something with it. In the DirectPipelineRunner that
>>>>>>> code will run locally, so you'll see the output. But of course, in other
>>>>>>> runners it may helpfully print on some random worker somewhere. But you
>>>>>>> can also write tests that assert on its contents, which will work on any
>>>>>>> runner. (This is how the @RunnableOnService tests work that we are
>>>>>>> setting up for all runners.)
>>>>>>>
>>>>>>> We should get some documentation on testing put together on the Beam
>>>>>>> site, but in the meantime the info on the Dataflow site [2] roughly
>>>>>>> applies (DataflowAssert was renamed PAssert in Beam).
>>>>>>>
>>>>>>> Hope that helps!
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/incubator-beam/blob/eb682a80c4091dce5242ebc8272f43f3461a6fc5/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
>>>>>>> [2]
>>>>>>> https://cloud.google.com/dataflow/pipelines/testing-your-pipeline
>>>>>>>
>>>>>>> On Wed, May 18, 2016 at 10:48 AM, Jesse Anderson <[email protected]> wrote:
>>>>>>>
>>>>>>>> Is there a way to get the PCollection's results in the executing
>>>>>>>> process? In Spark this is using the collect method on an RDD.
>>>>>>>>
>>>>>>>> This would be for small amounts of data stored in a PCollection
>>>>>>>> like:
>>>>>>>>
>>>>>>>> PCollection<Long> count = partition.apply(Count.globally());
>>>>>>>> System.out.println(valuesincount);
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Jesse
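
As a rough illustration of the "capped by N" printing that Amit suggests in the thread, here is a minimal sketch in plain Java. It deliberately uses no Beam or runner API: `CappedPrinter`, `firstN`, and `print` are hypothetical names invented for this example, and the input is an ordinary `Iterable` standing in for whatever elements a runner would hand to such a transform. The point is only the capping logic: stop after N elements so a large collection is never fully read or buffered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper for illustration only; not part of Beam or any runner.
class CappedPrinter {

    // Returns the string form of at most n leading elements.
    // The size check comes first, so the iterator is not advanced past the cap.
    static <T> List<String> firstN(Iterable<T> elements, int n) {
        List<String> out = new ArrayList<>();
        Iterator<T> it = elements.iterator();
        while (out.size() < n && it.hasNext()) {
            out.add(String.valueOf(it.next()));
        }
        return out;
    }

    // Prints at most n elements to stdout, each prefixed with a label.
    static <T> void print(String label, Iterable<T> elements, int n) {
        for (String s : firstN(elements, n)) {
            System.out.println(label + ": " + s);
        }
    }

    public static void main(String[] args) {
        // Prints "count: 10" and "count: 20"; the remaining elements are skipped.
        print("count", Arrays.asList(10L, 20L, 30L, 40L), 2);
    }
}
```

Inside a real pipeline the same loop would presumably live in a DoFn or composite transform, and, as noted in the thread, where the output ends up (console, driver log, or a worker file) would still depend on the runner.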
