On Wed, May 18, 2016 at 8:03 AM, Jesse Anderson <[email protected]> wrote:

> The use case is from my thread yesterday on removing windowing. The output
> I wanted was:
> 54.148.33.jdj Hits:44 At:2015-03-31T04:00:29.999Z
> 54.148.33.jdj Hits:44 At:2015-03-31T04:00:59.999Z
> 54.148.33.jdj Hits:2 At:2015-03-31T04:01:29.999Z
> 107.22.225.dea Hits:18 At:2015-03-31T04:00:29.999Z
> 107.22.225.dea Hits:18 At:2015-03-31T04:00:59.999Z
> 107.22.225.dea Hits:1 At:2015-03-31T04:01:29.999Z
> 190.29.67.djc Hits:1 At:2015-03-31T04:00:29.999Z
> 190.29.67.djc Hits:1 At:2015-03-31T04:00:59.999Z
>
> In order to add the timestamps, I had to use a DoFn instead of
> a FlatMapElements. I needed to access the ProcessContext.
>

WithTimestamps?
https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java

>
> In order to output the timestamps after the GroupByKey, I had to enrich
> the PCollection with the timestamps using a DoFn instead of a
> FlatMapElements. I needed to access the ProcessContext.
>

Yup -- so that's why I was suggesting we add ExtractTimestamps or something
as the dual of WithTimestamps, or perhaps generalize both bits of
functionality into Timestamps.set() and Timestamps.get().

>
> On Tue, May 17, 2016 at 9:19 PM Frances Perry <[email protected]> wrote:
>
>> Might make sense to generalize the WithTimestamps transform into
>> Timestamps.set(lambda) and Timestamps.extract()? Though I'm not sure what
>> container to use for the result of extracting the timestamp. It's kind of a
>> misuse of KV, but I'm not sure there's a better option in Java. What kinds
>> of things do you want to do with the timestamp once you extract it?
>>
>> On Tue, May 17, 2016 at 6:08 PM, Jesse Anderson <[email protected]>
>> wrote:
>>
>>> Good point. Do you think there's any value to adding a transform that
>>> enriches a PCollection with the timestamp? The transform could also take a
>>> PCollection and make one of its Instant members the timestamp.
>>>
>>> On Tue, May 17, 2016 at 5:51 PM Ben Chambers <[email protected]>
>>> wrote:
>>>
>>>> DoFn is not a functional interface, so lambdas won't work with it. If
>>>> you look at MapElements and FlatMapElements, these are transforms built on
>>>> top of ParDo that allow passing a lambda. Unfortunately, due to issues with
>>>> type erasure and Java generics, when using a lambda it is necessary to
>>>> specify the output types as well. You can see an example of this in the
>>>> java8tests:
>>>>
>>>> https://github.com/apache/incubator-beam/blob/master/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java#L49
>>>>
>>>> Unfortunately, this still won't work well for your use case since the
>>>> lambda doesn't have access to the timestamp.
>>>>
>>>> On Tue, May 17, 2016 at 5:41 PM Jesse Anderson <[email protected]>
>>>> wrote:
>>>>
>>>>> Is there a way to create a DoFn with a lambda function? The DoFn class
>>>>> itself should support it, but the overloading of ParDo.of causes
>>>>> ambiguity for a lambda function. Is there a different way to accomplish
>>>>> this?
>>>>>
>>>>> To answer why you would want this:
>>>>>
>>>>> static class EnrichWithTimestampFN extends DoFn<KV<String, Long>,
>>>>>     KV<String, KV<Instant, Long>>> {
>>>>>   @Override
>>>>>   public void processElement(DoFn<KV<String, Long>, KV<String,
>>>>>       KV<Instant, Long>>>.ProcessContext context) throws Exception {
>>>>>     // Pair each value with the element's timestamp, keeping the key.
>>>>>     context.output(KV.of(context.element().getKey(),
>>>>>         KV.of(context.timestamp(), context.element().getValue())));
>>>>>   }
>>>>> }
>>>>>
>>>>> This is a DoFn that I wrote to enrich a PCollection with the time
>>>>> (Instant). I need access to the ProcessContext to get the timestamp.
>>>>> This would be much easier to express with a lambda.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jesse
>>>>>
>>>>
>>
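
To make the set/extract idea a bit more concrete, here is a rough plain-Java
sketch of the two operations as duals of each other. It is not Beam code and
not a proposed API: TimestampsSketch, Timestamped, set and extract are
hypothetical names, lists stand in for PCollections, java.time.Instant stands
in for the Joda Instant the SDK uses, and Map.Entry plays the role that KV
would play (with the same "misuse of KV" caveat).

```java
import java.time.Instant;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java model of the set/extract pair; none of these names are Beam API. */
final class TimestampsSketch {

    /** An element plus its timestamp; stands in for what the runner tracks implicitly. */
    static final class Timestamped<T> {
        final T value;
        final Instant timestamp;

        Timestamped(T value, Instant timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical Timestamps.set(lambda): derive each element's timestamp from the element. */
    static <T> List<Timestamped<T>> set(List<T> input, Function<T, Instant> fn) {
        List<Timestamped<T>> out = new ArrayList<>();
        for (T element : input) {
            out.add(new Timestamped<>(element, fn.apply(element)));
        }
        return out;
    }

    /** Hypothetical Timestamps.extract(): surface the timestamp as data, as (timestamp, value). */
    static <T> List<Map.Entry<Instant, T>> extract(List<Timestamped<T>> input) {
        List<Map.Entry<Instant, T>> out = new ArrayList<>();
        for (Timestamped<T> element : input) {
            out.add(new SimpleImmutableEntry<>(element.timestamp, element.value));
        }
        return out;
    }

    public static void main(String[] args) {
        // Each line carries its own event time after the space.
        List<String> lines = Arrays.asList(
            "54.148.33.jdj 2015-03-31T04:00:29.999Z",
            "107.22.225.dea 2015-03-31T04:00:59.999Z");

        // set: a lambda is enough, since it only needs the element.
        List<Timestamped<String>> stamped =
            set(lines, line -> Instant.parse(line.split(" ")[1]));

        // extract: no lambda needed, the timestamp simply becomes part of the output.
        for (Map.Entry<Instant, String> pair : extract(stamped)) {
            System.out.println(pair.getKey() + " -> " + pair.getValue());
        }
    }
}
```

The point of the sketch is only the shape: set needs a user function, while
extract needs nothing from the user, which is why it could be a fixed
transform rather than something that takes a lambda.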
