It might make sense to generalize the WithTimestamps transform into Timestamps.set(lambda) and Timestamps.extract(). I'm not sure what container to use for the result of extracting the timestamp, though. It's kind of a misuse of KV, but I'm not sure there's a better option in Java. What kinds of things do you want to do with the timestamp once you extract it?
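The following minimal plain-Java sketch makes the container question concrete by modeling what a set/extract pair could look like, using a dedicated result type instead of KV. Everything in it is an assumption for illustration. `Element`, `Stamped`, `set` and `extract` are hypothetical names, not Beam APIs. An `Element` only models a PCollection element together with its timestamp metadata. `java.time.Instant` stands in for the Joda `Instant` that Beam uses. Beam's SDK does already have a `TimestampedValue` class that pairs a value with an `Instant`, which may be worth checking as a candidate container.

```java
import java.time.Instant;
import java.util.function.Function;

// Plain-Java model of a hypothetical Timestamps.set / Timestamps.extract pair.
// None of these names are Beam APIs; java.time.Instant stands in for Joda's.
public class TimestampsSketch {

    // Models one PCollection element: the value plus its timestamp metadata,
    // which a user lambda normally cannot see.
    static final class Element<T> {
        final T value;
        final Instant timestamp;

        Element(T value, Instant timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical dedicated result type for extraction, instead of KV<Instant, T>.
    static final class Stamped<T> {
        final T value;
        final Instant timestamp;

        Stamped(T value, Instant timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return value + "@" + timestamp;
        }
    }

    // "set": compute the element's new timestamp from its value with a lambda.
    static <T> Element<T> set(Element<T> e, Function<T, Instant> fn) {
        return new Element<>(e.value, fn.apply(e.value));
    }

    // "extract": copy the timestamp metadata into the value itself, leaving
    // the element's own timestamp unchanged.
    static <T> Element<Stamped<T>> extract(Element<T> e) {
        return new Element<>(new Stamped<>(e.value, e.timestamp), e.timestamp);
    }

    public static void main(String[] args) {
        Element<Long> raw = new Element<>(42L, Instant.EPOCH);
        // Treat the value as epoch seconds and make it the timestamp.
        Element<Long> withTs = set(raw, v -> Instant.ofEpochSecond(v));
        Element<Stamped<Long>> extracted = extract(withTs);
        System.out.println(extracted.value); // 42@1970-01-01T00:00:42Z
    }
}
```

The main design point is that `Stamped` names its fields, so downstream code reads `value` and `timestamp` rather than calling `getKey()`/`getValue()` on a KV whose "key" is not really a key.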
On Tue, May 17, 2016 at 6:08 PM, Jesse Anderson wrote:
> Good point. Do you think there's any value to adding a transform that
> enriches a PCollection with the timestamp? The transform could also take a
> PCollection and make one of its Instant members the timestamp.
>
> On Tue, May 17, 2016 at 5:51 PM Ben Chambers wrote:
>
>> DoFn is not a functional interface, so lambdas won't work with it. If you
>> look at MapElements and FlatMapElements, these are transforms built on top
>> of ParDo that allow passing a lambda. Unfortunately, due to issues with
>> type erasure and Java generics, when using a lambda it is necessary to
>> specify the output types as well. You can see an example of this in the
>> java8tests:
>>
>> https://github.com/apache/incubator-beam/blob/master/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java#L49
>>
>> Unfortunately, this still won't work well for your use case, since the
>> lambda doesn't have access to the timestamp.
>>
>> On Tue, May 17, 2016 at 5:41 PM Jesse Anderson wrote:
>>
>>> Is there a way to create a DoFn with a lambda function? The DoFn class
>>> itself should support it, but the overloading of ParDo.of makes a lambda
>>> argument ambiguous. Is there a different way to accomplish this?
>>>
>>> To answer why you would want this:
>>>
>>>     static class EnrichWithTimestampFN
>>>         extends DoFn<KV<String, Long>, KV<String, KV<Instant, Long>>> {
>>>       @Override
>>>       public void processElement(ProcessContext context) throws Exception {
>>>         // Keep the key; pair the value with the element's timestamp.
>>>         context.output(KV.of(context.element().getKey(),
>>>             KV.of(context.timestamp(), context.element().getValue())));
>>>       }
>>>     }
>>>
>>> This is a DoFn that I wrote to enrich a PCollection with the timestamp
>>> (an Instant). I need access to the ProcessContext to get the timestamp.
>>> This would be much easier to express with a lambda.
>>>
>>> Thanks,
>>>
>>> Jesse
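The type-erasure point about lambdas can be reproduced in plain Java, without Beam. In this sketch (the class and method names are hypothetical), reflection can recover `Function<String, Integer>`'s type arguments from an anonymous class but not from a lambda. That is roughly why a lambda-based transform needs its output type spelled out separately. `TypeToken` shows the anonymous-subclass trick, similar in spirit to creating a Beam `TypeDescriptor` with `new TypeDescriptor<...>() {}`. It is an illustration, not Beam's implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class ErasureSketch {

    // Hypothetical "type token": an anonymous subclass records its type
    // argument in its generic superclass, which reflection can read back.
    static abstract class TypeToken<T> {
        Type type() {
            ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    // True if the object's class declares Function with type arguments
    // that are still visible at runtime.
    static boolean functionTypesVisible(Function<?, ?> f) {
        for (Type t : f.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == Function.class) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Function<String, Integer> lambda = s -> s.length();
        Function<String, Integer> anonymous = new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };

        // The lambda's generated class implements the raw Function type,
        // so String and Integer are not recoverable from it.
        System.out.println(functionTypesVisible(lambda));    // false
        System.out.println(functionTypesVisible(anonymous)); // true

        // Supplying the output type explicitly restores what the lambda lost.
        System.out.println(new TypeToken<Integer>() {}.type().getTypeName()); // java.lang.Integer
    }
}
```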
