I did this with a lambda. Code in case someone's searching lands them here:
      merged.apply(MapElements.<KV<String, Long>, Long>via((KV<String,
Long> input) -> {
        System.out.println(input);
        return 0L;
      }).withOutputType(TypeDescriptors.longs()));

Caveats: use only on small datasets. This code only works locally like on
an IDE.

On Fri, May 20, 2016 at 11:21 AM Davor Bonaci <[email protected]> wrote:

> The question of displaying contents of a (small) PCollection has been
> asked several times. There are various reasons why this is challenging,
> primarily centered around leading users into a sub-optimal outcome /
> experience. On the other hand, usefulness and educational value is
> certainly there.
>
> This is probably something we should rethink -- but, in the meanwhile,
> writing a debugging DoFn is the way to go.
>
> On Fri, May 20, 2016 at 7:37 AM, Ismaël Mejía <[email protected]> wrote:
>
>> This reminds me of my first email to this list when I created the
>> Log.Print transform to print intermidiate steps Jesse, maybe you can use
>> something like that:
>>
>>
>> https://github.com/iemejia/beam-playground/blob/master/src/test/java/org/apache/beam/contrib/transforms/DebugTest.java
>>
>> Frances is right for a real case scenario, but the case that Jesse
>> mentions (similar to mine) is the development time use where commonly we
>> play with a local sample and we need to see what happened after the
>> PTransforms. This also can be useful for the integration with notebooks
>> (Zeppelin, Jupyter, etc).
>>
>> Ismael
>>
>>
>> On Fri, May 20, 2016 at 3:55 PM, Jesse Anderson <[email protected]>
>> wrote:
>>
>>> @amit, yes that's the reason I was looking for one. Students often want
>>> a quick sanity check for their processing. The workaround now is to write
>>> out the collection to a file and then open it.
>>>
>>> On Fri, May 20, 2016, 12:15 AM Amit Sela <[email protected]> wrote:
>>>
>>>> For the Spark runner this will print to the driver stdout, which in
>>>> some cases is the console you're running the job from (depends on execution
>>>> modes - standalone/yarn-client), and in others it's the driver's stdout log
>>>> (yarn-cluster, not sure about Mesos).
>>>>
>>>> Generally, if such PTransform is considered for the SDK, I think it
>>>> should print to stdout in a centralized location (in Spark it's the driver
>>>> node) and it should be capped by N to avoid OOM. I can definitely see this
>>>> useful for development, even if it's mostly for IDE use, and I think this
>>>> is what Jesse was looking for because debugging a PCollection's content in
>>>> the IDE is useless unless you apply the runner's (implementation of)
>>>> "collect" method to see what it contains.
>>>>
>>>> If we're looking for something to print the content of PCollection's
>>>> partitions on the nodes, this could simply be a composite transform that
>>>> iterates over the first N elements in the partition and logs their values.
>>>>
>>>> On Fri, May 20, 2016 at 8:56 AM Aljoscha Krettek <[email protected]>
>>>> wrote:
>>>>
>>>>> I removed ConsoleIO in my latest PR since it was specific to Flink. It
>>>>> was just a wrapper for a DoFn that prints to System.out, so it would be
>>>>> super easy to implement in core Beam. I don't how, however, if this is
>>>>> useful since for most runners this would print to some file on the worker
>>>>> where the DoFn is running. This is only useful when running inside an IDE.
>>>>>
>>>>> On Fri, 20 May 2016 at 06:06 Frances Perry <[email protected]> wrote:
>>>>>
>>>>>> Looks like Flink and Spark have their own implementations of
>>>>>> ConsoleIO but there's no SDK level version. Generally that goes against 
>>>>>> the
>>>>>> goal of having generally useful IO connectors, with potential
>>>>>> runner-specific overrides. Is there a generalized version we could add?
>>>>>>
>>>>>> On Thu, May 19, 2016 at 12:51 PM, Amit Sela <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> I'll join Frances and add that even Spark warns about using
>>>>>>> collect() as it might not fit into Driver memory, and like you said it's
>>>>>>> only for small datasets - but small is relative...
>>>>>>> I you want some "printout" during development you could use
>>>>>>> ConsoleIO (for streaming) - both Flink an Spark runner support this (and
>>>>>>> Beam doesn't AFAIK).
>>>>>>>
>>>>>>> On Thu, May 19, 2016 at 9:49 PM Frances Perry <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Oh, and I forgot the unified model!  For unbounded collections, you
>>>>>>>> can't ask to access the whole collection, given you'll be waiting 
>>>>>>>> forever
>>>>>>>> ;-) Thomas has some work in progress on making PAssert support 
>>>>>>>> per-window
>>>>>>>> assertions though, so it'll be able to handle that case in the future.
>>>>>>>>
>>>>>>>> On Thu, May 19, 2016 at 11:26 AM, Frances Perry <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Nope. A Beam pipeline goes through three different phases:
>>>>>>>>> (1) Graph construction -- from Pipeline.create() until
>>>>>>>>> Pipeline.run(). During this phase PCollection are just placeholders --
>>>>>>>>> their contents don't exist.
>>>>>>>>> (2) Submission -- Pipeline.run() submits the pipeline to a runner
>>>>>>>>> for execution.
>>>>>>>>> (3) Execution -- The runner runs the pipeline. May continue after
>>>>>>>>> Pipeline.run() returns, depending on the runner. Some PCollections 
>>>>>>>>> may get
>>>>>>>>> materialized during execution, but others may not (for example, if the
>>>>>>>>> runner chooses to fuse or optimize them away).
>>>>>>>>>
>>>>>>>>> So in other words, there's no place in your main program when a
>>>>>>>>> PCollection is guaranteed to have its contents available.
>>>>>>>>>
>>>>>>>>> If you are attempting to debug a job, you might look into PAssert
>>>>>>>>> [1], which gives you a place to inspect the entire contents of a
>>>>>>>>> PCollection and do something with it. In the DirectPipelineRunner 
>>>>>>>>> that code
>>>>>>>>> will run locally, so you'll see the output. But of course, in other 
>>>>>>>>> runners
>>>>>>>>> it may helpfully print on some random worker somewhere. But you can 
>>>>>>>>> also
>>>>>>>>> write tests that assert on its contents, which will work on any 
>>>>>>>>> runner.
>>>>>>>>> (This is how the @RunnableOnService tests work that we are setting up 
>>>>>>>>> for
>>>>>>>>> all runners.)
>>>>>>>>>
>>>>>>>>> We should get some documentation on testing put together on the
>>>>>>>>> Beam site, but in the meantime the info on the Dataflow site [2] 
>>>>>>>>> roughly
>>>>>>>>> applies (DataflowAssert was renamed PAssert in Beam).
>>>>>>>>>
>>>>>>>>> Hope that helps!
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/incubator-beam/blob/eb682a80c4091dce5242ebc8272f43f3461a6fc5/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
>>>>>>>>> [2]
>>>>>>>>> https://cloud.google.com/dataflow/pipelines/testing-your-pipeline
>>>>>>>>>
>>>>>>>>> On Wed, May 18, 2016 at 10:48 AM, Jesse Anderson <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Is there a way to get the PCollection's results in the executing
>>>>>>>>>> process? In Spark this is using the collect method on an RDD.
>>>>>>>>>>
>>>>>>>>>> This would be for small amounts of data stored in a PCollection
>>>>>>>>>> like:
>>>>>>>>>>
>>>>>>>>>> PCollection<Long> count = partition.apply(Count.globally());
>>>>>>>>>> System.out.println(valuesincount);
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Jesse
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>
>

Reply via email to