damccorm commented on code in PR #24676: URL: https://github.com/apache/beam/pull/24676#discussion_r1055680127
########## sdks/go/pkg/beam/core/timers/timers.go: ########## @@ -0,0 +1,149 @@ +// Licensed to the Apache SoFiringTimestampware Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, soFiringTimestampware +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package timer provides structs for reading and writing timers. +package timers + +import ( + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" +) + +var ( + // ProviderType is timer provider type. + ProviderType = reflect.TypeOf((*Provider)(nil)).Elem() +) + +type timeDomainEnum int32 + +const ( + // TimeDomainUnspecified represents unspecified time domain. + TimeDomainUnspecified timeDomainEnum = 0 + // TimeDomainEventTime represents event time domain. + TimeDomainEventTime timeDomainEnum = 1 + // TimeDomainProcessingTime represents processing time domain. + TimeDomainProcessingTime timeDomainEnum = 2 +) + +// TimerMap is a placeholder used by timer provider to manipulate timers. +type TimerMap struct { + Key string + Tag string + Clear bool + FireTimestamp, HoldTimestamp mtime.Time +} + +type timerOptions func(*TimerMap) + +// WithTag sets the tag for the timer. +func WithTag(tag string) timerOptions { + return func(tm *TimerMap) { + tm.Tag = tag + } +} + +// WithOutputTimestamp sets the output timestamp for the timer. +func WithOutputTimestamp(outputTimestamp mtime.Time) timerOptions { + return func(tm *TimerMap) { + tm.HoldTimestamp = outputTimestamp + } +} + +// Provider represents the DoFn parameter used to get and manipulate timers. +// Use this as a parameter in DoFn and pass it to the timer methods like Set() and Clear(). +// NOTE: Do not use this interface directly to manipulate timers. +type Provider interface { + Set(t TimerMap) +} + +// PipelineTimer interface is used to implement different types of timers, that is, +// event-time and processing-time. +type PipelineTimer interface { + TimerKey() string + TimerDomain() timeDomainEnum +} + +// EventTimeTimer represents the event time timer. +type EventTimeTimer struct { + Key string + Kind timeDomainEnum Review Comment: If we're storing this anyways, do we actually need different types for EventTime/ProcessingTime? ########## sdks/go/pkg/beam/core/timers/timers.go: ########## @@ -0,0 +1,149 @@ +// Licensed to the Apache SoFiringTimestampware Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, soFiringTimestampware +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package timer provides structs for reading and writing timers. +package timers + +import ( + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" +) + +var ( + // ProviderType is timer provider type. + ProviderType = reflect.TypeOf((*Provider)(nil)).Elem() +) + +type timeDomainEnum int32 + +const ( + // TimeDomainUnspecified represents unspecified time domain. + TimeDomainUnspecified timeDomainEnum = 0 + // TimeDomainEventTime represents event time domain. + TimeDomainEventTime timeDomainEnum = 1 + // TimeDomainProcessingTime represents processing time domain. + TimeDomainProcessingTime timeDomainEnum = 2 +) + +// TimerMap is a placeholder used by timer provider to manipulate timers. +type TimerMap struct { + Key string + Tag string + Clear bool + FireTimestamp, HoldTimestamp mtime.Time Review Comment: Do we need a value to hold if its processing or event time? I don't see that actually being produced in Set below ########## sdks/go/pkg/beam/core/timers/timers_test.go: ########## @@ -0,0 +1,189 @@ +// Licensed to the Apache SoFiringTimestampware Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, soFiringTimestampware +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package timers + +import ( + "testing" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" + "github.com/google/go-cmp/cmp" +) + +type fakeTimerProvider struct { + timerDetails TimerMap +} + +func (t *fakeTimerProvider) Set(tm TimerMap) { + t.timerDetails = tm +} + +func (t *fakeTimerProvider) GetMap() TimerMap { + return t.timerDetails +} + +func TestProcessingTimeTimer_Set(t *testing.T) { + type fields struct { + Key string + Kind timeDomainEnum + } + type args struct { + FiringTimestamp mtime.Time + opts []timerOptions + } + tests := []struct { + name string + fields fields + args args + out TimerMap + }{ + { + name: "correct set no options", + fields: fields{Key: "Window", Kind: TimeDomainProcessingTime}, + args: args{ + FiringTimestamp: mtime.FromTime(time.Now()), + }, + out: TimerMap{ + Key: "Window", + FireTimestamp: mtime.FromTime(time.Now()), + }, + }, + { + name: "correct set tag option", + fields: fields{Key: "Window", Kind: TimeDomainProcessingTime}, + args: args{ + FiringTimestamp: mtime.FromTime(time.Now()), + opts: []timerOptions{WithTag("windowTag")}, + }, + out: TimerMap{ + Key: "Window", + FireTimestamp: mtime.FromTime(time.Now()), + Tag: "windowTag", + }, + }, + { + name: "correct set tag and timestamp option", + fields: fields{Key: "Window", Kind: TimeDomainProcessingTime}, + args: args{ + FiringTimestamp: mtime.FromTime(time.Now()), + opts: []timerOptions{WithTag("windowTag"), WithOutputTimestamp(mtime.FromTime(time.Now().Add(2 * time.Second)))}, + }, + out: TimerMap{ + Key: "Window", + FireTimestamp: mtime.FromTime(time.Now()), + Tag: "windowTag", + HoldTimestamp: mtime.FromTime(time.Now().Add(2 * time.Second)), Review Comment: Same comment applies in the other test ########## sdks/go/pkg/beam/core/timers/timers.go: ########## @@ -0,0 +1,149 @@ +// Licensed to the Apache SoFiringTimestampware Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, soFiringTimestampware +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package timer provides structs for reading and writing timers. +package timers + +import ( + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" +) + +var ( + // ProviderType is timer provider type. + ProviderType = reflect.TypeOf((*Provider)(nil)).Elem() +) + +type timeDomainEnum int32 + +const ( + // TimeDomainUnspecified represents unspecified time domain. + TimeDomainUnspecified timeDomainEnum = 0 + // TimeDomainEventTime represents event time domain. + TimeDomainEventTime timeDomainEnum = 1 + // TimeDomainProcessingTime represents processing time domain. + TimeDomainProcessingTime timeDomainEnum = 2 +) + +// TimerMap is a placeholder used by timer provider to manipulate timers. +type TimerMap struct { + Key string + Tag string + Clear bool + FireTimestamp, HoldTimestamp mtime.Time +} + +type timerOptions func(*TimerMap) + +// WithTag sets the tag for the timer. +func WithTag(tag string) timerOptions { + return func(tm *TimerMap) { + tm.Tag = tag + } +} + +// WithOutputTimestamp sets the output timestamp for the timer. +func WithOutputTimestamp(outputTimestamp mtime.Time) timerOptions { + return func(tm *TimerMap) { + tm.HoldTimestamp = outputTimestamp + } +} + +// Provider represents the DoFn parameter used to get and manipulate timers. +// Use this as a parameter in DoFn and pass it to the timer methods like Set() and Clear(). +// NOTE: Do not use this interface directly to manipulate timers. +type Provider interface { + Set(t TimerMap) +} + +// PipelineTimer interface is used to implement different types of timers, that is, +// event-time and processing-time. +type PipelineTimer interface { + TimerKey() string + TimerDomain() timeDomainEnum +} + +// EventTimeTimer represents the event time timer. +type EventTimeTimer struct { + Key string + Kind timeDomainEnum Review Comment: Outside of how this value is set, I don't think the objects are actually different ########## sdks/go/pkg/beam/core/timers/timers.go: ########## @@ -0,0 +1,149 @@ +// Licensed to the Apache SoFiringTimestampware Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, soFiringTimestampware +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package timer provides structs for reading and writing timers. +package timers + +import ( + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" +) + +var ( + // ProviderType is timer provider type. + ProviderType = reflect.TypeOf((*Provider)(nil)).Elem() +) + +type timeDomainEnum int32 + +const ( + // TimeDomainUnspecified represents unspecified time domain. + TimeDomainUnspecified timeDomainEnum = 0 + // TimeDomainEventTime represents event time domain. + TimeDomainEventTime timeDomainEnum = 1 + // TimeDomainProcessingTime represents processing time domain. + TimeDomainProcessingTime timeDomainEnum = 2 +) + +// TimerMap is a placeholder used by timer provider to manipulate timers. +type TimerMap struct { + Key string + Tag string + Clear bool + FireTimestamp, HoldTimestamp mtime.Time +} + +type timerOptions func(*TimerMap) + +// WithTag sets the tag for the timer. +func WithTag(tag string) timerOptions { + return func(tm *TimerMap) { + tm.Tag = tag + } +} + +// WithOutputTimestamp sets the output timestamp for the timer. +func WithOutputTimestamp(outputTimestamp mtime.Time) timerOptions { + return func(tm *TimerMap) { + tm.HoldTimestamp = outputTimestamp + } +} + +// Provider represents the DoFn parameter used to get and manipulate timers. +// Use this as a parameter in DoFn and pass it to the timer methods like Set() and Clear(). +// NOTE: Do not use this interface directly to manipulate timers. +type Provider interface { + Set(t TimerMap) +} + +// PipelineTimer interface is used to implement different types of timers, that is, +// event-time and processing-time. +type PipelineTimer interface { + TimerKey() string + TimerDomain() timeDomainEnum +} + +// EventTimeTimer represents the event time timer. +type EventTimeTimer struct { + Key string + Kind timeDomainEnum Review Comment: Isn't it inferrable that this will always be `TimeDomainEventTime`? Why do we need this field? Same question for processing time ########## sdks/go/pkg/beam/core/timers/timers_test.go: ########## @@ -0,0 +1,189 @@ +// Licensed to the Apache SoFiringTimestampware Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, soFiringTimestampware +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package timers + +import ( + "testing" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" + "github.com/google/go-cmp/cmp" +) + +type fakeTimerProvider struct { + timerDetails TimerMap +} + +func (t *fakeTimerProvider) Set(tm TimerMap) { + t.timerDetails = tm +} + +func (t *fakeTimerProvider) GetMap() TimerMap { + return t.timerDetails +} + +func TestProcessingTimeTimer_Set(t *testing.T) { + type fields struct { + Key string + Kind timeDomainEnum + } + type args struct { + FiringTimestamp mtime.Time + opts []timerOptions + } + tests := []struct { + name string + fields fields + args args + out TimerMap + }{ + { + name: "correct set no options", + fields: fields{Key: "Window", Kind: TimeDomainProcessingTime}, + args: args{ + FiringTimestamp: mtime.FromTime(time.Now()), + }, + out: TimerMap{ + Key: "Window", + FireTimestamp: mtime.FromTime(time.Now()), + }, + }, + { + name: "correct set tag option", + fields: fields{Key: "Window", Kind: TimeDomainProcessingTime}, + args: args{ + FiringTimestamp: mtime.FromTime(time.Now()), + opts: []timerOptions{WithTag("windowTag")}, + }, + out: TimerMap{ + Key: "Window", + FireTimestamp: mtime.FromTime(time.Now()), + Tag: "windowTag", + }, + }, + { + name: "correct set tag and timestamp option", + fields: fields{Key: "Window", Kind: TimeDomainProcessingTime}, + args: args{ + FiringTimestamp: mtime.FromTime(time.Now()), + opts: []timerOptions{WithTag("windowTag"), WithOutputTimestamp(mtime.FromTime(time.Now().Add(2 * time.Second)))}, + }, + out: TimerMap{ + Key: "Window", + FireTimestamp: mtime.FromTime(time.Now()), + Tag: "windowTag", + HoldTimestamp: mtime.FromTime(time.Now().Add(2 * time.Second)), Review Comment: Rather than setting time.Now().Add(2 * time.Second)) twice, we should probably store that as a value before entering the test. Otherwise, there could be some drift between the 2 invocations ########## sdks/go/pkg/beam/core/timers/timers.go: ########## @@ -0,0 +1,149 @@ +// Licensed to the Apache SoFiringTimestampware Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, soFiringTimestampware +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package timer provides structs for reading and writing timers. +package timers + +import ( + "reflect" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" +) + +var ( + // ProviderType is timer provider type. + ProviderType = reflect.TypeOf((*Provider)(nil)).Elem() +) + +type timeDomainEnum int32 + +const ( + // TimeDomainUnspecified represents unspecified time domain. + TimeDomainUnspecified timeDomainEnum = 0 Review Comment: When would you have an unspecified domain? Is this just an error condition? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org