On Tue, May 17, 2016 at 9:19 PM, Frances Perry <[email protected]> wrote:
> Might make sense to generalize the WithTimestamps transform into
> Timestamps.set(lambda) and Timestamps.extract() ? Though I'm not sure what
> container to use for the result of extracting the timestamp. It's kind of a
> misuse of KV, but I'm not sure there's a better option in Java.

IIRC, we already have a TimestampedValue class in the SDK.
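
To make the container idea concrete, here is a rough, SDK-free sketch of what
a "set the timestamp from a lambda" step could hand back. Everything in it is
a hypothetical stand-in for illustration: Stamped is not the SDK's
TimestampedValue, setTimestamps is not an existing transform, plain Lists
take the place of PCollections, and java.time.Instant takes the place of
whatever Instant type the SDK uses.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class TimestampSketch {

    /** Hypothetical value-plus-timestamp pair; a stand-in, not the SDK class. */
    public static final class Stamped<V> {
        private final V value;
        private final Instant timestamp;

        private Stamped(V value, Instant timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        public static <V> Stamped<V> of(V value, Instant timestamp) {
            return new Stamped<>(value, timestamp);
        }

        public V getValue() {
            return value;
        }

        public Instant getTimestamp() {
            return timestamp;
        }
    }

    /**
     * Hypothetical "set" step: derives a timestamp for each element by
     * applying the supplied function, and pairs the two together.
     */
    public static <V> List<Stamped<V>> setTimestamps(
            List<V> values, Function<V, Instant> timestampFn) {
        List<Stamped<V>> out = new ArrayList<>();
        for (V v : values) {
            out.add(Stamped.of(v, timestampFn.apply(v)));
        }
        return out;
    }

    public static void main(String[] args) {
        // Each Long is treated as epoch milliseconds; a lambda or method
        // reference works here because Function is a functional interface.
        List<Long> epochMillis = Arrays.asList(0L, 1000L);
        for (Stamped<Long> s : setTimestamps(epochMillis, Instant::ofEpochMilli)) {
            System.out.println(s.getTimestamp() + " -> " + s.getValue());
        }
    }
}
```

The only point is that a dedicated value-plus-timestamp pair reads more
clearly than nesting a KV<Instant, Long> inside another KV.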

> What kinds
> of things do you want to do with the timestamp once you extract it?
>
> On Tue, May 17, 2016 at 6:08 PM, Jesse Anderson <[email protected]>
> wrote:
>>
>> Good point. Do you think there's any value to adding a transform that
>> enriches a PCollection with the timestamp? The transform could also take a
>> PCollection and make one of its Instant members the timestamp.
>>
>> On Tue, May 17, 2016 at 5:51 PM Ben Chambers <[email protected]> wrote:
>>>
>>> DoFn is not a functional interface, so lambdas won't work with it. If you
>>> look at MapElements and FlatMapElements, these are transforms built on top
>>> of ParDo that allow passing a lambda. Unfortunately, due to issues with type
>>> erasure and Java generics, when using a lambda it is necessary to specify
>>> the output types as well. You can see an example of this in the java8tests:
>>>
>>>
>>> https://github.com/apache/incubator-beam/blob/master/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java#L49
>>>
>>> Unfortunately, this still won't work well for your use case since the
>>> lambda doesn't have access to the timestamp.
>>>
>>> On Tue, May 17, 2016 at 5:41 PM Jesse Anderson <[email protected]>
>>> wrote:
>>>>
>>>> Is there a way to create a DoFn with a lambda function? The DoFn class
>>>> itself should support it, but the overloading of ParDo.of causes
>>>> ambiguity for a lambda function. Is there a different way to accomplish
>>>> this?
>>>>
>>>> To answer the why would you want this:
>>>>
>>>>   static class EnrichWithTimestampFN
>>>>       extends DoFn<KV<String, Long>, KV<String, KV<Instant, Long>>> {
>>>>     @Override
>>>>     public void processElement(ProcessContext context) throws Exception {
>>>>       // Pair the element's value with its timestamp, keeping the key.
>>>>       context.output(
>>>>           KV.of(
>>>>               context.element().getKey(),
>>>>               KV.of(context.timestamp(), context.element().getValue())));
>>>>     }
>>>>   }
>>>>
>>>> This is a DoFn that I wrote to enrich a PCollection with the time
>>>> (Instant). I need access to the ProcessContext to get the timestamp.
>>>> This would be much easier to express with a lambda.
>>>>
>>>> Thanks,
>>>>
>>>> Jesse
>
>
