[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=96280&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96280 ]

ASF GitHub Bot logged work on BEAM-3303:
Author: ASF GitHub Bot
Created on: 27/Apr/18 23:39
Start Date: 27/Apr/18 23:39
Worklog Time Spent: 10m
Work Description: herohde commented on issue #5179: [BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#issuecomment-385119592

Thanks. Yes, the idea is to add a custom windowFn later (similarly to how custom coders are done). My understanding is that custom WindowFns are not yet supported in the FnAPI, though.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 96280)
Time Spent: 6h 10m (was: 6h)

> Go windowing support
>
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Henning Rohde
> Assignee: Henning Rohde
> Priority: Major
> Time Spent: 6h 10m
> Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=96278&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96278 ]

ASF GitHub Bot logged work on BEAM-3303:
Author: ASF GitHub Bot
Created on: 27/Apr/18 23:35
Start Date: 27/Apr/18 23:35
Worklog Time Spent: 10m
Work Description: robertwb commented on issue #5179: [BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#issuecomment-385119208

Thanks! It looks like this structure only supports a finite set of built-in WindowFns, rather than letting the user define their own with a callable (WindowFn:WindowInto is like DoFn:ParDo), but that can be future work.

Issue Time Tracking
---
Worklog Id: (was: 96278)
Time Spent: 5h 50m (was: 5h 40m)
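A small Go sketch of a user-definable window function makes robertwb's analogy concrete: WindowFn is to WindowInto what DoFn is to ParDo. Everything in the sketch is hypothetical. `WindowFn`, `IntervalWindow` and `FixedWindows` are illustrative names, not the Beam Go SDK API. Timestamps are plain int64 milliseconds rather than the SDK's time types.

```go
package main

import "fmt"

// IntervalWindow is a hypothetical half-open window [Start, End) in
// milliseconds since the epoch.
type IntervalWindow struct {
	Start, End int64
}

// WindowFn is a hypothetical user-definable window function: given an
// element's event timestamp, it returns the windows the element belongs to.
// A WindowInto transform would call it once per element, much as ParDo
// calls a DoFn.
type WindowFn interface {
	AssignWindows(ts int64) []IntervalWindow
}

// FixedWindows assigns each timestamp to exactly one window of length Size,
// with window boundaries at Offset + k*Size for integer k.
type FixedWindows struct {
	Size, Offset int64
}

func (f FixedWindows) AssignWindows(ts int64) []IntervalWindow {
	// Floor the position relative to Offset, so negative timestamps also land
	// in the correct window (Go's % truncates toward zero).
	rel := ts - f.Offset
	start := rel - ((rel%f.Size)+f.Size)%f.Size + f.Offset
	return []IntervalWindow{{Start: start, End: start + f.Size}}
}

func main() {
	var fn WindowFn = FixedWindows{Size: 60000} // one-minute windows
	fmt.Println(fn.AssignWindows(90000))        // [{60000 120000}]
	fmt.Println(fn.AssignWindows(-1))           // [{-60000 0}]
}
```

`WindowInto` would depend only on the interface. A built-in strategy such as fixed windows and a user-supplied one would therefore be interchangeable, which is the extensibility the comment asks for.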
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95799&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95799 ]

ASF GitHub Bot logged work on BEAM-3303:
Author: ASF GitHub Bot
Created on: 27/Apr/18 00:19
Start Date: 27/Apr/18 00:19
Worklog Time Spent: 10m
Work Description: herohde commented on issue #5179: [BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#issuecomment-384826776

@robertwb Now I think it's ready. PTAL

Issue Time Tracking
---
Worklog Id: (was: 95799)
Time Spent: 5h 40m (was: 5.5h)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95757&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95757 ]

ASF GitHub Bot logged work on BEAM-3303:
Author: ASF GitHub Bot
Created on: 26/Apr/18 22:33
Start Date: 26/Apr/18 22:33
Worklog Time Spent: 10m
Work Description: herohde commented on issue #5179: [BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#issuecomment-384809351

Thanks!

Issue Time Tracking
---
Worklog Id: (was: 95757)
Time Spent: 5.5h (was: 5h 20m)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95754&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95754 ]

ASF GitHub Bot logged work on BEAM-3303:
Author: ASF GitHub Bot
Created on: 26/Apr/18 22:29
Start Date: 26/Apr/18 22:29
Worklog Time Spent: 10m
Work Description: herohde commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184548981

## File path: sdks/go/pkg/beam/runners/dataflow/translate.go
@@ -454,50 +457,52 @@ func stepID(id int) string {
 	return fmt.Sprintf("s%v", id)
 }
-func translateWindow(w *window.Window) proto.Message {
-	// TODO: The only windowing strategy we support is the global window.
-	if w.Kind() != window.GlobalWindow {
-		panic(fmt.Sprintf("Unsupported window type supplied: %v", w))
-	}
-	// We compute the fixed content of this message for use in workflows.
-	msg := rnapi_pb.MessageWithComponents{
+func translateWindowingStrategy(w *window.WindowingStrategy) proto.Message {
+	c := graphx.NewCoderMarshaller()
+	ws := graphx.MarshalWindowingStrategy(c, w)
+
+	msg := &rnapi_pb.MessageWithComponents{
 		Components: &rnapi_pb.Components{
-			Coders: map[string]*rnapi_pb.Coder{
-				"Coder": &rnapi_pb.Coder{
-					Spec: &rnapi_pb.SdkFunctionSpec{
-						Spec: &rnapi_pb.FunctionSpec{
-							Urn: "urn:beam:coders:global_window:0.1",
-						},
-					},
-				},
-			},
+			Coders: c.Build(),
 		},
 		Root: &rnapi_pb.MessageWithComponents_WindowingStrategy{
-			WindowingStrategy: &rnapi_pb.WindowingStrategy{
-				WindowFn: &rnapi_pb.SdkFunctionSpec{
-					Spec: &rnapi_pb.FunctionSpec{
-						Urn: "beam:windowfn:global_windows:v0.1",
-					},
-				},
-				MergeStatus:      rnapi_pb.MergeStatus_NON_MERGING,
-				AccumulationMode: rnapi_pb.AccumulationMode_DISCARDING,
-				WindowCoderId:    "Coder",
-				Trigger: &rnapi_pb.Trigger{
-					Trigger: &rnapi_pb.Trigger_Default_{
-						Default: &rnapi_pb.Trigger_Default{},
-					},
-				},
-				OutputTime:      rnapi_pb.OutputTime_END_OF_WINDOW,
-				ClosingBehavior: rnapi_pb.ClosingBehavior_EMIT_IF_NONEMPTY,
-				AllowedLateness: 0,
-			},
+			WindowingStrategy: ws,
 		},
 	}
-
-	return &msg
+	return msg
 }
 func encodeSerializedFn(in proto.Message) (string, error) {
-	// The Beam Runner API uses URL query escaping for serialized fn messages.
-	return protox.EncodeQueryEscaped(in)
+	// The Beam Runner API uses special escaping for serialized fn messages.
+
+	data, err := proto.Marshal(in)
+	if err != nil {
+		return "", err
+	}
+	return encodeString(data), nil
+}
+
+// encodeString is a custom encoding used in some cases by Dataflow.
+//
+// Uses a simple strategy of converting each byte to a single char,

Review comment:
Seems you're right. Good catch.

Issue Time Tracking
---
Worklog Id: (was: 95754)
Time Spent: 5h 20m (was: 5h 10m)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95752&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95752 ]

ASF GitHub Bot logged work on BEAM-3303:
Author: ASF GitHub Bot
Created on: 26/Apr/18 22:10
Start Date: 26/Apr/18 22:10
Worklog Time Spent: 10m
Work Description: herohde commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184545647

## File path: sdks/go/pkg/beam/core/runtime/graphx/serialize.go
@@ -942,28 +982,28 @@ func isCoGBKList(ref *CoderRef) ([]*CoderRef, bool) {
 	return ref2.Components, true
 }
-// TODO(wcn): Windowing information isn't currently propagated through
-// our code. These methods will be used by other packages, so exporting
-// them now.
-
 // encodeWindow translates the preprocessed representation of a Beam coder
 // into the wire representation, capturing the underlying types used by
 // the coder.
-func encodeWindow(w *window.Window) (*CoderRef, error) {
-	switch w.Kind() {
-	case window.GlobalWindow:
+func encodeWindowCoder(w *coder.WindowCoder) (*CoderRef, error) {
+	switch w.Kind {
+	case coder.GlobalWindow:
 		return &CoderRef{Type: GlobalWindowType}, nil
+	case coder.IntervalWindow:
+		return &CoderRef{Type: IntervalWindowType}, nil
 	default:
-		return nil, fmt.Errorf("bad window kind: %v", w.Kind())
+		return nil, fmt.Errorf("bad window kind: %v", w.Kind)
 	}
 }
 // decodeWindow receives the wire representation of a Beam coder, extracting

Review comment:
Done

Issue Time Tracking
---
Worklog Id: (was: 95752)
Time Spent: 5h 10m (was: 5h)
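The renamed `encodeWindowCoder` maps each window-coder kind to a wire type. The decode side, whose doc comment still says `decodeWindow`, has to invert that mapping. The self-contained sketch below shows the round trip. `WindowKind`, `WindowCoder`, `CoderRef`, `decodeWindowCoder` and both type strings are simplified stand-ins assumed for illustration, not the actual graphx or coder definitions.

```go
package main

import "fmt"

// WindowKind is a hypothetical stand-in for the coder package's window kind.
type WindowKind string

const (
	GlobalWindow   WindowKind = "GWC"
	IntervalWindow WindowKind = "IWC"
)

// WindowCoder is a stand-in for coder.WindowCoder.
type WindowCoder struct {
	Kind WindowKind
}

// CoderRef is a stand-in for the graphx wire representation.
type CoderRef struct {
	Type string
}

// Placeholder wire type names; the real graphx constants may differ.
const (
	GlobalWindowType   = "kind:global_window"
	IntervalWindowType = "kind:interval_window"
)

// encodeWindowCoder mirrors the shape of the function in the diff.
func encodeWindowCoder(w *WindowCoder) (*CoderRef, error) {
	switch w.Kind {
	case GlobalWindow:
		return &CoderRef{Type: GlobalWindowType}, nil
	case IntervalWindow:
		return &CoderRef{Type: IntervalWindowType}, nil
	default:
		return nil, fmt.Errorf("bad window kind: %v", w.Kind)
	}
}

// decodeWindowCoder inverts encodeWindowCoder.
func decodeWindowCoder(ref *CoderRef) (*WindowCoder, error) {
	switch ref.Type {
	case GlobalWindowType:
		return &WindowCoder{Kind: GlobalWindow}, nil
	case IntervalWindowType:
		return &WindowCoder{Kind: IntervalWindow}, nil
	default:
		return nil, fmt.Errorf("bad window type: %v", ref.Type)
	}
}

func main() {
	for _, k := range []WindowKind{GlobalWindow, IntervalWindow} {
		ref, _ := encodeWindowCoder(&WindowCoder{Kind: k})
		back, _ := decodeWindowCoder(ref)
		fmt.Println(k, "->", ref.Type, "->", back.Kind)
	}
	_, err := encodeWindowCoder(&WindowCoder{Kind: "bogus"})
	fmt.Println(err) // bad window kind: bogus
}
```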
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95751&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95751 ]

ASF GitHub Bot logged work on BEAM-3303:
Author: ASF GitHub Bot
Created on: 26/Apr/18 22:10
Start Date: 26/Apr/18 22:10
Worklog Time Spent: 10m
Work Description: herohde commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184545496

## File path: sdks/go/pkg/beam/core/runtime/graphx/coder.go
@@ -75,15 +74,15 @@ type CoderUnmarshaller struct {
 	models map[string]*pb.Coder
 	coders map[string]*coder.Coder
-	windows map[string]*window.Window
+	windows map[string]*coder.WindowCoder

Review comment:
Done

Issue Time Tracking
---
Worklog Id: (was: 95751)
Time Spent: 5h (was: 4h 50m)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95750&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95750 ]

ASF GitHub Bot logged work on BEAM-3303:
Author: ASF GitHub Bot
Created on: 26/Apr/18 22:09
Start Date: 26/Apr/18 22:09
Worklog Time Spent: 10m
Work Description: herohde commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184545328

## File path: sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -95,26 +96,56 @@ func (n *ParDo) ProcessElement(ctx context.Context, elm FullValue, values ...ReS
 	}
 	ctx = metrics.SetPTransformID(ctx, n.PID)
+	fn := n.Fn.ProcessElementFn()
-	val, err := n.invokeDataFn(ctx, elm.Timestamp, n.Fn.ProcessElementFn(), {Key: elm, Values: values})
-	if err != nil {
-		return n.fail(err)
-	}
+	// If the function observes windows, we must pass invoke it for each window. The expected fast path

Review comment:
Done

Issue Time Tracking
---
Worklog Id: (was: 95750)
Time Spent: 4h 50m (was: 4h 40m)
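The comment in this hunk describes the rule being implemented. A DoFn that observes its window must be invoked once per window of a multi-window element. The common case, a single window or a function that ignores windows, needs only one call. A simplified sketch of that dispatch follows. `Element` and `process` are hypothetical names, not the exec package's actual types or helpers.

```go
package main

import "fmt"

// Element is a hypothetical windowed value: one payload that belongs to
// several windows at once (for example, after sliding-window assignment).
type Element struct {
	Value   string
	Windows []string
}

// process invokes fn for elm. If fn observes windows, each call must see
// exactly one window, so the element is "exploded" into one call per window.
// Otherwise a single call with all windows attached suffices (the fast path).
func process(elm Element, observesWindow bool, fn func(Element)) {
	if !observesWindow || len(elm.Windows) <= 1 {
		fn(elm) // fast path: one invocation
		return
	}
	for _, w := range elm.Windows {
		fn(Element{Value: elm.Value, Windows: []string{w}})
	}
}

func main() {
	elm := Element{Value: "x", Windows: []string{"w1", "w2", "w3"}}
	calls := 0
	process(elm, true, func(e Element) { calls++ })
	fmt.Println(calls) // 3
	calls = 0
	process(elm, false, func(e Element) { calls++ })
	fmt.Println(calls) // 1
}
```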
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95748&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95748 ]

ASF GitHub Bot logged work on BEAM-3303:
Author: ASF GitHub Bot
Created on: 26/Apr/18 22:08
Start Date: 26/Apr/18 22:08
Worklog Time Spent: 10m
Work Description: herohde commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184545205

## File path: sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -276,42 +277,135 @@ func (c *kvDecoder) Decode(r io.Reader) (FullValue, error) {
 }
-// TODO(herohde) 4/7/2017: actually handle windows.
+// WindowEncoder handles Window serialization to a byte stream. The encoder
+// can be reused, even if an error is encountered.

Review comment:
Added a comment to all coding types stating that they are concurrency-safe.

Issue Time Tracking
---
Worklog Id: (was: 95748)
Time Spent: 4h 40m (was: 4.5h)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95745&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95745 ]

ASF GitHub Bot logged work on BEAM-3303:
Author: ASF GitHub Bot
Created on: 26/Apr/18 22:05
Start Date: 26/Apr/18 22:05
Worklog Time Spent: 10m
Work Description: herohde commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184544647

## File path: sdks/go/pkg/beam/core/graph/mtime/time.go
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package mtime contains a millisecond representation of time. The purpose
+// of this representation is alignment with the Beam specification, where we
+// need extreme values outside the range of time.Time for windowing.
+package mtime
+
+import (
+	"fmt"
+	"math"
+	"time"
+)
+
+const (
+	// MinTimestamp is the minimum value for any Beam timestamp. Often referred to
+	// as "-infinity". This value and MaxTimestamp are chosen so that their
+	// microseconds-since-epoch can be safely represented with an int64 and boundary
+	// values can be represented correctly with milli-second precision.

Review comment:
Done

Issue Time Tracking
---
Worklog Id: (was: 95745)
Time Spent: 4.5h (was: 4h 20m)
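The `MinTimestamp` comment claims that the bounds are chosen so their microseconds-since-epoch still fit in an int64. The hunk ends before the actual definitions, so the sketch below assumes millisecond bounds of `math.MinInt64 / 1000` and `math.MaxInt64 / 1000` and checks the claim. Multiplying either bound by 1000 stays inside the int64 range, while one millisecond beyond either bound would overflow. `toMicros` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"math"
)

// Assumed bounds (not taken from the diff): the int64 range divided by 1000,
// so that converting them to microseconds cannot overflow.
const (
	minTimestampMillis int64 = math.MinInt64 / 1000 // -9223372036854775
	maxTimestampMillis int64 = math.MaxInt64 / 1000 //  9223372036854775
)

// toMicros converts milliseconds to microseconds and reports false if the
// result would not fit in an int64.
func toMicros(ms int64) (int64, bool) {
	if ms > math.MaxInt64/1000 || ms < math.MinInt64/1000 {
		return 0, false
	}
	return ms * 1000, true
}

func main() {
	lo, okLo := toMicros(minTimestampMillis)
	hi, okHi := toMicros(maxTimestampMillis)
	fmt.Println(lo, okLo) // -9223372036854775000 true
	fmt.Println(hi, okHi) // 9223372036854775000 true
	_, ok := toMicros(maxTimestampMillis + 1)
	fmt.Println(ok) // false
}
```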
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95742&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95742 ]

ASF GitHub Bot logged work on BEAM-3303:
Author: ASF GitHub Bot
Created on: 26/Apr/18 22:05
Start Date: 26/Apr/18 22:05
Worklog Time Spent: 10m
Work Description: herohde commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184544512

## File path: sdks/go/pkg/beam/core/graph/coder/coder.go
@@ -146,9 +145,9 @@ type Coder struct {
 	Kind Kind
 	T typex.FullType
-	Components []*Coder // WindowedValue, KV, CoGBK
-	Custom *CustomCoder // Custom
-	Window *window.Window // WindowedValue
+	Components []*Coder // WindowedValue, KV, CoGBK
+	Custom *CustomCoder // Custom
+	Window *WindowCoder // WindowedValue

Review comment:
No. The comments indicate which fields are populated for each kind, as a poor man's algebraic data type.

Issue Time Tracking
---
Worklog Id: (was: 95742)
Time Spent: 4h 20m (was: 4h 10m)
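herohde's reply describes a common Go idiom. A single struct's `Kind` field determines which of the other fields are populated, and trailing comments document the mapping in place of a real sum type. The minimal sketch below shows the idiom with a `Validate` method that enforces the documented mapping. The `Coder`, `Kind` and `WindowCoder` types are simplified, hypothetical stand-ins, not the SDK's `coder.Coder`.

```go
package main

import (
	"errors"
	"fmt"
)

// Kind selects which "variant" a Coder value represents.
type Kind string

const (
	Bytes         Kind = "bytes"
	KV            Kind = "KV"
	WindowedValue Kind = "W"
)

// WindowCoder is a placeholder for a window coder description.
type WindowCoder struct{ Kind string }

// Coder emulates a sum type: Kind determines which fields are meaningful,
// as documented by the trailing comments.
type Coder struct {
	Kind       Kind
	Components []*Coder     // WindowedValue, KV
	Window     *WindowCoder // WindowedValue
}

// Validate checks that exactly the fields documented for Kind are set.
func (c *Coder) Validate() error {
	switch c.Kind {
	case Bytes:
		if len(c.Components) != 0 || c.Window != nil {
			return errors.New("bytes coder takes no components or window")
		}
	case KV:
		if len(c.Components) != 2 || c.Window != nil {
			return errors.New("KV coder needs 2 components and no window")
		}
	case WindowedValue:
		if len(c.Components) != 1 || c.Window == nil {
			return errors.New("windowed value coder needs 1 component and a window")
		}
	default:
		return fmt.Errorf("unknown kind %q", c.Kind)
	}
	return nil
}

func main() {
	b := &Coder{Kind: Bytes}
	wv := &Coder{Kind: WindowedValue, Components: []*Coder{b}, Window: &WindowCoder{Kind: "global"}}
	fmt.Println(wv.Validate())                                          // <nil>
	fmt.Println((&Coder{Kind: KV, Components: []*Coder{b}}).Validate()) // KV coder needs 2 components and no window
}
```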
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95267&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95267 ]

ASF GitHub Bot logged work on BEAM-3303:
Author: ASF GitHub Bot
Created on: 25/Apr/18 21:50
Start Date: 25/Apr/18 21:50
Worklog Time Spent: 10m
Work Description: wcn3 commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184214639

## File path: sdks/go/pkg/beam/runners/dataflow/translate.go
@@ -454,50 +457,52 @@ func stepID(id int) string {
 	return fmt.Sprintf("s%v", id)
 }
-func translateWindow(w *window.Window) proto.Message {
-	// TODO: The only windowing strategy we support is the global window.
-	if w.Kind() != window.GlobalWindow {
-		panic(fmt.Sprintf("Unsupported window type supplied: %v", w))
-	}
-	// We compute the fixed content of this message for use in workflows.
-	msg := rnapi_pb.MessageWithComponents{
+func translateWindowingStrategy(w *window.WindowingStrategy) proto.Message {
+	c := graphx.NewCoderMarshaller()
+	ws := graphx.MarshalWindowingStrategy(c, w)
+
+	msg := &rnapi_pb.MessageWithComponents{
 		Components: &rnapi_pb.Components{
-			Coders: map[string]*rnapi_pb.Coder{
-				"Coder": &rnapi_pb.Coder{
-					Spec: &rnapi_pb.SdkFunctionSpec{
-						Spec: &rnapi_pb.FunctionSpec{
-							Urn: "urn:beam:coders:global_window:0.1",
-						},
-					},
-				},
-			},
+			Coders: c.Build(),
 		},
 		Root: &rnapi_pb.MessageWithComponents_WindowingStrategy{
-			WindowingStrategy: &rnapi_pb.WindowingStrategy{
-				WindowFn: &rnapi_pb.SdkFunctionSpec{
-					Spec: &rnapi_pb.FunctionSpec{
-						Urn: "beam:windowfn:global_windows:v0.1",
-					},
-				},
-				MergeStatus:      rnapi_pb.MergeStatus_NON_MERGING,
-				AccumulationMode: rnapi_pb.AccumulationMode_DISCARDING,
-				WindowCoderId:    "Coder",
-				Trigger: &rnapi_pb.Trigger{
-					Trigger: &rnapi_pb.Trigger_Default_{
-						Default: &rnapi_pb.Trigger_Default{},
-					},
-				},
-				OutputTime:      rnapi_pb.OutputTime_END_OF_WINDOW,
-				ClosingBehavior: rnapi_pb.ClosingBehavior_EMIT_IF_NONEMPTY,
-				AllowedLateness: 0,
-			},
+			WindowingStrategy: ws,
 		},
 	}
-
-	return &msg
+	return msg
 }
 func encodeSerializedFn(in proto.Message) (string, error) {
-	// The Beam Runner API uses URL query escaping for serialized fn messages.
-	return protox.EncodeQueryEscaped(in)
+	// The Beam Runner API uses special escaping for serialized fn messages.
+
+	data, err := proto.Marshal(in)
+	if err != nil {
+		return "", err
+	}
+	return encodeString(data), nil
+}
+
+// encodeString is a custom encoding used in some cases by Dataflow.
+//
+// Uses a simple strategy of converting each byte to a single char,

Review comment:
This is defined as 'percent-encoding' or, less accurately, as 'URL encoding': https://en.wikipedia.org/wiki/Percent-encoding

Pretty sure this is identical to url.PathEscape; perhaps use that?

Issue Time Tracking
---
Worklog Id: (was: 95267)
Time Spent: 3h 50m (was: 3h 40m)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95263&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95263 ]

ASF GitHub Bot logged work on BEAM-3303:
Author: ASF GitHub Bot
Created on: 25/Apr/18 21:50
Start Date: 25/Apr/18 21:50
Worklog Time Spent: 10m
Work Description: wcn3 commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184211558

## File path: sdks/go/pkg/beam/core/runtime/graphx/coder.go
@@ -75,15 +74,15 @@ type CoderUnmarshaller struct {
 	models map[string]*pb.Coder
 	coders map[string]*coder.Coder
-	windows map[string]*window.Window
+	windows map[string]*coder.WindowCoder

Review comment:
windowCoders?

Issue Time Tracking
---
Worklog Id: (was: 95263)
Time Spent: 3.5h (was: 3h 20m)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95264&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95264 ]

ASF GitHub Bot logged work on BEAM-3303:
Author: ASF GitHub Bot
Created on: 25/Apr/18 21:50
Start Date: 25/Apr/18 21:50
Worklog Time Spent: 10m
Work Description: wcn3 commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184209550

## File path: sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -95,26 +96,56 @@ func (n *ParDo) ProcessElement(ctx context.Context, elm FullValue, values ...ReS
 	}
 	ctx = metrics.SetPTransformID(ctx, n.PID)
+	fn := n.Fn.ProcessElementFn()
-	val, err := n.invokeDataFn(ctx, elm.Timestamp, n.Fn.ProcessElementFn(), {Key: elm, Values: values})
-	if err != nil {
-		return n.fail(err)
-	}
+	// If the function observes windows, we must pass invoke it for each window. The expected fast path

Review comment:
Grammar in the first sentence. Perhaps ", we must invoke it for each window"?

Issue Time Tracking
---
Worklog Id: (was: 95264)
Time Spent: 3.5h (was: 3h 20m)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95269&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95269 ]

ASF GitHub Bot logged work on BEAM-3303:
Author: ASF GitHub Bot
Created on: 25/Apr/18 21:50
Start Date: 25/Apr/18 21:50
Worklog Time Spent: 10m
Work Description: wcn3 commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184211886

## File path: sdks/go/pkg/beam/core/runtime/graphx/serialize.go
@@ -942,28 +982,28 @@ func isCoGBKList(ref *CoderRef) ([]*CoderRef, bool) {
 	return ref2.Components, true
 }
-// TODO(wcn): Windowing information isn't currently propagated through
-// our code. These methods will be used by other packages, so exporting
-// them now.
-
 // encodeWindow translates the preprocessed representation of a Beam coder
 // into the wire representation, capturing the underlying types used by
 // the coder.
-func encodeWindow(w *window.Window) (*CoderRef, error) {
-	switch w.Kind() {
-	case window.GlobalWindow:
+func encodeWindowCoder(w *coder.WindowCoder) (*CoderRef, error) {
+	switch w.Kind {
+	case coder.GlobalWindow:
 		return &CoderRef{Type: GlobalWindowType}, nil
+	case coder.IntervalWindow:
+		return &CoderRef{Type: IntervalWindowType}, nil
 	default:
-		return nil, fmt.Errorf("bad window kind: %v", w.Kind())
+		return nil, fmt.Errorf("bad window kind: %v", w.Kind)
 	}
 }
 // decodeWindow receives the wire representation of a Beam coder, extracting

Review comment:
Fix the name of the function.

Issue Time Tracking
---
Worklog Id: (was: 95269)
Time Spent: 4h 10m (was: 4h)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95266=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95266 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 25/Apr/18 21:50 Start Date: 25/Apr/18 21:50 Worklog Time Spent: 10m Work Description: wcn3 commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r184205053 ## File path: sdks/go/pkg/beam/core/graph/mtime/time.go ## @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package mtime contains a millisecond representation of time. The purpose +// of this representation is alignment with the Beam specification, where we +// need extreme values outside the range of time.Time for windowing. +package mtime + +import ( + "fmt" + "math" + "time" +) + +const ( + // MinTimestamp is the minimum value for any Beam timestamp. Often referred to + // as "-infinity". This value and MaxTimestamp are chosen so that their + // microseconds-since-epoch can be safely represented with an int64 and boundary + // values can be represented correctly with milli-second precision. 
Review comment: millisecond Issue Time Tracking --- Worklog Id: (was: 95266) Time Spent: 3h 40m
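The bounds in the quoted comment come from dividing the int64 extremes by 1000. This guarantees that converting any millisecond timestamp in [MinTimestamp, MaxTimestamp] to microseconds cannot overflow. Below is a minimal Go check of that arithmetic. `minTimestampMillis`, `maxTimestampMillis`, and `millisToMicros` are hypothetical standalone names, not part of the `mtime` package.

```go
package main

import (
	"fmt"
	"math"
)

// Hypothetical standalone copies of the millisecond bounds quoted in the diff.
const (
	minTimestampMillis int64 = math.MinInt64 / 1000
	maxTimestampMillis int64 = math.MaxInt64 / 1000
)

// millisToMicros converts milliseconds to microseconds and reports whether
// the result fits in an int64.
func millisToMicros(ms int64) (int64, bool) {
	if ms > math.MaxInt64/1000 || ms < math.MinInt64/1000 {
		return 0, false
	}
	return ms * 1000, true
}

func main() {
	lo, okLo := millisToMicros(minTimestampMillis)
	hi, okHi := millisToMicros(maxTimestampMillis)
	fmt.Println(lo, okLo)
	fmt.Println(hi, okHi)
	// One millisecond past the upper bound no longer fits in microseconds.
	_, okOver := millisToMicros(maxTimestampMillis + 1)
	fmt.Println(okOver)
}
```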
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95265=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95265 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 25/Apr/18 21:50 Start Date: 25/Apr/18 21:50 Worklog Time Spent: 10m Work Description: wcn3 commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r184207512 ## File path: sdks/go/pkg/beam/core/runtime/exec/coder.go ## @@ -276,42 +277,135 @@ func (c *kvDecoder) Decode(r io.Reader) (FullValue, error) { } -// TODO(herohde) 4/7/2017: actually handle windows. +// WindowEncoder handles Window serialization to a byte stream. The encoder +// can be reused, even if an error is encountered. Review comment: Any concurrency-safety requirements? Issue Time Tracking --- Worklog Id: (was: 95265) Time Spent: 3h 40m (was: 3.5h)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95268=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95268 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 25/Apr/18 21:50 Start Date: 25/Apr/18 21:50 Worklog Time Spent: 10m Work Description: wcn3 commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r184203269 ## File path: sdks/go/pkg/beam/core/graph/coder/coder.go ## @@ -146,9 +145,9 @@ type Coder struct { Kind Kind T typex.FullType - Components []*Coder // WindowedValue, KV, CoGBK - Custom *CustomCoder // Custom - Window *window.Window // WindowedValue + Components []*Coder // WindowedValue, KV, CoGBK + Custom *CustomCoder // Custom + Window *WindowCoder // WindowedValue Review comment: Comment needs updating? Issue Time Tracking --- Worklog Id: (was: 95268) Time Spent: 4h (was: 3h 50m)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95072=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95072 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 25/Apr/18 15:29 Start Date: 25/Apr/18 15:29 Worklog Time Spent: 10m Work Description: herohde commented on issue #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#issuecomment-384329458 R: @robertwb Hope this PR makes Go a real SDK in your eyes :) PTAL Issue Time Tracking --- Worklog Id: (was: 95072) Time Spent: 3h 20m (was: 3h 10m)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93519=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93519 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 21/Apr/18 01:15 Start Date: 21/Apr/18 01:15 Worklog Time Spent: 10m Work Description: herohde commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r183148579 ## File path: sdks/go/pkg/beam/core/graph/coder/time_test.go ## @@ -18,32 +18,25 @@ package coder import ( "bytes" "testing" - "time" - "github.com/apache/beam/sdks/go/pkg/beam/core/typex" + "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime" ) func TestEncodeDecodeEventTime(t *testing.T) { tests := []struct { - time time.Time - errExpected bool + time mtime.Time }{ - {time: time.Unix(0, 0)}, - {time: time.Unix(10, 0)}, - {time: time.Unix(1257894000, 0)}, - {time: time.Unix(0, 1257894)}, - {time: time.Time{}, errExpected: true}, + {time: mtime.ZeroTimestamp}, + {time: mtime.MinTimestamp}, + {time: mtime.MaxTimestamp}, + {time: mtime.Now()}, Review comment: :) Issue Time Tracking --- Worklog Id: (was: 93519) Time Spent: 2h 50m
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93518=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93518 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 21/Apr/18 01:15 Start Date: 21/Apr/18 01:15 Worklog Time Spent: 10m Work Description: herohde commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r183195159 ## File path: sdks/go/pkg/beam/core/runtime/exec/combine.go ## @@ -93,51 +79,38 @@ func (n *Combine) ProcessElement(ctx context.Context, value FullValue, values .. return fmt.Errorf("invalid status for combine %v: %v", n.UID, n.status) } - if n.IsPerKey { - // For per-key combine, all processing can be done here. Note that - // we do not explicitly call merge, although it may be called implicitly - // when adding input. + // Note that we do not explicitly call merge, although it may + // be called implicitly when adding input. - a, err := n.newAccum(ctx, value.Elm) - if err != nil { - return n.fail(err) - } - first := true - - stream := values[0].Open() - for { - v, err := stream.Read() - if err != nil { - if err == io.EOF { - break - } - return n.fail(err) - } + a, err := n.newAccum(ctx, value.Elm) + if err != nil { + return n.fail(err) + } + first := true - a, err = n.addInput(ctx, a, value.Elm, v.Elm, value.Timestamp, first) - if err != nil { - return n.fail(err) + stream := values[0].Open() + for { + v, err := stream.Read() + if err != nil { + if err == io.EOF { + break } - first = false + return n.fail(err) } - stream.Close() - out, err := n.extract(ctx, a) + a, err = n.addInput(ctx, a, value.Elm, v.Elm, value.Timestamp, first) if err != nil { return n.fail(err) } - return n.Out.ProcessElement(ctx, FullValue{Elm: value.Elm, Elm2: out, Timestamp: value.Timestamp}) + first = false } + stream.Close() Review comment: Done The defer cost is not a concern. 
I think the idea was to avoid holding the stream open while processing the continuation. We can tighten it up if it becomes an issue. Issue Time Tracking --- Worklog Id: (was: 93518) Time Spent: 2h 50m (was: 2h 40m)
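The approach described above has three steps. First, read the grouped values to the end. Next, close the stream explicitly. Only then emit the result downstream, instead of deferring the close until the function returns. Below is a minimal sketch of that ordering. It assumes a hypothetical `valueStream` interface with `Read` (returning `io.EOF` when exhausted) and `Close`, plus a summing combine. It is not the SDK's `ReStream` API.

```go
package main

import (
	"fmt"
	"io"
)

// valueStream is a hypothetical iterator over the values grouped under a key.
type valueStream interface {
	Read() (int, error) // returns io.EOF when exhausted
	Close() error
}

// sliceStream is an in-memory valueStream that records whether it was closed.
type sliceStream struct {
	vals   []int
	pos    int
	closed bool
}

func (s *sliceStream) Read() (int, error) {
	if s.pos >= len(s.vals) {
		return 0, io.EOF
	}
	v := s.vals[s.pos]
	s.pos++
	return v, nil
}

func (s *sliceStream) Close() error {
	s.closed = true
	return nil
}

// combineAndEmit sums the stream into one accumulator, closes the stream,
// and only then calls emit, so the stream is not held open while
// downstream processing runs.
func combineAndEmit(stream valueStream, emit func(sum int) error) error {
	sum := 0
	for {
		v, err := stream.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			stream.Close()
			return err
		}
		sum += v
	}
	if err := stream.Close(); err != nil {
		return err
	}
	return emit(sum)
}

func main() {
	s := &sliceStream{vals: []int{1, 2, 3}}
	_ = combineAndEmit(s, func(sum int) error {
		fmt.Println("sum:", sum, "stream closed:", s.closed)
		return nil
	})
}
```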
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93516=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93516 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 21/Apr/18 01:15 Start Date: 21/Apr/18 01:15 Worklog Time Spent: 10m Work Description: herohde commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r183195020 ## File path: sdks/go/pkg/beam/core/runtime/exec/combine.go ## @@ -93,51 +79,38 @@ func (n *Combine) ProcessElement(ctx context.Context, value FullValue, values .. return fmt.Errorf("invalid status for combine %v: %v", n.UID, n.status) } - if n.IsPerKey { - // For per-key combine, all processing can be done here. Note that - // we do not explicitly call merge, although it may be called implicitly - // when adding input. + // Note that we do not explicitly call merge, although it may Review comment: It refers to the lack of combiner lifting, where inputs are precombined and the partial results are then merged. For now, we just add each input into a single accumulator. Issue Time Tracking --- Worklog Id: (was: 93516) Time Spent: 2h 40m (was: 2.5h)
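The two approaches differ as follows. Without combiner lifting, every input is added to one accumulator. With lifting, partial accumulators are built per chunk (for example, per bundle before a shuffle) and then merged. Below is a minimal sketch using a hypothetical mean combiner; `addInput`, `mergeAccumulators`, and `extractOutput` are illustrative names. For an associative combine, both paths give the same result.

```go
package main

import "fmt"

// meanAccum is the accumulator of a hypothetical mean combiner.
type meanAccum struct {
	sum   float64
	count int
}

func addInput(a meanAccum, v float64) meanAccum {
	return meanAccum{sum: a.sum + v, count: a.count + 1}
}

func mergeAccumulators(accs ...meanAccum) meanAccum {
	var out meanAccum
	for _, a := range accs {
		out.sum += a.sum
		out.count += a.count
	}
	return out
}

func extractOutput(a meanAccum) float64 {
	if a.count == 0 {
		return 0
	}
	return a.sum / float64(a.count)
}

// unlifted adds every input into a single accumulator.
func unlifted(inputs []float64) float64 {
	var a meanAccum
	for _, v := range inputs {
		a = addInput(a, v)
	}
	return extractOutput(a)
}

// lifted precombines each chunk into a partial accumulator, then merges them.
func lifted(chunks [][]float64) float64 {
	var partials []meanAccum
	for _, chunk := range chunks {
		var a meanAccum
		for _, v := range chunk {
			a = addInput(a, v)
		}
		partials = append(partials, a)
	}
	return extractOutput(mergeAccumulators(partials...))
}

func main() {
	fmt.Println(unlifted([]float64{1, 2, 3, 4}))       // 2.5
	fmt.Println(lifted([][]float64{{1, 2}, {3}, {4}})) // 2.5
}
```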
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93522=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93522 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 21/Apr/18 01:15 Start Date: 21/Apr/18 01:15 Worklog Time Spent: 10m Work Description: herohde commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r183195431 ## File path: sdks/go/pkg/beam/core/runtime/exec/window.go ## @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exec + +import ( + "context" + "fmt" + "time" + + "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime" + "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window" + "github.com/apache/beam/sdks/go/pkg/beam/core/typex" +) + +// WindowInto places each element in one or more windows. 
+type WindowInto struct { + UID UnitID + Fn *window.Fn + Out Node +} + +func (w *WindowInto) ID() UnitID { + return w.UID +} + +func (w *WindowInto) Up(ctx context.Context) error { + return nil +} + +func (w *WindowInto) StartBundle(ctx context.Context, id string, data DataManager) error { + return w.Out.StartBundle(ctx, id, data) +} + +func (w *WindowInto) ProcessElement(ctx context.Context, elm FullValue, values ...ReStream) error { + windowed := FullValue{ + Windows: assignWindows(w.Fn, elm.Timestamp), + Timestamp: elm.Timestamp, + Elm: elm.Elm, + Elm2: elm.Elm2, + } + return w.Out.ProcessElement(ctx, windowed, values...) +} + +func assignWindows(wfn *window.Fn, ts typex.EventTime) []typex.Window { + switch wfn.Kind { + case window.GlobalWindows: + return window.SingleGlobalWindow + + case window.FixedWindows: + start := ts - (ts.Add(wfn.Size) % mtime.FromDuration(wfn.Size)) + end := mtime.Min(start.Add(wfn.Size), mtime.EndOfGlobalWindowTime.Add(time.Millisecond)) + return []typex.Window{window.IntervalWindow{Start: start, End: end}} + + case window.SlidingWindows: + var ret []typex.Window + + period := mtime.FromDuration(wfn.Period) + lastStart := ts - (ts.Add(wfn.Size) % period) + for start := lastStart; start > ts.Subtract(wfn.Size); start -= period { + ret = append(ret, window.IntervalWindow{Start: start, End: start.Add(wfn.Size)}) + } + return ret + + default: Review comment: Opened BEAM-4152. Issue Time Tracking --- Worklog Id: (was: 93522) Time Spent: 3h 10m (was: 3h)
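The `assignWindows` switch above maps a timestamp to window bounds. Below is a minimal standalone sketch of fixed and sliding window assignment on raw millisecond timestamps. It assumes zero offset and uses a floor-modulo helper so that negative timestamps land in the correct window. The `interval` type and the function names are hypothetical. The sketch also omits the clamping to the end of the global window that the diff applies.

```go
package main

import "fmt"

// interval is a hypothetical half-open window [Start, End) in milliseconds.
type interval struct {
	Start, End int64
}

// floorMod returns a mod m in [0, m), also for negative a (requires m > 0).
func floorMod(a, m int64) int64 {
	r := a % m
	if r < 0 {
		r += m
	}
	return r
}

// fixedWindow returns the single window of the given size that contains ts.
func fixedWindow(ts, size int64) interval {
	start := ts - floorMod(ts, size)
	return interval{Start: start, End: start + size}
}

// slidingWindows returns every window of the given size, starting at a
// multiple of period, that contains ts (latest start first).
func slidingWindows(ts, size, period int64) []interval {
	var ret []interval
	lastStart := ts - floorMod(ts, period)
	for start := lastStart; start > ts-size; start -= period {
		ret = append(ret, interval{Start: start, End: start + size})
	}
	return ret
}

func main() {
	fmt.Println(fixedWindow(125000, 60000))           // {120000 180000}
	fmt.Println(slidingWindows(125000, 60000, 30000)) // [{120000 180000} {90000 150000}]
}
```

With a 60s size and a 30s period, each timestamp falls into two overlapping windows. This is why sliding windows can assign one element to several windows.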
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93514=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93514 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 21/Apr/18 01:15 Start Date: 21/Apr/18 01:15 Worklog Time Spent: 10m Work Description: herohde commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r183148461 ## File path: sdks/go/examples/windowed_wordcount/windowed_wordcount.go ## @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + Review comment: Done Issue Time Tracking --- Worklog Id: (was: 93514) Time Spent: 2.5h (was: 2h 20m)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93513=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93513 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 21/Apr/18 01:15 Start Date: 21/Apr/18 01:15 Worklog Time Spent: 10m Work Description: herohde commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r183194942 ## File path: sdks/go/pkg/beam/core/graph/window/windows.go ## @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package window + +import ( + "fmt" + + "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime" + "github.com/apache/beam/sdks/go/pkg/beam/core/typex" +) + +var ( + // SingleGlobalWindow is a slice of a single global window. Convenience value. + SingleGlobalWindow = []typex.Window{GlobalWindow{}} +) + +// GlobalWindow represents the singleton, global window. +type GlobalWindow struct{} + +func (GlobalWindow) MaxTimestamp() typex.EventTime { Review comment: Done Issue Time Tracking --- Worklog Id: (was: 93513) Time Spent: 2h 20m (was: 2h 10m)
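The `GlobalWindow` type above has a `MaxTimestamp` method, but the diff cuts off before its body. Below is a minimal sketch of a window interface with that method, implemented for both a global window and an interval window. It rests on two assumptions. First, the global window returns the end-of-global-window time from the `mtime` diff (one day before the max timestamp). Second, it follows the Beam convention that an interval window [Start, End) has a maximum timestamp of End minus one millisecond. All type names are hypothetical stand-ins for the SDK types.

```go
package main

import (
	"fmt"
	"math"
)

// Hypothetical millisecond constants mirroring the mtime diff.
const (
	maxTimestamp          int64 = math.MaxInt64 / 1000
	endOfGlobalWindowTime int64 = maxTimestamp - 24*60*60*1000
)

// window is a hypothetical stand-in for typex.Window.
type window interface {
	MaxTimestamp() int64
}

// globalWindow is the singleton window that elements fall into by default.
type globalWindow struct{}

// MaxTimestamp returns the (assumed) end of the global window.
func (globalWindow) MaxTimestamp() int64 { return endOfGlobalWindowTime }

// intervalWindow covers the half-open range [Start, End) in milliseconds.
type intervalWindow struct {
	Start, End int64
}

// MaxTimestamp is the last millisecond inside the half-open interval.
func (w intervalWindow) MaxTimestamp() int64 { return w.End - 1 }

func main() {
	ws := []window{globalWindow{}, intervalWindow{Start: 0, End: 60000}}
	for _, w := range ws {
		fmt.Println(w.MaxTimestamp())
	}
}
```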
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93520=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93520 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 21/Apr/18 01:15 Start Date: 21/Apr/18 01:15 Worklog Time Spent: 10m Work Description: herohde commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r183195295 ## File path: sdks/go/pkg/beam/core/runtime/exec/pardo.go ## @@ -95,26 +96,56 @@ func (n *ParDo) ProcessElement(ctx context.Context, elm FullValue, values ...ReS } ctx = metrics.SetPTransformID(ctx, n.PID) + fn := n.Fn.ProcessElementFn() - val, err := n.invokeDataFn(ctx, elm.Timestamp, n.Fn.ProcessElementFn(), {Key: elm, Values: values}) - if err != nil { - return n.fail(err) - } + // If the function observes windows, we must pass invoke the it for each window. The expected fast path Review comment: Done Issue Time Tracking --- Worklog Id: (was: 93520) Time Spent: 3h (was: 2h 50m)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93511=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93511 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 21/Apr/18 01:15 Start Date: 21/Apr/18 01:15 Worklog Time Spent: 10m Work Description: herohde commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r183194745 ## File path: sdks/go/pkg/beam/core/funcx/fn_test.go ## @@ -106,6 +117,12 @@ func TestNew(t *testing.T) { Fn: func(context.Context, context.Context, int) {}, Err: errContextParam, }, + { + Name: "errWindowParamPrecedence: after EventType", Review comment: Done Issue Time Tracking --- Worklog Id: (was: 93511) Time Spent: 2h 10m (was: 2h)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93515=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93515 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 21/Apr/18 01:15 Start Date: 21/Apr/18 01:15 Worklog Time Spent: 10m Work Description: herohde commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r183195256 ## File path: sdks/go/pkg/beam/core/runtime/exec/fn.go ## @@ -46,17 +48,22 @@ func Invoke(ctx context.Context, fn *funcx.Fn, opt *MainInput, extra ...interfac if index, ok := fn.Context(); ok { args[index] = ctx } + if index, ok := fn.Window(); ok { + if len(ws) != 1 { + return nil, fmt.Errorf("DoFns that observe windows must be invoked with single window: %v", opt.Key.Windows) Review comment: Each element may be in multiple windows, but it is processed only once if the windows are not observed. Sliding windows are an example of windowing that assigns an element to multiple windows. Issue Time Tracking --- Worklog Id: (was: 93515) Time Spent: 2h 40m (was: 2.5h)
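The explanation above can be made concrete. Suppose a DoFn observes its window and an element belongs to several windows (for example, with sliding windows). Then the element is "exploded", and the function runs once per window. If the DoFn does not observe windows, it runs once with all windows attached. Below is a minimal sketch of that dispatch, with the diff's single-window check acting as a guard. `element`, `doFn`, `invoke`, and `process` are hypothetical names, not the SDK's `exec` types.

```go
package main

import "fmt"

// element is a hypothetical windowed value: a payload plus its windows.
type element struct {
	Value   string
	Windows []string
}

// doFn is a hypothetical DoFn; ObservesWindow reports whether it takes a
// window parameter.
type doFn struct {
	ObservesWindow bool
	Invoke         func(e element) error
}

// invoke enforces that window-observing functions see exactly one window.
func invoke(fn doFn, e element) error {
	if fn.ObservesWindow && len(e.Windows) != 1 {
		return fmt.Errorf("DoFns that observe windows must be invoked with a single window: %v", e.Windows)
	}
	return fn.Invoke(e)
}

// process takes the fast path (one call) unless the function observes
// windows, in which case it invokes the function once per window.
func process(fn doFn, e element) error {
	if !fn.ObservesWindow {
		return invoke(fn, e)
	}
	for _, w := range e.Windows {
		single := element{Value: e.Value, Windows: []string{w}}
		if err := invoke(fn, single); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	e := element{Value: "x", Windows: []string{"[0s,60s)", "[30s,90s)"}}
	calls := 0
	count := func(element) error { calls++; return nil }

	_ = process(doFn{ObservesWindow: false, Invoke: count}, e)
	fmt.Println("not observing:", calls) // 1
	calls = 0
	_ = process(doFn{ObservesWindow: true, Invoke: count}, e)
	fmt.Println("observing:", calls) // 2
}
```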
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93521=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93521 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 21/Apr/18 01:15 Start Date: 21/Apr/18 01:15 Worklog Time Spent: 10m Work Description: herohde commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r183194856 ## File path: sdks/go/pkg/beam/core/graph/mtime/time.go ## @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package mtime contains a millisecond representation of time. The purpose +// of this representation is alignment with the Beam specification, where we +// need extreme values outside the range of time.Time for windowing. +package mtime + +import ( + "fmt" + "math" + "time" +) + +const ( + // MinTimestamp is the minimum value for any Beam timestamp. Often referred to + // as "-infinity". This value and MaxTimestamp are chosen so that their + // microseconds-since-epoch can be safely represented with an int64 and boundary + // values can be represented correctly with milli-second precision. 
+ MinTimestamp Time = math.MinInt64 / 1000 + + // MaxTimestamp is the maximum value for any Beam timestamp. Often referred to + // as "-infinity". + MaxTimestamp Time = math.MaxInt64 / 1000 + + // EndOfGlobalWindowTime is the timestamp at the end of the global window. It + // is a day before the max timestamp. + EndOfGlobalWindowTime = MaxTimestamp - 24*60*60*1000 + + // ZeroTimestamp is the default zero value time. It corresponds to the unix epoch. + ZeroTimestamp Time = 0 +) + +// Time is the number of milli-seconds since the Unix epoch. The valid range of +// times is bounded by what can be represented a _micro_-seconds-since-epoch. +type Time int64 + +// Now returns the current time. +func Now() Time { + return FromTime(time.Now()) +} + +// FromMilliseconds returns a timestamp from a raw milliseconds-since-epoch value. +func FromMilliseconds(unixMilliseconds int64) Time { + return Normalize(Time(unixMilliseconds)) +} + +// FromDuration returns a timestamp from a time.Duration-since-epoch value. +func FromDuration(d time.Duration) Time { + return ZeroTimestamp.Add(d) +} + +// FromTime returns a milli-second precision timestamp from a time.Time. +func FromTime(t time.Time) Time { + return Normalize(Time(n2m(t.UnixNano( +} + +// Milliseconds returns the number of milli-seconds since the Unix epoch. +func (t Time) Milliseconds() int64 { + return int64(t) +} + +// Add returns the time plus the duration. +func (t Time) Add(d time.Duration) Time { + return Normalize(Time(int64(t) + n2m(d.Nanoseconds( +} + +// Subtract returns the time minus the duration. +func (t Time) Subtract(d time.Duration) Time { + return Normalize(Time(int64(t) - n2m(d.Nanoseconds( +} + +func (t Time) String() string { + switch t { + case MinTimestamp: + return "-inf" + case MaxTimestamp: + return "+inf" + case EndOfGlobalWindowTime: + return "glo" + default: + return fmt.Sprintf("%v", t.Milliseconds()) + } +} + +// Min returns the smallest (earliest) time. 
+func Min(a, b Time) Time { + if int64(a) < int64(b) { + return a + } else { + return b + } +} + +// Max returns the largest (latest) time. +func Max(a, b Time) Time { + if int64(a) < int64(b) { + return b + } else { + return a + } +} + +// Normalize ensures a Time is within [MinTimestamp,MaxTimestamp]. +func Normalize(t Time) Time { + return Min(Max(t, MinTimestamp), MaxTimestamp) +} + +func n2m(v int64) int64 { Review comment: Done
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93512=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93512 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 21/Apr/18 01:15 Start Date: 21/Apr/18 01:15 Worklog Time Spent: 10m Work Description: herohde commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r183194837 ## File path: sdks/go/pkg/beam/core/graph/mtime/time.go ## @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package mtime contains a millisecond representation of time. The purpose +// of this representation is alignment with the Beam specification, where we +// need extreme values outside the range of time.Time for windowing. +package mtime + +import ( + "fmt" + "math" + "time" +) + +const ( + // MinTimestamp is the minimum value for any Beam timestamp. Often referred to + // as "-infinity". This value and MaxTimestamp are chosen so that their + // microseconds-since-epoch can be safely represented with an int64 and boundary + // values can be represented correctly with milli-second precision. 
+ MinTimestamp Time = math.MinInt64 / 1000 + + // MaxTimestamp is the maximum value for any Beam timestamp. Often referred to + // as "-infinity". Review comment: Done Issue Time Tracking --- Worklog Id: (was: 93512) Time Spent: 2h 10m (was: 2h)
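The bound arithmetic in the `MinTimestamp` comment can be checked with a small standalone program. This is a sketch, not code from the PR. It re-declares `Time`, `MinTimestamp` and `MaxTimestamp` locally as they appear in the diff. `toMicros` is a hypothetical helper that multiplies by 1000 only when the result fits in an int64.

```go
package main

import (
	"fmt"
	"math"
)

// Time mirrors mtime.Time from the diff: milliseconds since the Unix epoch.
type Time int64

// Local copies of the bounds from the diff. Integer division by 1000 leaves
// room to convert any valid timestamp to microseconds without overflow.
const (
	MinTimestamp Time = math.MinInt64 / 1000
	MaxTimestamp Time = math.MaxInt64 / 1000
)

// toMicros (hypothetical) converts milliseconds to microseconds and reports
// whether the result fits in an int64.
func toMicros(t Time) (int64, bool) {
	ms := int64(t)
	if ms > math.MaxInt64/1000 || ms < math.MinInt64/1000 {
		return 0, false
	}
	return ms * 1000, true
}

func main() {
	for _, t := range []Time{MinTimestamp, 0, MaxTimestamp, MaxTimestamp + 1} {
		us, ok := toMicros(t)
		fmt.Println(int64(t), us, ok)
	}
}
```

Only the last value in the loop, one millisecond past `MaxTimestamp`, is reported as not representable in microseconds.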
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93517=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93517 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 21/Apr/18 01:15 Start Date: 21/Apr/18 01:15 Worklog Time Spent: 10m Work Description: herohde commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r183195326 ## File path: sdks/go/pkg/beam/core/runtime/exec/translate.go ## @@ -68,15 +69,19 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error) { } if cid == "" { - u.Coder, err = b.makeCoderForPCollection(pid) + c, wc, err := b.makeCoderForPCollection(pid) if err != nil { return nil, err } + u.Coder = coder.NewW(c, wc) } else { u.Coder, err = b.coders.Coder(cid) // Expected to be windowed coder if err != nil { return nil, err } + if !coder.IsW(u.Coder) { + return nil, fmt.Errorf("unwindowed coder %v on DataSource %v: %v", cid, id, u.Coder) Review comment: Runner bug. Issue Time Tracking --- Worklog Id: (was: 93517) Time Spent: 2h 40m (was: 2.5h)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92909=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92909 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 19/Apr/18 23:18 Start Date: 19/Apr/18 23:18 Worklog Time Spent: 10m Work Description: lostluck commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r182910787 ## File path: sdks/go/pkg/beam/core/runtime/exec/pardo.go ## @@ -95,26 +96,56 @@ func (n *ParDo) ProcessElement(ctx context.Context, elm FullValue, values ...ReS } ctx = metrics.SetPTransformID(ctx, n.PID) + fn := n.Fn.ProcessElementFn() - val, err := n.invokeDataFn(ctx, elm.Timestamp, n.Fn.ProcessElementFn(), {Key: elm, Values: values}) - if err != nil { - return n.fail(err) - } + // If the function observes windows, we must pass invoke the it for each window. The expected fast path Review comment: s/the it/it/ Issue Time Tracking --- Worklog Id: (was: 92909) Time Spent: 1h 10m (was: 1h)
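The comment under review describes two paths. A function that observes windows is invoked once per window, and a function that does not takes a fast path. The following is a minimal sketch of that dispatch. It uses hypothetical stand-in types (`Window`, `FullValue`, `invokeFn`, `processElement`) that are much simpler than the real `exec` package types.

```go
package main

import "fmt"

// Window is a hypothetical stand-in for typex.Window.
type Window string

// FullValue is a simplified, hypothetical element that carries its windows.
type FullValue struct {
	Elm     int
	Windows []Window
}

// invokeFn stands in for a user function; it receives the windows it is
// invoked for.
type invokeFn func(elm int, ws []Window) string

// processElement invokes fn once per window if the function observes
// windows. Otherwise it takes the fast path: a single invocation that
// receives all of the element's windows.
func processElement(fn invokeFn, observesWindow bool, elm FullValue) []string {
	if !observesWindow {
		return []string{fn(elm.Elm, elm.Windows)}
	}
	out := make([]string, 0, len(elm.Windows))
	for _, w := range elm.Windows {
		out = append(out, fn(elm.Elm, []Window{w}))
	}
	return out
}

func main() {
	fn := func(elm int, ws []Window) string { return fmt.Sprintf("%d@%v", elm, ws) }
	elm := FullValue{Elm: 7, Windows: []Window{"w1", "w2"}}
	fmt.Println(processElement(fn, true, elm))  // one result per window
	fmt.Println(processElement(fn, false, elm)) // a single result
}
```

Splitting the element per window means a window-observing function always sees exactly one window, which matches the single-window check in `fn.go`.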
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92914=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92914 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 19/Apr/18 23:18 Start Date: 19/Apr/18 23:18 Worklog Time Spent: 10m Work Description: lostluck commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r182910798 ## File path: sdks/go/pkg/beam/core/funcx/fn_test.go ## @@ -106,6 +117,12 @@ func TestNew(t *testing.T) { Fn: func(context.Context, context.Context, int) {}, Err: errContextParam, }, + { + Name: "errWindowParamPrecedence: after EventType", Review comment: s/EventType/EventTime Issue Time Tracking --- Worklog Id: (was: 92914) Time Spent: 2h (was: 1h 50m)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92903=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92903 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 19/Apr/18 23:18 Start Date: 19/Apr/18 23:18 Worklog Time Spent: 10m Work Description: lostluck commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r182910777 ## File path: sdks/go/pkg/beam/core/runtime/exec/translate.go ## @@ -68,15 +69,19 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error) { } if cid == "" { - u.Coder, err = b.makeCoderForPCollection(pid) + c, wc, err := b.makeCoderForPCollection(pid) if err != nil { return nil, err } + u.Coder = coder.NewW(c, wc) } else { u.Coder, err = b.coders.Coder(cid) // Expected to be windowed coder if err != nil { return nil, err } + if !coder.IsW(u.Coder) { + return nil, fmt.Errorf("unwindowed coder %v on DataSource %v: %v", cid, id, u.Coder) Review comment: When this error occurs, what's the fix? Does it indicate an SDK bug, a pipeline author bug, or an IO author bug? Issue Time Tracking --- Worklog Id: (was: 92903) Time Spent: 0.5h (was: 20m)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92907=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92907 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 19/Apr/18 23:18 Start Date: 19/Apr/18 23:18 Worklog Time Spent: 10m Work Description: lostluck commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r182910780 ## File path: sdks/go/pkg/beam/core/graph/window/windows.go ## @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package window + +import ( + "fmt" + + "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime" + "github.com/apache/beam/sdks/go/pkg/beam/core/typex" +) + +var ( + // SingleGlobalWindow is a slice of a single global window. Convenience value. + SingleGlobalWindow = []typex.Window{GlobalWindow{}} +) + +// GlobalWindow represents the singleton, global window. +type GlobalWindow struct{} + +func (GlobalWindow) MaxTimestamp() typex.EventTime { Review comment: Top-level exported declarations (even methods that exist only to satisfy an interface) should have a doc comment. Issue Time Tracking --- Worklog Id: (was: 92907) Time Spent: 1h (was: 50m)
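The sketch below follows the review comment: every exported method gets a doc comment that starts with its name, even when the method exists only to satisfy an interface. It uses simplified, hypothetical stand-ins for `typex.EventTime` and `typex.Window`, and the real interface may declare different methods. The end of the global window is computed as in the diff's `EndOfGlobalWindowTime`.

```go
package main

import (
	"fmt"
	"math"
)

// EventTime is a hypothetical stand-in for typex.EventTime (milliseconds).
type EventTime int64

// endOfGlobalWindow is one day before the maximum timestamp, as in the diff.
const endOfGlobalWindow EventTime = math.MaxInt64/1000 - 24*60*60*1000

// Window is a simplified, hypothetical version of the window interface.
type Window interface {
	MaxTimestamp() EventTime
	Equals(o Window) bool
}

// GlobalWindow represents the singleton, global window.
type GlobalWindow struct{}

// MaxTimestamp returns the maximum timestamp in the global window.
func (GlobalWindow) MaxTimestamp() EventTime {
	return endOfGlobalWindow
}

// Equals returns true if o is also the global window.
func (GlobalWindow) Equals(o Window) bool {
	_, ok := o.(GlobalWindow)
	return ok
}

func main() {
	var w Window = GlobalWindow{}
	fmt.Println(w.MaxTimestamp(), w.Equals(GlobalWindow{}))
}
```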
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92911=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92911 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 19/Apr/18 23:18 Start Date: 19/Apr/18 23:18 Worklog Time Spent: 10m Work Description: lostluck commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r182910790 ## File path: sdks/go/pkg/beam/core/runtime/exec/combine.go ## @@ -93,51 +79,38 @@ func (n *Combine) ProcessElement(ctx context.Context, value FullValue, values .. return fmt.Errorf("invalid status for combine %v: %v", n.UID, n.status) } - if n.IsPerKey { - // For per-key combine, all processing can be done here. Note that - // we do not explicitly call merge, although it may be called implicitly - // when adding input. + // Note that we do not explicitly call merge, although it may Review comment: Is this referring to the non-initial entries in values? We don't appear to do anything with them. Issue Time Tracking --- Worklog Id: (was: 92911) Time Spent: 1.5h (was: 1h 20m)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92904=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92904 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 19/Apr/18 23:18 Start Date: 19/Apr/18 23:18 Worklog Time Spent: 10m Work Description: lostluck commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r182910779 ## File path: sdks/go/examples/windowed_wordcount/windowed_wordcount.go ## @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + Review comment: Consider adding a package doc explaining the example. // Windowed_wordcount demonstrates using Windowing with the Go SDK. // Windowing is a concept in the Beam model that subdivides a PCollection based on the // EventTime of an element. // See the Beam Programming guide at https://beam.apache.org/documentation/programming-guide/#windowing for more information. Issue Time Tracking --- Worklog Id: (was: 92904) Time Spent: 40m (was: 0.5h)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92908=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92908 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 19/Apr/18 23:18 Start Date: 19/Apr/18 23:18 Worklog Time Spent: 10m Work Description: lostluck commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r182910775 ## File path: sdks/go/pkg/beam/core/graph/mtime/time.go ## @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package mtime contains a millisecond representation of time. The purpose +// of this representation is alignment with the Beam specification, where we +// need extreme values outside the range of time.Time for windowing. +package mtime + +import ( + "fmt" + "math" + "time" +) + +const ( + // MinTimestamp is the minimum value for any Beam timestamp. Often referred to + // as "-infinity". This value and MaxTimestamp are chosen so that their + // microseconds-since-epoch can be safely represented with an int64 and boundary + // values can be represented correctly with milli-second precision. 
+ MinTimestamp Time = math.MinInt64 / 1000 + + // MaxTimestamp is the maximum value for any Beam timestamp. Often referred to + // as "-infinity". Review comment: "infinity" Issue Time Tracking --- Worklog Id: (was: 92908) Time Spent: 1h
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92913=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92913 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 19/Apr/18 23:18 Start Date: 19/Apr/18 23:18 Worklog Time Spent: 10m Work Description: lostluck commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r182910794 ## File path: sdks/go/pkg/beam/core/runtime/exec/fn.go ## @@ -46,17 +48,22 @@ func Invoke(ctx context.Context, fn *funcx.Fn, opt *MainInput, extra ...interfac if index, ok := fn.Context(); ok { args[index] = ctx } + if index, ok := fn.Window(); ok { + if len(ws) != 1 { + return nil, fmt.Errorf("DoFns that observe windows must be invoked with single window: %v", opt.Key.Windows) Review comment: When would there be two windows on a PCollection? Simply by applying WindowInto twice, the second being a finer grain than the first? Issue Time Tracking --- Worklog Id: (was: 92913) Time Spent: 1h 50m (was: 1h 40m)
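An element commonly ends up in more than one window when a sliding-window strategy has a period shorter than its size. The windows then overlap, so each element falls into several of them. The sketch below illustrates this with millisecond integers. `IntervalWindow`, `mod` and `assignSliding` are hypothetical helpers, not the SDK's code, and they assume windows start at multiples of the period with no offset.

```go
package main

import "fmt"

// IntervalWindow is a hypothetical half-open [Start, End) window in milliseconds.
type IntervalWindow struct{ Start, End int64 }

// mod returns a remainder in [0, b), unlike Go's % for negative a.
func mod(a, b int64) int64 {
	return (a%b + b) % b
}

// assignSliding returns every window of length size, starting at a multiple
// of period, that contains ts. The latest such start comes first.
func assignSliding(ts, size, period int64) []IntervalWindow {
	var ws []IntervalWindow
	lastStart := ts - mod(ts, period)
	for start := lastStart; start > ts-size; start -= period {
		ws = append(ws, IntervalWindow{Start: start, End: start + size})
	}
	return ws
}

func main() {
	// 60s windows every 30s: an element at t=45s is in [30s,90s) and [0s,60s).
	fmt.Println(assignSliding(45000, 60000, 30000))
}
```

When size equals period, the same loop yields exactly one window, which is the fixed-windows case.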
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92910=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92910 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 19/Apr/18 23:18 Start Date: 19/Apr/18 23:18 Worklog Time Spent: 10m Work Description: lostluck commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r182910789 ## File path: sdks/go/pkg/beam/core/runtime/exec/combine.go ## @@ -93,51 +79,38 @@ func (n *Combine) ProcessElement(ctx context.Context, value FullValue, values .. return fmt.Errorf("invalid status for combine %v: %v", n.UID, n.status) } - if n.IsPerKey { - // For per-key combine, all processing can be done here. Note that - // we do not explicitly call merge, although it may be called implicitly - // when adding input. + // Note that we do not explicitly call merge, although it may + // be called implicitly when adding input. 
- a, err := n.newAccum(ctx, value.Elm) - if err != nil { - return n.fail(err) - } - first := true - - stream := values[0].Open() - for { - v, err := stream.Read() - if err != nil { - if err == io.EOF { - break - } - return n.fail(err) - } + a, err := n.newAccum(ctx, value.Elm) + if err != nil { + return n.fail(err) + } + first := true - a, err = n.addInput(ctx, a, value.Elm, v.Elm, value.Timestamp, first) - if err != nil { - return n.fail(err) + stream := values[0].Open() + for { + v, err := stream.Read() + if err != nil { + if err == io.EOF { + break } - first = false + return n.fail(err) } - stream.Close() - out, err := n.extract(ctx, a) + a, err = n.addInput(ctx, a, value.Elm, v.Elm, value.Timestamp, first) if err != nil { return n.fail(err) } - return n.Out.ProcessElement(ctx, FullValue{Elm: value.Elm, Elm2: out, Timestamp: value.Timestamp}) + first = false } + stream.Close() Review comment: This behaviour was present previously, but consider deferring the close right after the Open call. Because there are early returns, the stream may be left open on an error path or a panic. That said, if none of the other calls can panic and the defer cost is a concern, consider explicitly closing before each early return. Issue Time Tracking --- Worklog Id: (was: 92910) Time Spent: 1h 20m (was: 1h 10m)
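The reviewer's suggestion can be sketched as follows. `Close` is deferred immediately after the stream is obtained, so every return path releases it, including early error returns and panics. The `stream` type and `sumPositive` function are hypothetical stand-ins, not the `ReStream` API from `combine.go`.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// stream is a hypothetical reader standing in for an opened ReStream.
type stream struct {
	vals   []int
	pos    int
	closed bool
}

// Read returns the next value, or io.EOF when the stream is exhausted.
func (s *stream) Read() (int, error) {
	if s.pos >= len(s.vals) {
		return 0, io.EOF
	}
	v := s.vals[s.pos]
	s.pos++
	return v, nil
}

// Close marks the stream as closed.
func (s *stream) Close() error {
	s.closed = true
	return nil
}

// sumPositive adds up the values and fails early on a negative one. The
// deferred Close runs on every return path, and also if a panic unwinds.
func sumPositive(s *stream) (int, error) {
	defer s.Close()
	sum := 0
	for {
		v, err := s.Read()
		if err == io.EOF {
			return sum, nil
		}
		if err != nil {
			return 0, err
		}
		if v < 0 {
			return 0, errors.New("negative input")
		}
		sum += v
	}
}

func main() {
	s := &stream{vals: []int{1, -2, 3}}
	_, err := sumPositive(s)
	fmt.Println(err, s.closed) // the early return still closed the stream
}
```

If profiling shows that `defer` matters on this hot path, the alternative the reviewer mentions is an explicit `Close` before each early return. That version is easier to get wrong when new return paths are added.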
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92912=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92912 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 19/Apr/18 23:18 Start Date: 19/Apr/18 23:18 Worklog Time Spent: 10m Work Description: lostluck commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r182910796 ## File path: sdks/go/pkg/beam/core/graph/coder/time_test.go ## @@ -18,32 +18,25 @@ package coder import ( "bytes" "testing" - "time" - "github.com/apache/beam/sdks/go/pkg/beam/core/typex" + "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime" ) func TestEncodeDecodeEventTime(t *testing.T) { tests := []struct { - time time.Time - errExpected bool + time mtime.Time }{ - {time: time.Unix(0, 0)}, - {time: time.Unix(10, 0)}, - {time: time.Unix(1257894000, 0)}, - {time: time.Unix(0, 1257894)}, - {time: time.Time{}, errExpected: true}, + {time: mtime.ZeroTimestamp}, + {time: mtime.MinTimestamp}, + {time: mtime.MaxTimestamp}, + {time: mtime.Now()}, Review comment: While adding *Now* as a test case is admirable, it only tells you about a problem once the problem has already arrived. Consider Now() + a month instead, which at least gives advance notice. Issue Time Tracking --- Worklog Id: (was: 92912) Time Spent: 1h 40m (was: 1.5h)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92905=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92905 ] ASF GitHub Bot logged work on BEAM-3303: Author: ASF GitHub Bot Created on: 19/Apr/18 23:18 Start Date: 19/Apr/18 23:18 Worklog Time Spent: 10m Work Description: lostluck commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support URL: https://github.com/apache/beam/pull/5179#discussion_r182910781 ## File path: sdks/go/pkg/beam/core/graph/mtime/time.go ## @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package mtime contains a millisecond representation of time. The purpose +// of this representation is alignment with the Beam specification, where we +// need extreme values outside the range of time.Time for windowing. +package mtime + +import ( + "fmt" + "math" + "time" +) + +const ( + // MinTimestamp is the minimum value for any Beam timestamp. Often referred to + // as "-infinity". This value and MaxTimestamp are chosen so that their + // microseconds-since-epoch can be safely represented with an int64 and boundary + // values can be represented correctly with milli-second precision. 
+ MinTimestamp Time = math.MinInt64 / 1000 + + // MaxTimestamp is the maximum value for any Beam timestamp. Often referred to + // as "-infinity". + MaxTimestamp Time = math.MaxInt64 / 1000 + + // EndOfGlobalWindowTime is the timestamp at the end of the global window. It + // is a day before the max timestamp. + EndOfGlobalWindowTime = MaxTimestamp - 24*60*60*1000 + + // ZeroTimestamp is the default zero value time. It corresponds to the unix epoch. + ZeroTimestamp Time = 0 +) + +// Time is the number of milli-seconds since the Unix epoch. The valid range of +// times is bounded by what can be represented a _micro_-seconds-since-epoch. +type Time int64 + +// Now returns the current time. +func Now() Time { + return FromTime(time.Now()) +} + +// FromMilliseconds returns a timestamp from a raw milliseconds-since-epoch value. +func FromMilliseconds(unixMilliseconds int64) Time { + return Normalize(Time(unixMilliseconds)) +} + +// FromDuration returns a timestamp from a time.Duration-since-epoch value. +func FromDuration(d time.Duration) Time { + return ZeroTimestamp.Add(d) +} + +// FromTime returns a milli-second precision timestamp from a time.Time. +func FromTime(t time.Time) Time { + return Normalize(Time(n2m(t.UnixNano()))) +} + +// Milliseconds returns the number of milli-seconds since the Unix epoch. +func (t Time) Milliseconds() int64 { + return int64(t) +} + +// Add returns the time plus the duration. +func (t Time) Add(d time.Duration) Time { + return Normalize(Time(int64(t) + n2m(d.Nanoseconds()))) +} + +// Subtract returns the time minus the duration. +func (t Time) Subtract(d time.Duration) Time { + return Normalize(Time(int64(t) - n2m(d.Nanoseconds()))) +} + +func (t Time) String() string { + switch t { + case MinTimestamp: + return "-inf" + case MaxTimestamp: + return "+inf" + case EndOfGlobalWindowTime: + return "glo" + default: + return fmt.Sprintf("%v", t.Milliseconds()) + } +} + +// Min returns the smallest (earliest) time.
+func Min(a, b Time) Time { + if int64(a) < int64(b) { + return a + } else { + return b + } +} + +// Max returns the largest (latest) time. +func Max(a, b Time) Time { + if int64(a) < int64(b) { + return b + } else { + return a + } +} + +// Normalize ensures a Time is within [MinTimestamp,MaxTimestamp]. +func Normalize(t Time) Time { + return Min(Max(t, MinTimestamp), MaxTimestamp) +} + +func n2m(v int64) int64 { Review comment: Consider: // n2m converts nanoseconds to milliseconds.
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92906=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92906 ]

ASF GitHub Bot logged work on BEAM-3303:
Author: ASF GitHub Bot
Created on: 19/Apr/18 23:18
Start Date: 19/Apr/18 23:18
Worklog Time Spent: 10m
Work Description: lostluck commented on a change in pull request #5179: [BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r182910776

## File path: sdks/go/pkg/beam/core/runtime/exec/window.go
##
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+)
+
+// WindowInto places each element in one or more windows.
+type WindowInto struct {
+	UID UnitID
+	Fn  *window.Fn
+	Out Node
+}
+
+func (w *WindowInto) ID() UnitID {
+	return w.UID
+}
+
+func (w *WindowInto) Up(ctx context.Context) error {
+	return nil
+}
+
+func (w *WindowInto) StartBundle(ctx context.Context, id string, data DataManager) error {
+	return w.Out.StartBundle(ctx, id, data)
+}
+
+func (w *WindowInto) ProcessElement(ctx context.Context, elm FullValue, values ...ReStream) error {
+	windowed := FullValue{
+		Windows:   assignWindows(w.Fn, elm.Timestamp),
+		Timestamp: elm.Timestamp,
+		Elm:       elm.Elm,
+		Elm2:      elm.Elm2,
+	}
+	return w.Out.ProcessElement(ctx, windowed, values...)
+}
+
+func assignWindows(wfn *window.Fn, ts typex.EventTime) []typex.Window {
+	switch wfn.Kind {
+	case window.GlobalWindows:
+		return window.SingleGlobalWindow
+
+	case window.FixedWindows:
+		start := ts - (ts.Add(wfn.Size) % mtime.FromDuration(wfn.Size))
+		end := mtime.Min(start.Add(wfn.Size), mtime.EndOfGlobalWindowTime.Add(time.Millisecond))
+		return []typex.Window{window.IntervalWindow{Start: start, End: end}}
+
+	case window.SlidingWindows:
+		var ret []typex.Window
+
+		period := mtime.FromDuration(wfn.Period)
+		lastStart := ts - (ts.Add(wfn.Size) % period)
+		for start := lastStart; start > ts.Subtract(wfn.Size); start -= period {
+			ret = append(ret, window.IntervalWindow{Start: start, End: start.Add(wfn.Size)})
+		}
+		return ret
+
+	default:

Review comment:
Is there a TODO/JIRA for the Session window support?

Issue Time Tracking
---
Worklog Id: (was: 92906)
Time Spent: 1h (was: 50m)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92437=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92437 ]

ASF GitHub Bot logged work on BEAM-3303:
Author: ASF GitHub Bot
Created on: 19/Apr/18 07:28
Start Date: 19/Apr/18 07:28
Worklog Time Spent: 10m
Work Description: herohde commented on issue #5179: [BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#issuecomment-382638125

R: @wcn3 @lostluck
Sorry for the massive PR.

Issue Time Tracking
---
Worklog Id: (was: 92437)
Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-3303) Go windowing support
[ https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92436=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92436 ]

ASF GitHub Bot logged work on BEAM-3303:
Author: ASF GitHub Bot
Created on: 19/Apr/18 07:25
Start Date: 19/Apr/18 07:25
Worklog Time Spent: 10m
Work Description: herohde opened a new pull request #5179: [BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179

 * Added WindowInto and supporting types and handling. Made EventTime align with Beam.
 * Added windowed_wordcount. Runs on direct + dataflow. Flink doesn't seem to support the urn yet.
 * Fixed Dataflow encoding issue and made it use model pipeline references for ParDo [BEAM-3448].

Issue Time Tracking
---
Worklog Id: (was: 92436)
Time Spent: 10m
Remaining Estimate: 0h