[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-27 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=96280&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96280
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 27/Apr/18 23:39
Start Date: 27/Apr/18 23:39
Worklog Time Spent: 10m 
  Work Description: herohde commented on issue #5179: [BEAM-3303] Add Go 
Windowing support
URL: https://github.com/apache/beam/pull/5179#issuecomment-385119592
 
 
   Thanks. Yes, the idea is to add a custom WindowFn later (similar to how 
custom coders are done). My understanding is that custom WindowFns are not yet 
supported in the FnAPI, though.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 96280)
Time Spent: 6h 10m  (was: 6h)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-27 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=96278&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96278
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 27/Apr/18 23:35
Start Date: 27/Apr/18 23:35
Worklog Time Spent: 10m 
  Work Description: robertwb commented on issue #5179: [BEAM-3303] Add Go 
Windowing support
URL: https://github.com/apache/beam/pull/5179#issuecomment-385119208
 
 
   Thanks!
   
   It looks like this structure only supports a finite set of builtin 
WindowFns, rather than letting the user define their own with a callable 
(WindowFn:WindowInto is like DoFn:ParDo), but that can be future work. 
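
The WindowFn:WindowInto analogy can be sketched as follows. This is only an illustration of the idea, not the Beam Go SDK API: `WindowFn`, `IntervalWindow`, `FixedWindows` and `WindowInto` are hypothetical names, and timestamps are plain int64 milliseconds.

```go
package main

import "fmt"

// IntervalWindow is a hypothetical half-open window [Start, End) in milliseconds.
type IntervalWindow struct {
	Start, End int64
}

// WindowFn is a hypothetical user-implementable interface: given an element
// timestamp, it returns the windows the element belongs to.
type WindowFn interface {
	AssignWindows(ts int64) []IntervalWindow
}

// FixedWindows is one builtin-style implementation. Size must be > 0.
type FixedWindows struct {
	Size int64
}

// AssignWindows floors ts to a multiple of Size, including for negative ts.
func (f FixedWindows) AssignWindows(ts int64) []IntervalWindow {
	start := ts - ((ts%f.Size)+f.Size)%f.Size
	return []IntervalWindow{{Start: start, End: start + f.Size}}
}

// WindowInto applies any WindowFn to a batch of timestamps, the way ParDo
// applies any DoFn to elements.
func WindowInto(fn WindowFn, timestamps []int64) map[IntervalWindow][]int64 {
	out := make(map[IntervalWindow][]int64)
	for _, ts := range timestamps {
		for _, w := range fn.AssignWindows(ts) {
			out[w] = append(out[w], ts)
		}
	}
	return out
}

func main() {
	grouped := WindowInto(FixedWindows{Size: 10}, []int64{1, 9, 10, -1})
	for w, tss := range grouped {
		fmt.Printf("[%d, %d): %v\n", w.Start, w.End, tss)
	}
}
```

With an interface like this, a user-defined WindowFn is just another type that implements `AssignWindows`; a closed set of builtin kinds would instead need a switch in the SDK for every new strategy.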


Issue Time Tracking
---

Worklog Id: (was: 96278)
Time Spent: 5h 50m  (was: 5h 40m)

[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95799&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95799
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 27/Apr/18 00:19
Start Date: 27/Apr/18 00:19
Worklog Time Spent: 10m 
  Work Description: herohde commented on issue #5179: [BEAM-3303] Add Go 
Windowing support
URL: https://github.com/apache/beam/pull/5179#issuecomment-384826776
 
 
   @robertwb Now I think it's ready. PTAL


Issue Time Tracking
---

Worklog Id: (was: 95799)
Time Spent: 5h 40m  (was: 5.5h)

[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95757&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95757
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 26/Apr/18 22:33
Start Date: 26/Apr/18 22:33
Worklog Time Spent: 10m 
  Work Description: herohde commented on issue #5179: [BEAM-3303] Add Go 
Windowing support
URL: https://github.com/apache/beam/pull/5179#issuecomment-384809351
 
 
   Thanks!


Issue Time Tracking
---

Worklog Id: (was: 95757)
Time Spent: 5.5h  (was: 5h 20m)

[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95754&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95754
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 26/Apr/18 22:29
Start Date: 26/Apr/18 22:29
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184548981
 
 

 ##
 File path: sdks/go/pkg/beam/runners/dataflow/translate.go
 ##
 @@ -454,50 +457,52 @@ func stepID(id int) string {
     return fmt.Sprintf("s%v", id)
 }
 
-func translateWindow(w *window.Window) proto.Message {
-    // TODO: The only windowing strategy we support is the global window.
-    if w.Kind() != window.GlobalWindow {
-        panic(fmt.Sprintf("Unsupported window type supplied: %v", w))
-    }
-    // We compute the fixed content of this message for use in workflows.
-    msg := rnapi_pb.MessageWithComponents{
+func translateWindowingStrategy(w *window.WindowingStrategy) proto.Message {
+    c := graphx.NewCoderMarshaller()
+    ws := graphx.MarshalWindowingStrategy(c, w)
+
+    msg := &rnapi_pb.MessageWithComponents{
         Components: &rnapi_pb.Components{
-            Coders: map[string]*rnapi_pb.Coder{
-                "Coder": &rnapi_pb.Coder{
-                    Spec: &rnapi_pb.SdkFunctionSpec{
-                        Spec: &rnapi_pb.FunctionSpec{
-                            Urn: "urn:beam:coders:global_window:0.1",
-                        },
-                    },
-                },
-            },
+            Coders: c.Build(),
         },
         Root: &rnapi_pb.MessageWithComponents_WindowingStrategy{
-            WindowingStrategy: &rnapi_pb.WindowingStrategy{
-                WindowFn: &rnapi_pb.SdkFunctionSpec{
-                    Spec: &rnapi_pb.FunctionSpec{
-                        Urn: "beam:windowfn:global_windows:v0.1",
-                    },
-                },
-                MergeStatus:      rnapi_pb.MergeStatus_NON_MERGING,
-                AccumulationMode: rnapi_pb.AccumulationMode_DISCARDING,
-                WindowCoderId:    "Coder",
-                Trigger: &rnapi_pb.Trigger{
-                    Trigger: &rnapi_pb.Trigger_Default_{
-                        Default: &rnapi_pb.Trigger_Default{},
-                    },
-                },
-                OutputTime:      rnapi_pb.OutputTime_END_OF_WINDOW,
-                ClosingBehavior: rnapi_pb.ClosingBehavior_EMIT_IF_NONEMPTY,
-                AllowedLateness: 0,
-            },
+            WindowingStrategy: ws,
         },
     }
-
-    return
+    return msg
 }
 
 func encodeSerializedFn(in proto.Message) (string, error) {
-    // The Beam Runner API uses URL query escaping for serialized fn messages.
-    return protox.EncodeQueryEscaped(in)
+    // The Beam Runner API uses special escaping for serialized fn messages.
+
+    data, err := proto.Marshal(in)
+    if err != nil {
+        return "", err
+    }
+    return encodeString(data), nil
+}
+
+// encodeString is a custom encoding used in some cases by Dataflow.
+//
+// Uses a simple strategy of converting each byte to a single char,
 
 Review comment:
   Seems you're right. Good catch.


Issue Time Tracking
---

Worklog Id: (was: 95754)
Time Spent: 5h 20m  (was: 5h 10m)

[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95752&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95752
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 26/Apr/18 22:10
Start Date: 26/Apr/18 22:10
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184545647
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/graphx/serialize.go
 ##
 @@ -942,28 +982,28 @@ func isCoGBKList(ref *CoderRef) ([]*CoderRef, bool) {
     return ref2.Components, true
 }
 
-// TODO(wcn): Windowing information isn't currently propagated through
-// our code. These methods will be used by other packages, so exporting
-// them now.
-
 // encodeWindow translates the preprocessed representation of a Beam coder
 // into the wire representation, capturing the underlying types used by
 // the coder.
-func encodeWindow(w *window.Window) (*CoderRef, error) {
-    switch w.Kind() {
-    case window.GlobalWindow:
+func encodeWindowCoder(w *coder.WindowCoder) (*CoderRef, error) {
+    switch w.Kind {
+    case coder.GlobalWindow:
         return &CoderRef{Type: GlobalWindowType}, nil
+    case coder.IntervalWindow:
+        return &CoderRef{Type: IntervalWindowType}, nil
     default:
-        return nil, fmt.Errorf("bad window kind: %v", w.Kind())
+        return nil, fmt.Errorf("bad window kind: %v", w.Kind)
     }
 }
 
 // decodeWindow receives the wire representation of a Beam coder, extracting
 
 Review comment:
   Done


Issue Time Tracking
---

Worklog Id: (was: 95752)
Time Spent: 5h 10m  (was: 5h)

[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95751&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95751
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 26/Apr/18 22:10
Start Date: 26/Apr/18 22:10
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184545496
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/graphx/coder.go
 ##
 @@ -75,15 +74,15 @@ type CoderUnmarshaller struct {
models map[string]*pb.Coder
 
coders  map[string]*coder.Coder
-   windows map[string]*window.Window
+   windows map[string]*coder.WindowCoder
 
 Review comment:
   Done


Issue Time Tracking
---

Worklog Id: (was: 95751)
Time Spent: 5h  (was: 4h 50m)

[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95750&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95750
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 26/Apr/18 22:09
Start Date: 26/Apr/18 22:09
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184545328
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/pardo.go
 ##
 @@ -95,26 +96,56 @@ func (n *ParDo) ProcessElement(ctx context.Context, elm FullValue, values ...ReS
     }
 
     ctx = metrics.SetPTransformID(ctx, n.PID)
+    fn := n.Fn.ProcessElementFn()
 
-    val, err := n.invokeDataFn(ctx, elm.Timestamp, n.Fn.ProcessElementFn(), {Key: elm, Values: values})
-    if err != nil {
-        return n.fail(err)
-    }
+    // If the function observes windows, we must pass invoke it for each window. The expected fast path
 
 Review comment:
   Done


Issue Time Tracking
---

Worklog Id: (was: 95750)
Time Spent: 4h 50m  (was: 4h 40m)

[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95748&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95748
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 26/Apr/18 22:08
Start Date: 26/Apr/18 22:08
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184545205
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/coder.go
 ##
 @@ -276,42 +277,135 @@ func (c *kvDecoder) Decode(r io.Reader) (FullValue, error) {
 
 }
 
-// TODO(herohde) 4/7/2017: actually handle windows.
+// WindowEncoder handles Window serialization to a byte stream. The encoder
+// can be reused, even if an error is encountered.
 
 Review comment:
   Added comment to all coding types that they are concurrency-safe.


Issue Time Tracking
---

Worklog Id: (was: 95748)
Time Spent: 4h 40m  (was: 4.5h)

[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95745&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95745
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 26/Apr/18 22:05
Start Date: 26/Apr/18 22:05
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184544647
 
 

 ##
 File path: sdks/go/pkg/beam/core/graph/mtime/time.go
 ##
 @@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package mtime contains a millisecond representation of time. The purpose
+// of this representation is alignment with the Beam specification, where we
+// need extreme values outside the range of time.Time for windowing.
+package mtime
+
+import (
+   "fmt"
+   "math"
+   "time"
+)
+
+const (
+   // MinTimestamp is the minimum value for any Beam timestamp. Often referred to
+   // as "-infinity". This value and MaxTimestamp are chosen so that their
+   // microseconds-since-epoch can be safely represented with an int64 and boundary
+   // values can be represented correctly with milli-second precision.
 
 Review comment:
   Done


Issue Time Tracking
---

Worklog Id: (was: 95745)
Time Spent: 4.5h  (was: 4h 20m)

[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95742&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95742
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 26/Apr/18 22:05
Start Date: 26/Apr/18 22:05
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184544512
 
 

 ##
 File path: sdks/go/pkg/beam/core/graph/coder/coder.go
 ##
 @@ -146,9 +145,9 @@ type Coder struct {
     Kind Kind
     T    typex.FullType
 
-    Components []*Coder       // WindowedValue, KV, CoGBK
-    Custom     *CustomCoder   // Custom
-    Window     *window.Window // WindowedValue
+    Components []*Coder     // WindowedValue, KV, CoGBK
+    Custom     *CustomCoder // Custom
+    Window     *WindowCoder // WindowedValue
 
 Review comment:
   No. The comments indicate which fields are populated for each kind as a poor 
man's algebraic data types.
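
The "poor man's algebraic data type" pattern can be sketched like this: a `Kind` tag selects which optional fields are meaningful, and a check enforces the convention that the field comments document. The kinds, field rules and `Validate` method below are simplified assumptions for illustration, not the SDK's actual definitions.

```go
package main

import "fmt"

// Kind tags which variant of the "sum type" a Coder value represents.
type Kind string

const (
	Bytes         Kind = "bytes"
	KV            Kind = "KV"
	WindowedValue Kind = "W"
	Custom        Kind = "Custom"
)

// CustomCoder and WindowCoder are placeholder payload types.
type CustomCoder struct{ Name string }
type WindowCoder struct{ Name string }

// Coder carries one field per variant; comments name the kinds that use it.
type Coder struct {
	Kind       Kind
	Components []*Coder     // WindowedValue, KV
	Custom     *CustomCoder // Custom
	Window     *WindowCoder // WindowedValue
}

// Validate checks that exactly the fields expected for c.Kind are populated.
func (c *Coder) Validate() error {
	var ok bool
	switch c.Kind {
	case Bytes:
		ok = len(c.Components) == 0 && c.Custom == nil && c.Window == nil
	case KV:
		ok = len(c.Components) == 2 && c.Custom == nil && c.Window == nil
	case WindowedValue:
		ok = len(c.Components) == 1 && c.Custom == nil && c.Window != nil
	case Custom:
		ok = len(c.Components) == 0 && c.Custom != nil && c.Window == nil
	default:
		return fmt.Errorf("unknown coder kind: %v", c.Kind)
	}
	if !ok {
		return fmt.Errorf("fields do not match coder kind %v", c.Kind)
	}
	return nil
}

func main() {
	b := &Coder{Kind: Bytes}
	w := &Coder{Kind: WindowedValue, Components: []*Coder{b}, Window: &WindowCoder{Name: "global"}}
	fmt.Println(w.Validate())
	fmt.Println((&Coder{Kind: KV}).Validate())
}
```

Go has no tagged unions, so the compiler cannot enforce which fields go with which kind; the comments (and optionally a runtime check like this) carry that contract.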


Issue Time Tracking
---

Worklog Id: (was: 95742)
Time Spent: 4h 20m  (was: 4h 10m)

[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95267&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95267
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 25/Apr/18 21:50
Start Date: 25/Apr/18 21:50
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184214639
 
 

 ##
 File path: sdks/go/pkg/beam/runners/dataflow/translate.go
 ##
 @@ -454,50 +457,52 @@ func stepID(id int) string {
     return fmt.Sprintf("s%v", id)
 }
 
-func translateWindow(w *window.Window) proto.Message {
-    // TODO: The only windowing strategy we support is the global window.
-    if w.Kind() != window.GlobalWindow {
-        panic(fmt.Sprintf("Unsupported window type supplied: %v", w))
-    }
-    // We compute the fixed content of this message for use in workflows.
-    msg := rnapi_pb.MessageWithComponents{
+func translateWindowingStrategy(w *window.WindowingStrategy) proto.Message {
+    c := graphx.NewCoderMarshaller()
+    ws := graphx.MarshalWindowingStrategy(c, w)
+
+    msg := &rnapi_pb.MessageWithComponents{
         Components: &rnapi_pb.Components{
-            Coders: map[string]*rnapi_pb.Coder{
-                "Coder": &rnapi_pb.Coder{
-                    Spec: &rnapi_pb.SdkFunctionSpec{
-                        Spec: &rnapi_pb.FunctionSpec{
-                            Urn: "urn:beam:coders:global_window:0.1",
-                        },
-                    },
-                },
-            },
+            Coders: c.Build(),
         },
         Root: &rnapi_pb.MessageWithComponents_WindowingStrategy{
-            WindowingStrategy: &rnapi_pb.WindowingStrategy{
-                WindowFn: &rnapi_pb.SdkFunctionSpec{
-                    Spec: &rnapi_pb.FunctionSpec{
-                        Urn: "beam:windowfn:global_windows:v0.1",
-                    },
-                },
-                MergeStatus:      rnapi_pb.MergeStatus_NON_MERGING,
-                AccumulationMode: rnapi_pb.AccumulationMode_DISCARDING,
-                WindowCoderId:    "Coder",
-                Trigger: &rnapi_pb.Trigger{
-                    Trigger: &rnapi_pb.Trigger_Default_{
-                        Default: &rnapi_pb.Trigger_Default{},
-                    },
-                },
-                OutputTime:      rnapi_pb.OutputTime_END_OF_WINDOW,
-                ClosingBehavior: rnapi_pb.ClosingBehavior_EMIT_IF_NONEMPTY,
-                AllowedLateness: 0,
-            },
+            WindowingStrategy: ws,
         },
     }
-
-    return
+    return msg
 }
 
 func encodeSerializedFn(in proto.Message) (string, error) {
-    // The Beam Runner API uses URL query escaping for serialized fn messages.
-    return protox.EncodeQueryEscaped(in)
+    // The Beam Runner API uses special escaping for serialized fn messages.
+
+    data, err := proto.Marshal(in)
+    if err != nil {
+        return "", err
+    }
+    return encodeString(data), nil
+}
+
+// encodeString is a custom encoding used in some cases by Dataflow.
+//
+// Uses a simple strategy of converting each byte to a single char,
 
 Review comment:
   This is defined as 'percent encoding' or less accurately as 'URL encoding.' 
   https://en.wikipedia.org/wiki/Percent-encoding
   
   Pretty sure this is identical to url.PathEscape, perhaps use that?


Issue Time Tracking
---

Worklog Id: (was: 95267)
Time Spent: 3h 50m  (was: 3h 40m)

[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95263&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95263
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 25/Apr/18 21:50
Start Date: 25/Apr/18 21:50
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184211558
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/graphx/coder.go
 ##
 @@ -75,15 +74,15 @@ type CoderUnmarshaller struct {
models map[string]*pb.Coder
 
coders  map[string]*coder.Coder
-   windows map[string]*window.Window
+   windows map[string]*coder.WindowCoder
 
 Review comment:
   windowCoders?


Issue Time Tracking
---

Worklog Id: (was: 95263)
Time Spent: 3.5h  (was: 3h 20m)

[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95264&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95264
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 25/Apr/18 21:50
Start Date: 25/Apr/18 21:50
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184209550
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/pardo.go
 ##
 @@ -95,26 +96,56 @@ func (n *ParDo) ProcessElement(ctx context.Context, elm FullValue, values ...ReS
     }
 
     ctx = metrics.SetPTransformID(ctx, n.PID)
+    fn := n.Fn.ProcessElementFn()
 
-    val, err := n.invokeDataFn(ctx, elm.Timestamp, n.Fn.ProcessElementFn(), {Key: elm, Values: values})
-    if err != nil {
-        return n.fail(err)
-    }
+    // If the function observes windows, we must pass invoke it for each window. The expected fast path
 
 Review comment:
   Grammar in first sentence. Perhaps, ", we must invoke it for each window?"
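
The behaviour the code comment describes can be sketched as below. All types here (`Window`, `Element`, `Fn`) and the `process` helper are hypothetical stand-ins rather than the SDK's exec package, and since the quoted comment is cut off after "The expected fast path", what the fast path covers is an assumption.

```go
package main

import "fmt"

// Window is a hypothetical stand-in for a Beam window.
type Window string

// Element is a hypothetical value tagged with the windows it belongs to.
type Element struct {
	Value   int
	Windows []Window
}

// Fn is a hypothetical wrapper around a user function. ObservesWindows
// records whether the function takes the window as a parameter.
type Fn struct {
	ObservesWindows bool
	Call            func(w Window, v int)
}

// process invokes fn for elm and returns the number of invocations made.
func process(fn Fn, elm Element) int {
	if !fn.ObservesWindows || len(elm.Windows) == 1 {
		// Assumed fast path: one call covers the whole element.
		var w Window
		if len(elm.Windows) > 0 {
			w = elm.Windows[0]
		}
		fn.Call(w, elm.Value)
		return 1
	}
	// The function sees the window, so it must be invoked once per window.
	for _, w := range elm.Windows {
		fn.Call(w, elm.Value)
	}
	return len(elm.Windows)
}

func main() {
	elm := Element{Value: 7, Windows: []Window{"w1", "w2", "w3"}}
	observer := Fn{ObservesWindows: true, Call: func(w Window, v int) {
		fmt.Printf("window=%s value=%d\n", w, v)
	}}
	fmt.Println("calls:", process(observer, elm))
}
```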


Issue Time Tracking
---

Worklog Id: (was: 95264)
Time Spent: 3.5h  (was: 3h 20m)

[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95269&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95269
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 25/Apr/18 21:50
Start Date: 25/Apr/18 21:50
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184211886
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/graphx/serialize.go
 ##
 @@ -942,28 +982,28 @@ func isCoGBKList(ref *CoderRef) ([]*CoderRef, bool) {
     return ref2.Components, true
 }
 
-// TODO(wcn): Windowing information isn't currently propagated through
-// our code. These methods will be used by other packages, so exporting
-// them now.
-
 // encodeWindow translates the preprocessed representation of a Beam coder
 // into the wire representation, capturing the underlying types used by
 // the coder.
-func encodeWindow(w *window.Window) (*CoderRef, error) {
-    switch w.Kind() {
-    case window.GlobalWindow:
+func encodeWindowCoder(w *coder.WindowCoder) (*CoderRef, error) {
+    switch w.Kind {
+    case coder.GlobalWindow:
         return &CoderRef{Type: GlobalWindowType}, nil
+    case coder.IntervalWindow:
+        return &CoderRef{Type: IntervalWindowType}, nil
     default:
-        return nil, fmt.Errorf("bad window kind: %v", w.Kind())
+        return nil, fmt.Errorf("bad window kind: %v", w.Kind)
     }
 }
 
 // decodeWindow receives the wire representation of a Beam coder, extracting
 
 Review comment:
   Fix name of function.


Issue Time Tracking
---

Worklog Id: (was: 95269)
Time Spent: 4h 10m  (was: 4h)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95266=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95266
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 25/Apr/18 21:50
Start Date: 25/Apr/18 21:50
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184205053
 
 

 ##
 File path: sdks/go/pkg/beam/core/graph/mtime/time.go
 ##
 @@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package mtime contains a millisecond representation of time. The purpose
+// of this representation is alignment with the Beam specification, where we
+// need extreme values outside the range of time.Time for windowing.
+package mtime
+
+import (
+   "fmt"
+   "math"
+   "time"
+)
+
+const (
+   // MinTimestamp is the minimum value for any Beam timestamp. Often referred to
+   // as "-infinity". This value and MaxTimestamp are chosen so that their
+   // microseconds-since-epoch can be safely represented with an int64 and boundary
+   // values can be represented correctly with milli-second precision.
 
 Review comment:
   millisecond
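
The package comment quoted above motivates the bounds: a millisecond timestamp whose extreme values still fit in an int64 when expressed as microseconds. A minimal sketch of that property follows, reusing the type and constant names from the diff. The `Micros` helper is hypothetical and only illustrates the conversion.

```go
package main

import (
	"fmt"
	"math"
)

// Time is the number of milliseconds since the Unix epoch, as in the quoted diff.
type Time int64

const (
	// Dividing the int64 bounds by 1000 leaves room to multiply back
	// into microseconds without overflowing.
	MinTimestamp Time = math.MinInt64 / 1000
	MaxTimestamp Time = math.MaxInt64 / 1000
)

// Micros is a hypothetical helper (not part of the diff) that converts a
// timestamp to microseconds since the epoch. It cannot overflow for any
// value between MinTimestamp and MaxTimestamp.
func (t Time) Micros() int64 {
	return int64(t) * 1000
}

func main() {
	fmt.Println(MinTimestamp.Micros(), MaxTimestamp.Micros())
}
```

Because the constants are truncated to whole milliseconds, converting to microseconds and back returns the original value.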




Issue Time Tracking
---

Worklog Id: (was: 95266)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95265=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95265
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 25/Apr/18 21:50
Start Date: 25/Apr/18 21:50
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184207512
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/coder.go
 ##
 @@ -276,42 +277,135 @@ func (c *kvDecoder) Decode(r io.Reader) (FullValue, error) {
 
 }
 
-// TODO(herohde) 4/7/2017: actually handle windows.
+// WindowEncoder handles Window serialization to a byte stream. The encoder
+// can be reused, even if an error is encountered.
 
 Review comment:
   Any concurrency-safety requirements?




Issue Time Tracking
---

Worklog Id: (was: 95265)
Time Spent: 3h 40m  (was: 3.5h)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95268=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95268
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 25/Apr/18 21:50
Start Date: 25/Apr/18 21:50
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r184203269
 
 

 ##
 File path: sdks/go/pkg/beam/core/graph/coder/coder.go
 ##
 @@ -146,9 +145,9 @@ type Coder struct {
Kind Kind
T    typex.FullType
 
-   Components []*Coder   // WindowedValue, KV, CoGBK
-   Custom *CustomCoder   // Custom
-   Window *window.Window // WindowedValue
+   Components []*Coder // WindowedValue, KV, CoGBK
+   Custom *CustomCoder // Custom
+   Window *WindowCoder // WindowedValue
 
 Review comment:
   Comment needs updating?




Issue Time Tracking
---

Worklog Id: (was: 95268)
Time Spent: 4h  (was: 3h 50m)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 4h
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-25 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=95072=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95072
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 25/Apr/18 15:29
Start Date: 25/Apr/18 15:29
Worklog Time Spent: 10m 
  Work Description: herohde commented on issue #5179: [BEAM-3303] Add Go 
Windowing support
URL: https://github.com/apache/beam/pull/5179#issuecomment-384329458
 
 
   R: @robertwb Hope this PR makes Go a real SDK in your eyes :) PTAL




Issue Time Tracking
---

Worklog Id: (was: 95072)
Time Spent: 3h 20m  (was: 3h 10m)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93519=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93519
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 21/Apr/18 01:15
Start Date: 21/Apr/18 01:15
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r183148579
 
 

 ##
 File path: sdks/go/pkg/beam/core/graph/coder/time_test.go
 ##
 @@ -18,32 +18,25 @@ package coder
 import (
"bytes"
"testing"
-   "time"
 
-   "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
 )
 
 func TestEncodeDecodeEventTime(t *testing.T) {
tests := []struct {
-   time        time.Time
-   errExpected bool
+   time mtime.Time
}{
-   {time: time.Unix(0, 0)},
-   {time: time.Unix(10, 0)},
-   {time: time.Unix(1257894000, 0)},
-   {time: time.Unix(0, 1257894)},
-   {time: time.Time{}, errExpected: true},
+   {time: mtime.ZeroTimestamp},
+   {time: mtime.MinTimestamp},
+   {time: mtime.MaxTimestamp},
+   {time: mtime.Now()},
 
 Review comment:
   :)




Issue Time Tracking
---

Worklog Id: (was: 93519)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93518=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93518
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 21/Apr/18 01:15
Start Date: 21/Apr/18 01:15
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r183195159
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/combine.go
 ##
 @@ -93,51 +79,38 @@ func (n *Combine) ProcessElement(ctx context.Context, value FullValue, values ..
return fmt.Errorf("invalid status for combine %v: %v", n.UID, n.status)
}
 
-   if n.IsPerKey {
-   // For per-key combine, all processing can be done here. Note that
-   // we do not explicitly call merge, although it may be called implicitly
-   // when adding input.
+   // Note that we do not explicitly call merge, although it may
+   // be called implicitly when adding input.
 
-   a, err := n.newAccum(ctx, value.Elm)
-   if err != nil {
-   return n.fail(err)
-   }
-   first := true
-
-   stream := values[0].Open()
-   for {
-   v, err := stream.Read()
-   if err != nil {
-   if err == io.EOF {
-   break
-   }
-   return n.fail(err)
-   }
+   a, err := n.newAccum(ctx, value.Elm)
+   if err != nil {
+   return n.fail(err)
+   }
+   first := true
 
-   a, err = n.addInput(ctx, a, value.Elm, v.Elm, value.Timestamp, first)
-   if err != nil {
-   return n.fail(err)
+   stream := values[0].Open()
+   for {
+   v, err := stream.Read()
+   if err != nil {
+   if err == io.EOF {
+   break
}
-   first = false
+   return n.fail(err)
}
-   stream.Close()
 
-   out, err := n.extract(ctx, a)
+   a, err = n.addInput(ctx, a, value.Elm, v.Elm, value.Timestamp, first)
if err != nil {
return n.fail(err)
}
-   return n.Out.ProcessElement(ctx, FullValue{Elm: value.Elm, Elm2: out, Timestamp: value.Timestamp})
+   first = false
}
+   stream.Close()
 
 Review comment:
   Done
   
   The defer cost is not a concern. I think the idea was to not hold the stream 
open while processing the continuation. We can tighten it up if it becomes an 
issue.
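
The trade-off described in this comment can be sketched as follows. This is an illustration only, not the SDK code: `sliceStream` and `sumThenEmit` are hypothetical stand-ins for the value stream and the combine step. The stream is closed explicitly before the continuation runs, whereas a `defer` would keep it open until the continuation returns.

```go
package main

import (
	"fmt"
	"io"
)

// sliceStream is a hypothetical stand-in for the value stream in the diff.
type sliceStream struct {
	vals   []int
	pos    int
	closed bool
}

// Read returns the next value, or io.EOF once the stream is exhausted.
func (s *sliceStream) Read() (int, error) {
	if s.pos >= len(s.vals) {
		return 0, io.EOF
	}
	v := s.vals[s.pos]
	s.pos++
	return v, nil
}

// Close marks the stream as released.
func (s *sliceStream) Close() { s.closed = true }

// sumThenEmit drains the stream, closes it explicitly, and only then calls
// the continuation, so the stream is not held open during downstream work.
// With `defer s.Close()` the close would run after emit returns instead.
func sumThenEmit(s *sliceStream, emit func(sum int) error) error {
	sum := 0
	for {
		v, err := s.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			s.Close()
			return err
		}
		sum += v
	}
	s.Close()
	return emit(sum)
}

func main() {
	s := &sliceStream{vals: []int{1, 2, 3}}
	err := sumThenEmit(s, func(sum int) error {
		fmt.Println("sum:", sum, "stream closed:", s.closed)
		return nil
	})
	if err != nil {
		fmt.Println("error:", err)
	}
}
```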




Issue Time Tracking
---

Worklog Id: (was: 93518)
Time Spent: 2h 50m  (was: 2h 40m)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93516=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93516
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 21/Apr/18 01:15
Start Date: 21/Apr/18 01:15
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r183195020
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/combine.go
 ##
 @@ -93,51 +79,38 @@ func (n *Combine) ProcessElement(ctx context.Context, value FullValue, values ..
return fmt.Errorf("invalid status for combine %v: %v", n.UID, n.status)
}
 
-   if n.IsPerKey {
-   // For per-key combine, all processing can be done here. Note that
-   // we do not explicitly call merge, although it may be called implicitly
-   // when adding input.
+   // Note that we do not explicitly call merge, although it may
 
 Review comment:
   It's referring to the lack of combiner lifting, where we precombine and then 
merge the results. We just add each input into one accumulator, for now.
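
To make the distinction concrete, here is a small sketch with a mean combiner. All names are hypothetical and the code is not taken from the PR. `combineUnlifted` mirrors what the comment describes (every input goes into one accumulator and merge is never called), while `combineLifted` shows what lifting would add: partial accumulators per partition that are merged afterwards.

```go
package main

import "fmt"

// accum is a hypothetical accumulator for computing a mean.
type accum struct {
	sum   float64
	count int
}

func addInput(a accum, v float64) accum {
	return accum{sum: a.sum + v, count: a.count + 1}
}

func merge(a, b accum) accum {
	return accum{sum: a.sum + b.sum, count: a.count + b.count}
}

func extract(a accum) float64 {
	if a.count == 0 {
		return 0
	}
	return a.sum / float64(a.count)
}

// combineUnlifted folds every input into a single accumulator.
// merge is never called on this path.
func combineUnlifted(vals []float64) float64 {
	var a accum
	for _, v := range vals {
		a = addInput(a, v)
	}
	return extract(a)
}

// combineLifted pre-combines each partition into its own accumulator
// and then merges the partial results.
func combineLifted(parts [][]float64) float64 {
	var total accum
	for _, p := range parts {
		var a accum
		for _, v := range p {
			a = addInput(a, v)
		}
		total = merge(total, a)
	}
	return extract(total)
}

func main() {
	fmt.Println(combineUnlifted([]float64{1, 2, 3, 4}))
	fmt.Println(combineLifted([][]float64{{1, 2}, {3, 4}}))
}
```

Both paths should agree for any associative, commutative combiner. Lifting only changes where the partial work happens.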




Issue Time Tracking
---

Worklog Id: (was: 93516)
Time Spent: 2h 40m  (was: 2.5h)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93522=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93522
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 21/Apr/18 01:15
Start Date: 21/Apr/18 01:15
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r183195431
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/window.go
 ##
 @@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+   "context"
+   "fmt"
+   "time"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+)
+
+// WindowInto places each element in one or more windows.
+type WindowInto struct {
+   UID UnitID
+   Fn  *window.Fn
+   Out Node
+}
+
+func (w *WindowInto) ID() UnitID {
+   return w.UID
+}
+
+func (w *WindowInto) Up(ctx context.Context) error {
+   return nil
+}
+
+func (w *WindowInto) StartBundle(ctx context.Context, id string, data DataManager) error {
+   return w.Out.StartBundle(ctx, id, data)
+}
+
+func (w *WindowInto) ProcessElement(ctx context.Context, elm FullValue, values ...ReStream) error {
+   windowed := FullValue{
+   Windows:   assignWindows(w.Fn, elm.Timestamp),
+   Timestamp: elm.Timestamp,
+   Elm:   elm.Elm,
+   Elm2:  elm.Elm2,
+   }
+   return w.Out.ProcessElement(ctx, windowed, values...)
+}
+
+func assignWindows(wfn *window.Fn, ts typex.EventTime) []typex.Window {
+   switch wfn.Kind {
+   case window.GlobalWindows:
+   return window.SingleGlobalWindow
+
+   case window.FixedWindows:
+   start := ts - (ts.Add(wfn.Size) % mtime.FromDuration(wfn.Size))
+   end := mtime.Min(start.Add(wfn.Size), mtime.EndOfGlobalWindowTime.Add(time.Millisecond))
+   return []typex.Window{window.IntervalWindow{Start: start, End: end}}
+
+   case window.SlidingWindows:
+   var ret []typex.Window
+
+   period := mtime.FromDuration(wfn.Period)
+   lastStart := ts - (ts.Add(wfn.Size) % period)
+   for start := lastStart; start > ts.Subtract(wfn.Size); start -= period {
+   ret = append(ret, window.IntervalWindow{Start: start, End: start.Add(wfn.Size)})
+   }
+   return ret
+
+   default:
 
 Review comment:
   Opened BEAM-4152.
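
For readers following the fixed-window case in the hunk above, the assignment amounts to rounding the timestamp down to a multiple of the window size. A minimal sketch with plain int64 milliseconds follows. It is not the SDK's code: `interval`, `floorMod` and `fixedWindow` are hypothetical names, the clamp to the end of the global window is left out, and overflow near the int64 bounds is ignored.

```go
package main

import "fmt"

// interval is a hypothetical half-open window [Start, End) in milliseconds.
type interval struct {
	Start, End int64
}

// floorMod returns a remainder in [0, m) for m > 0. Go's % operator
// returns a negative remainder for negative operands, which would place
// pre-epoch timestamps in the wrong window.
func floorMod(x, m int64) int64 {
	return ((x % m) + m) % m
}

// fixedWindow returns the single fixed window of the given size that
// contains ts.
func fixedWindow(ts, size int64) interval {
	start := ts - floorMod(ts, size)
	return interval{Start: start, End: start + size}
}

func main() {
	fmt.Println(fixedWindow(2500, 1000))
	fmt.Println(fixedWindow(-1, 1000))
}
```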




Issue Time Tracking
---

Worklog Id: (was: 93522)
Time Spent: 3h 10m  (was: 3h)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93514=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93514
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 21/Apr/18 01:15
Start Date: 21/Apr/18 01:15
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r183148461
 
 

 ##
 File path: sdks/go/examples/windowed_wordcount/windowed_wordcount.go
 ##
 @@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 93514)
Time Spent: 2.5h  (was: 2h 20m)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93513=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93513
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 21/Apr/18 01:15
Start Date: 21/Apr/18 01:15
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r183194942
 
 

 ##
 File path: sdks/go/pkg/beam/core/graph/window/windows.go
 ##
 @@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package window
+
+import (
+   "fmt"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+)
+
+var (
+   // SingleGlobalWindow is a slice of a single global window. Convenience value.
+   SingleGlobalWindow = []typex.Window{GlobalWindow{}}
+)
+
+// GlobalWindow represents the singleton, global window.
+type GlobalWindow struct{}
+
+func (GlobalWindow) MaxTimestamp() typex.EventTime {
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 93513)
Time Spent: 2h 20m  (was: 2h 10m)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93520=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93520
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 21/Apr/18 01:15
Start Date: 21/Apr/18 01:15
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r183195295
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/pardo.go
 ##
 @@ -95,26 +96,56 @@ func (n *ParDo) ProcessElement(ctx context.Context, elm FullValue, values ...ReS
}
 
ctx = metrics.SetPTransformID(ctx, n.PID)
+   fn := n.Fn.ProcessElementFn()
 
-   val, err := n.invokeDataFn(ctx, elm.Timestamp, n.Fn.ProcessElementFn(), 
{Key: elm, Values: values})
-   if err != nil {
-   return n.fail(err)
-   }
+   // If the function observes windows, we must pass invoke the it for 
each window. The expected fast path
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 93520)
Time Spent: 3h  (was: 2h 50m)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93511=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93511
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 21/Apr/18 01:15
Start Date: 21/Apr/18 01:15
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r183194745
 
 

 ##
 File path: sdks/go/pkg/beam/core/funcx/fn_test.go
 ##
 @@ -106,6 +117,12 @@ func TestNew(t *testing.T) {
Fn:   func(context.Context, context.Context, int) {},
Err:  errContextParam,
},
+   {
+   Name: "errWindowParamPrecedence: after EventType",
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 93511)
Time Spent: 2h 10m  (was: 2h)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93515=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93515
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 21/Apr/18 01:15
Start Date: 21/Apr/18 01:15
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r183195256
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
 ##
 @@ -46,17 +48,22 @@ func Invoke(ctx context.Context, fn *funcx.Fn, opt *MainInput, extra ...interfac
if index, ok := fn.Context(); ok {
args[index] = ctx
}
+   if index, ok := fn.Window(); ok {
+   if len(ws) != 1 {
+   return nil, fmt.Errorf("DoFns that observe windows must be invoked with single window: %v", opt.Key.Windows)
 
 Review comment:
   Each element may be in multiple windows, but is processed only once if the 
windows are not observed. Sliding windows are an example that produces multiple 
windows for an element.
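
A rough sketch of both halves of this statement, using plain int64 milliseconds. The names `interval`, `floorMod`, `slidingWindows` and `invoke` are hypothetical and this is not the SDK implementation. `slidingWindows` lists every window containing a timestamp, and `invoke` calls the function once per window only when it observes windows.

```go
package main

import "fmt"

// interval is a hypothetical half-open window [Start, End) in milliseconds.
type interval struct {
	Start, End int64
}

// floorMod returns a remainder in [0, m) for m > 0, also for negative x.
func floorMod(x, m int64) int64 {
	return ((x % m) + m) % m
}

// slidingWindows returns every window of the given size, starting at a
// multiple of period, that contains ts. Latest window first.
func slidingWindows(ts, size, period int64) []interval {
	var ret []interval
	lastStart := ts - floorMod(ts, period)
	for start := lastStart; start > ts-size; start -= period {
		ret = append(ret, interval{Start: start, End: start + size})
	}
	return ret
}

// invoke calls fn once per window if the function observes windows, and
// exactly once (with a nil window) otherwise. It returns the call count.
func invoke(ws []interval, observesWindows bool, fn func(w *interval)) int {
	if !observesWindows {
		fn(nil)
		return 1
	}
	for i := range ws {
		fn(&ws[i])
	}
	return len(ws)
}

func main() {
	ws := slidingWindows(2500, 2000, 1000)
	fmt.Println("windows:", ws)
	n := invoke(ws, true, func(w *interval) { fmt.Println("observed", *w) })
	fmt.Println("calls when observing windows:", n)
	fmt.Println("calls otherwise:", invoke(ws, false, func(*interval) {}))
}
```

With a size of two periods, each element lands in two windows, so a window-observing function runs twice per element while any other function runs once.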




Issue Time Tracking
---

Worklog Id: (was: 93515)
Time Spent: 2h 40m  (was: 2.5h)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93521=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93521
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 21/Apr/18 01:15
Start Date: 21/Apr/18 01:15
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r183194856
 
 

 ##
 File path: sdks/go/pkg/beam/core/graph/mtime/time.go
 ##
 @@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package mtime contains a millisecond representation of time. The purpose
+// of this representation is alignment with the Beam specification, where we
+// need extreme values outside the range of time.Time for windowing.
+package mtime
+
+import (
+   "fmt"
+   "math"
+   "time"
+)
+
+const (
+   // MinTimestamp is the minimum value for any Beam timestamp. Often referred to
+   // as "-infinity". This value and MaxTimestamp are chosen so that their
+   // microseconds-since-epoch can be safely represented with an int64 and boundary
+   // values can be represented correctly with milli-second precision.
+   MinTimestamp Time = math.MinInt64 / 1000
+
+   // MaxTimestamp is the maximum value for any Beam timestamp. Often referred to
+   // as "-infinity".
+   MaxTimestamp Time = math.MaxInt64 / 1000
+
+   // EndOfGlobalWindowTime is the timestamp at the end of the global window. It
+   // is a day before the max timestamp.
+   EndOfGlobalWindowTime = MaxTimestamp - 24*60*60*1000
+
+   // ZeroTimestamp is the default zero value time. It corresponds to the unix epoch.
+   ZeroTimestamp Time = 0
+)
+
+// Time is the number of milli-seconds since the Unix epoch. The valid range of
+// times is bounded by what can be represented as _micro_-seconds-since-epoch.
+type Time int64
+
+// Now returns the current time.
+func Now() Time {
+   return FromTime(time.Now())
+}
+
+// FromMilliseconds returns a timestamp from a raw milliseconds-since-epoch value.
+func FromMilliseconds(unixMilliseconds int64) Time {
+   return Normalize(Time(unixMilliseconds))
+}
+
+// FromDuration returns a timestamp from a time.Duration-since-epoch value.
+func FromDuration(d time.Duration) Time {
+   return ZeroTimestamp.Add(d)
+}
+
+// FromTime returns a milli-second precision timestamp from a time.Time.
+func FromTime(t time.Time) Time {
+   return Normalize(Time(n2m(t.UnixNano())))
+}
+
+// Milliseconds returns the number of milli-seconds since the Unix epoch.
+func (t Time) Milliseconds() int64 {
+   return int64(t)
+}
+
+// Add returns the time plus the duration.
+func (t Time) Add(d time.Duration) Time {
+   return Normalize(Time(int64(t) + n2m(d.Nanoseconds())))
+}
+
+// Subtract returns the time minus the duration.
+func (t Time) Subtract(d time.Duration) Time {
+   return Normalize(Time(int64(t) - n2m(d.Nanoseconds())))
+}
+
+func (t Time) String() string {
+   switch t {
+   case MinTimestamp:
+   return "-inf"
+   case MaxTimestamp:
+   return "+inf"
+   case EndOfGlobalWindowTime:
+   return "glo"
+   default:
+   return fmt.Sprintf("%v", t.Milliseconds())
+   }
+}
+
+// Min returns the smallest (earliest) time.
+func Min(a, b Time) Time {
+   if int64(a) < int64(b) {
+   return a
+   } else {
+   return b
+   }
+}
+
+// Max returns the largest (latest) time.
+func Max(a, b Time) Time {
+   if int64(a) < int64(b) {
+   return b
+   } else {
+   return a
+   }
+}
+
+// Normalize ensures a Time is within [MinTimestamp,MaxTimestamp].
+func Normalize(t Time) Time {
+   return Min(Max(t, MinTimestamp), MaxTimestamp)
+}
+
+func n2m(v int64) int64 {
 
 Review comment:
   Done



[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93512=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93512
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 21/Apr/18 01:15
Start Date: 21/Apr/18 01:15
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r183194837
 
 

 ##
 File path: sdks/go/pkg/beam/core/graph/mtime/time.go
 ##
 @@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package mtime contains a millisecond representation of time. The purpose
+// of this representation is alignment with the Beam specification, where we
+// need extreme values outside the range of time.Time for windowing.
+package mtime
+
+import (
+   "fmt"
+   "math"
+   "time"
+)
+
+const (
+   // MinTimestamp is the minimum value for any Beam timestamp. Often referred to
+   // as "-infinity". This value and MaxTimestamp are chosen so that their
+   // microseconds-since-epoch can be safely represented with an int64 and boundary
+   // values can be represented correctly with milli-second precision.
+   MinTimestamp Time = math.MinInt64 / 1000
+
+   // MaxTimestamp is the maximum value for any Beam timestamp. Often referred to
+   // as "-infinity".
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 93512)
Time Spent: 2h 10m  (was: 2h)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.





[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-20 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=93517=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93517
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 21/Apr/18 01:15
Start Date: 21/Apr/18 01:15
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r183195326
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/translate.go
 ##
 @@ -68,15 +69,19 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) 
(*Plan, error) {
}
 
if cid == "" {
-   u.Coder, err = b.makeCoderForPCollection(pid)
+   c, wc, err := b.makeCoderForPCollection(pid)
if err != nil {
return nil, err
}
+   u.Coder = coder.NewW(c, wc)
} else {
u.Coder, err = b.coders.Coder(cid) // Expected to be windowed coder
if err != nil {
return nil, err
}
+   if !coder.IsW(u.Coder) {
+   return nil, fmt.Errorf("unwindowed coder %v on DataSource %v: %v", cid, id, u.Coder)
 
 Review comment:
   Runner bug.




Issue Time Tracking
---

Worklog Id: (was: 93517)
Time Spent: 2h 40m  (was: 2.5h)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.





[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92909=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92909
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 19/Apr/18 23:18
Start Date: 19/Apr/18 23:18
Worklog Time Spent: 10m 
  Work Description: lostluck commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r182910787
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/pardo.go
 ##
 @@ -95,26 +96,56 @@ func (n *ParDo) ProcessElement(ctx context.Context, elm 
FullValue, values ...ReS
}
 
ctx = metrics.SetPTransformID(ctx, n.PID)
+   fn := n.Fn.ProcessElementFn()
 
-   val, err := n.invokeDataFn(ctx, elm.Timestamp, n.Fn.ProcessElementFn(), 
&MainInput{Key: elm, Values: values})
-   if err != nil {
-   return n.fail(err)
-   }
+   // If the function observes windows, we must pass invoke the it for each window. The expected fast path
 
 Review comment:
   
   s/the it/it/




Issue Time Tracking
---

Worklog Id: (was: 92909)
Time Spent: 1h 10m  (was: 1h)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.





[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92914=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92914
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 19/Apr/18 23:18
Start Date: 19/Apr/18 23:18
Worklog Time Spent: 10m 
  Work Description: lostluck commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r182910798
 
 

 ##
 File path: sdks/go/pkg/beam/core/funcx/fn_test.go
 ##
 @@ -106,6 +117,12 @@ func TestNew(t *testing.T) {
Fn:   func(context.Context, context.Context, int) {},
Err:  errContextParam,
},
+   {
+   Name: "errWindowParamPrecedence: after EventType",
 
 Review comment:
   
   s/EventType/EventTime




Issue Time Tracking
---

Worklog Id: (was: 92914)
Time Spent: 2h  (was: 1h 50m)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.





[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92903=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92903
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 19/Apr/18 23:18
Start Date: 19/Apr/18 23:18
Worklog Time Spent: 10m 
  Work Description: lostluck commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r182910777
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/translate.go
 ##
 @@ -68,15 +69,19 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) 
(*Plan, error) {
}
 
if cid == "" {
-   u.Coder, err = b.makeCoderForPCollection(pid)
+   c, wc, err := b.makeCoderForPCollection(pid)
if err != nil {
return nil, err
}
+   u.Coder = coder.NewW(c, wc)
} else {
u.Coder, err = b.coders.Coder(cid) // Expected to be windowed coder
if err != nil {
return nil, err
}
+   if !coder.IsW(u.Coder) {
+   return nil, fmt.Errorf("unwindowed coder %v on DataSource %v: %v", cid, id, u.Coder)
 
 Review comment:
   
   When this error occurs, what's the fix? Is it indicative of an SDK bug, a 
pipeline author bug, or an IO author bug?




Issue Time Tracking
---

Worklog Id: (was: 92903)
Time Spent: 0.5h  (was: 20m)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.





[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92907=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92907
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 19/Apr/18 23:18
Start Date: 19/Apr/18 23:18
Worklog Time Spent: 10m 
  Work Description: lostluck commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r182910780
 
 

 ##
 File path: sdks/go/pkg/beam/core/graph/window/windows.go
 ##
 @@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package window
+
+import (
+   "fmt"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+)
+
+var (
+   // SingleGlobalWindow is a slice of a single global window. Convenience value.
+   SingleGlobalWindow = []typex.Window{GlobalWindow{}}
+)
+
+// GlobalWindow represents the singleton, global window.
+type GlobalWindow struct{}
+
+func (GlobalWindow) MaxTimestamp() typex.EventTime {
 
 Review comment:
   
   Top-level exported declarations (even methods that satisfy an interface) 
should have a doc comment.
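
As an illustration of the convention, a documented version might look like the sketch below. `EventTime` and `endOfGlobalWindow` are hypothetical stand-ins for `typex.EventTime` and the mtime constant, and the method body is an assumption about what the method returns, not a quote from the PR.

```go
package main

import (
	"fmt"
	"math"
)

// EventTime is a hypothetical millisecond timestamp standing in for
// typex.EventTime.
type EventTime int64

// endOfGlobalWindow mirrors the arithmetic shown in the mtime diff: one day
// before the maximum timestamp. The name is an assumption for this sketch.
const endOfGlobalWindow EventTime = math.MaxInt64/1000 - 24*60*60*1000

// GlobalWindow represents the singleton, global window.
type GlobalWindow struct{}

// MaxTimestamp returns the inclusive upper bound of the global window.
// The doc comment starts with the method name, as golint expects.
func (GlobalWindow) MaxTimestamp() EventTime {
	return endOfGlobalWindow
}

func main() {
	w := GlobalWindow{}
	fmt.Println(w.MaxTimestamp())
}
```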




Issue Time Tracking
---

Worklog Id: (was: 92907)
Time Spent: 1h  (was: 50m)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.





[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92911=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92911
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 19/Apr/18 23:18
Start Date: 19/Apr/18 23:18
Worklog Time Spent: 10m 
  Work Description: lostluck commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r182910790
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/combine.go
 ##
 @@ -93,51 +79,38 @@ func (n *Combine) ProcessElement(ctx context.Context, 
value FullValue, values ..
return fmt.Errorf("invalid status for combine %v: %v", n.UID, n.status)
}
 
-   if n.IsPerKey {
-   // For per-key combine, all processing can be done here. Note that
-   // we do not explicitly call merge, although it may be called implicitly
-   // when adding input.
+   // Note that we do not explicitly call merge, although it may
 
 Review comment:
   
   Is this referring to the non-initial entries in values? We don't appear to 
do anything with them.




Issue Time Tracking
---

Worklog Id: (was: 92911)
Time Spent: 1.5h  (was: 1h 20m)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.





[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92904=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92904
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 19/Apr/18 23:18
Start Date: 19/Apr/18 23:18
Worklog Time Spent: 10m 
  Work Description: lostluck commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r182910779
 
 

 ##
 File path: sdks/go/examples/windowed_wordcount/windowed_wordcount.go
 ##
 @@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 
 Review comment:
   
   Consider adding a package doc explaining the example.
   
   // Windowed_wordcount demonstrates using Windowing with the Go SDK.
   // Windowing is a concept in the Beam model that subdivides a PCollection
   // based on the EventTime of an element.
   // See the Beam Programming Guide at
   // https://beam.apache.org/documentation/programming-guide/#windowing
   // for more information.




Issue Time Tracking
---

Worklog Id: (was: 92904)
Time Spent: 40m  (was: 0.5h)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.





[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92908=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92908
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 19/Apr/18 23:18
Start Date: 19/Apr/18 23:18
Worklog Time Spent: 10m 
  Work Description: lostluck commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r182910775
 
 

 ##
 File path: sdks/go/pkg/beam/core/graph/mtime/time.go
 ##
 @@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package mtime contains a millisecond representation of time. The purpose
+// of this representation is alignment with the Beam specification, where we
+// need extreme values outside the range of time.Time for windowing.
+package mtime
+
+import (
+   "fmt"
+   "math"
+   "time"
+)
+
+const (
+   // MinTimestamp is the minimum value for any Beam timestamp. Often referred to
+   // as "-infinity". This value and MaxTimestamp are chosen so that their
+   // microseconds-since-epoch can be safely represented with an int64 and boundary
+   // values can be represented correctly with milli-second precision.
+   MinTimestamp Time = math.MinInt64 / 1000
+
+   // MaxTimestamp is the maximum value for any Beam timestamp. Often referred to
+   // as "-infinity".
 
 Review comment:
   
   "infinity"




Issue Time Tracking
---

Worklog Id: (was: 92908)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.





[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92913=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92913
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 19/Apr/18 23:18
Start Date: 19/Apr/18 23:18
Worklog Time Spent: 10m 
  Work Description: lostluck commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r182910794
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/fn.go
 ##
 @@ -46,17 +48,22 @@ func Invoke(ctx context.Context, fn *funcx.Fn, opt 
*MainInput, extra ...interfac
if index, ok := fn.Context(); ok {
args[index] = ctx
}
+   if index, ok := fn.Window(); ok {
+   if len(ws) != 1 {
+   return nil, fmt.Errorf("DoFns that observe windows must be invoked with single window: %v", opt.Key.Windows)
 
 Review comment:
   
   When would there be two windows on a PCollection? Simply by applying 
WindowInto twice, the second being a finer grain than the first?
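
For context on the single-window check, the sketch below shows one way a caller could satisfy it: invoke a window-observing function once per window, and take a single-call fast path otherwise. `Window`, `element`, and `process` are hypothetical simplifications, not the SDK's types or the PR's implementation.

```go
package main

import "fmt"

// Window and element are hypothetical simplifications of the SDK's types.
type Window string

type element struct {
	Value   int
	Windows []Window
}

// process invokes fn once per window if the function observes windows,
// and once with the full window set otherwise (the fast path).
func process(e element, observesWindows bool, fn func(v int, ws []Window)) {
	if !observesWindows {
		fn(e.Value, e.Windows)
		return
	}
	for _, w := range e.Windows {
		// Each call sees exactly one window.
		fn(e.Value, []Window{w})
	}
}

func main() {
	e := element{Value: 7, Windows: []Window{"[0,30)", "[10,40)"}}
	process(e, true, func(v int, ws []Window) {
		fmt.Println(v, ws)
	})
}
```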




Issue Time Tracking
---

Worklog Id: (was: 92913)
Time Spent: 1h 50m  (was: 1h 40m)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.





[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92910=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92910
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 19/Apr/18 23:18
Start Date: 19/Apr/18 23:18
Worklog Time Spent: 10m 
  Work Description: lostluck commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r182910789
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/combine.go
 ##
 @@ -93,51 +79,38 @@ func (n *Combine) ProcessElement(ctx context.Context, 
value FullValue, values ..
return fmt.Errorf("invalid status for combine %v: %v", n.UID, n.status)
}
 
-   if n.IsPerKey {
-   // For per-key combine, all processing can be done here. Note that
-   // we do not explicitly call merge, although it may be called implicitly
-   // when adding input.
+   // Note that we do not explicitly call merge, although it may
+   // be called implicitly when adding input.
 
-   a, err := n.newAccum(ctx, value.Elm)
-   if err != nil {
-   return n.fail(err)
-   }
-   first := true
-
-   stream := values[0].Open()
-   for {
-   v, err := stream.Read()
-   if err != nil {
-   if err == io.EOF {
-   break
-   }
-   return n.fail(err)
-   }
+   a, err := n.newAccum(ctx, value.Elm)
+   if err != nil {
+   return n.fail(err)
+   }
+   first := true
 
-   a, err = n.addInput(ctx, a, value.Elm, v.Elm, value.Timestamp, first)
-   if err != nil {
-   return n.fail(err)
+   stream := values[0].Open()
+   for {
+   v, err := stream.Read()
+   if err != nil {
+   if err == io.EOF {
+   break
}
-   first = false
+   return n.fail(err)
}
-   stream.Close()
 
-   out, err := n.extract(ctx, a)
+   a, err = n.addInput(ctx, a, value.Elm, v.Elm, value.Timestamp, first)
if err != nil {
return n.fail(err)
}
-   return n.Out.ProcessElement(ctx, FullValue{Elm: value.Elm, Elm2: out, Timestamp: value.Timestamp})
+   first = false
}
+   stream.Close()
 
 Review comment:
   
   This behaviour was present previously, but consider deferring the Close right after 
the Open call: with the early returns here, the stream may be left open on an 
error or a panic.
   
   That said, if none of the other calls can panic, consider explicitly closing 
before each early return, if the defer cost is a concern.
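
A minimal sketch of the deferred-close pattern suggested above. The `stream` type and `sum` function are hypothetical stand-ins for the value stream and the accumulate loop in the diff; only the placement of `defer` is the point.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// stream is a hypothetical stand-in for the value stream in the diff.
type stream struct {
	vals   []int
	pos    int
	closed bool
}

// Read returns the next value, or io.EOF when the stream is exhausted.
func (s *stream) Read() (int, error) {
	if s.pos >= len(s.vals) {
		return 0, io.EOF
	}
	v := s.vals[s.pos]
	s.pos++
	return v, nil
}

// Close marks the stream as closed.
func (s *stream) Close() error {
	s.closed = true
	return nil
}

var errNegative = errors.New("negative input")

// sum reads the stream to EOF. The deferred Close runs on every return
// path, including the early error returns and a panic.
func sum(s *stream) (int, error) {
	defer s.Close()

	total := 0
	for {
		v, err := s.Read()
		if err != nil {
			if err == io.EOF {
				break
			}
			return 0, err
		}
		if v < 0 {
			return 0, errNegative // early return; Close still runs
		}
		total += v
	}
	return total, nil
}

func main() {
	s := &stream{vals: []int{1, -2, 3}}
	_, err := sum(s)
	fmt.Println(err, s.closed)
}
```

The alternative mentioned in the comment, an explicit Close before each return, avoids the defer overhead but has to be repeated at every exit and does not cover panics.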




Issue Time Tracking
---

Worklog Id: (was: 92910)
Time Spent: 1h 20m  (was: 1h 10m)

> Go windowing support
> 
>
> Key: BEAM-3303
> URL: https://issues.apache.org/jira/browse/BEAM-3303
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Add support for Window.into and windowing strategies on Node. Implement the 
> various windowing strategies Beam has: GlobalWindow, FixedWindows, etc.





[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92912=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92912
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 19/Apr/18 23:18
Start Date: 19/Apr/18 23:18
Worklog Time Spent: 10m 
  Work Description: lostluck commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r182910796
 
 

 ##
 File path: sdks/go/pkg/beam/core/graph/coder/time_test.go
 ##
 @@ -18,32 +18,25 @@ package coder
 import (
"bytes"
"testing"
-   "time"
 
-   "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
 )
 
 func TestEncodeDecodeEventTime(t *testing.T) {
tests := []struct {
-   time        time.Time
-   errExpected bool
+   time mtime.Time
}{
-   {time: time.Unix(0, 0)},
-   {time: time.Unix(10, 0)},
-   {time: time.Unix(1257894000, 0)},
-   {time: time.Unix(0, 1257894)},
-   {time: time.Time{}, errExpected: true},
+   {time: mtime.ZeroTimestamp},
+   {time: mtime.MinTimestamp},
+   {time: mtime.MaxTimestamp},
+   {time: mtime.Now()},
 
 Review comment:
   
   While *Now* is admirable to add as a test, it tells you too late that 
there's a problem. Consider Now() + a month instead, which at least gives 
advance notice.




Issue Time Tracking
---

Worklog Id: (was: 92912)
Time Spent: 1h 40m  (was: 1.5h)



[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92905=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92905
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 19/Apr/18 23:18
Start Date: 19/Apr/18 23:18
Worklog Time Spent: 10m 
  Work Description: lostluck commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r182910781
 
 

 ##
 File path: sdks/go/pkg/beam/core/graph/mtime/time.go
 ##
 @@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package mtime contains a millisecond representation of time. The purpose
+// of this representation is alignment with the Beam specification, where we
+// need extreme values outside the range of time.Time for windowing.
+package mtime
+
+import (
+   "fmt"
+   "math"
+   "time"
+)
+
+const (
+   // MinTimestamp is the minimum value for any Beam timestamp. Often referred to
+   // as "-infinity". This value and MaxTimestamp are chosen so that their
+   // microseconds-since-epoch can be safely represented with an int64 and boundary
+   // values can be represented correctly with milli-second precision.
+   MinTimestamp Time = math.MinInt64 / 1000
+
+   // MaxTimestamp is the maximum value for any Beam timestamp. Often referred to
+   // as "+infinity".
+   MaxTimestamp Time = math.MaxInt64 / 1000
+
+   // EndOfGlobalWindowTime is the timestamp at the end of the global window. It
+   // is a day before the max timestamp.
+   EndOfGlobalWindowTime = MaxTimestamp - 24*60*60*1000
+
+   // ZeroTimestamp is the default zero value time. It corresponds to the unix epoch.
+   ZeroTimestamp Time = 0
+)
+
+// Time is the number of milli-seconds since the Unix epoch. The valid range of
+// times is bounded by what can be represented as _micro_-seconds-since-epoch.
+type Time int64
+
+// Now returns the current time.
+func Now() Time {
+   return FromTime(time.Now())
+}
+
+// FromMilliseconds returns a timestamp from a raw milliseconds-since-epoch value.
+func FromMilliseconds(unixMilliseconds int64) Time {
+   return Normalize(Time(unixMilliseconds))
+}
+
+// FromDuration returns a timestamp from a time.Duration-since-epoch value.
+func FromDuration(d time.Duration) Time {
+   return ZeroTimestamp.Add(d)
+}
+
+// FromTime returns a milli-second precision timestamp from a time.Time.
+func FromTime(t time.Time) Time {
+   return Normalize(Time(n2m(t.UnixNano())))
+}
+
+// Milliseconds returns the number of milli-seconds since the Unix epoch.
+func (t Time) Milliseconds() int64 {
+   return int64(t)
+}
+
+// Add returns the time plus the duration.
+func (t Time) Add(d time.Duration) Time {
+   return Normalize(Time(int64(t) + n2m(d.Nanoseconds())))
+}
+
+// Subtract returns the time minus the duration.
+func (t Time) Subtract(d time.Duration) Time {
+   return Normalize(Time(int64(t) - n2m(d.Nanoseconds())))
+}
+
+func (t Time) String() string {
+   switch t {
+   case MinTimestamp:
+   return "-inf"
+   case MaxTimestamp:
+   return "+inf"
+   case EndOfGlobalWindowTime:
+   return "glo"
+   default:
+   return fmt.Sprintf("%v", t.Milliseconds())
+   }
+}
+
+// Min returns the smallest (earliest) time.
+func Min(a, b Time) Time {
+   if int64(a) < int64(b) {
+   return a
+   } else {
+   return b
+   }
+}
+
+// Max returns the largest (latest) time.
+func Max(a, b Time) Time {
+   if int64(a) < int64(b) {
+   return b
+   } else {
+   return a
+   }
+}
+
+// Normalize ensures a Time is within [MinTimestamp,MaxTimestamp].
+func Normalize(t Time) Time {
+   return Min(Max(t, MinTimestamp), MaxTimestamp)
+}
+
+func n2m(v int64) int64 {
 
 Review comment:
   
   Consider:
   // n2m converts nanoseconds to milliseconds.
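
For illustration, the helper with the suggested doc comment might look like the sketch below. The body of n2m is cut off in the quoted diff, so the integer division here is an assumption rather than the PR's actual code.

```go
package main

import "fmt"

// n2m converts nanoseconds to milliseconds, truncating toward zero.
// The body is assumed; the quoted diff ends before it.
func n2m(v int64) int64 {
	return v / 1e6
}

func main() {
	fmt.Println(n2m(1500000000)) // 1.5 seconds expressed in milliseconds
	fmt.Println(n2m(999999))     // sub-millisecond values truncate to zero
}
```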



[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92906=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92906
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 19/Apr/18 23:18
Start Date: 19/Apr/18 23:18
Worklog Time Spent: 10m 
  Work Description: lostluck commented on a change in pull request #5179: 
[BEAM-3303] Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179#discussion_r182910776
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/window.go
 ##
 @@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+   "context"
+   "fmt"
+   "time"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+)
+
+// WindowInto places each element in one or more windows.
+type WindowInto struct {
+   UID UnitID
+   Fn  *window.Fn
+   Out Node
+}
+
+func (w *WindowInto) ID() UnitID {
+   return w.UID
+}
+
+func (w *WindowInto) Up(ctx context.Context) error {
+   return nil
+}
+
+func (w *WindowInto) StartBundle(ctx context.Context, id string, data DataManager) error {
+   return w.Out.StartBundle(ctx, id, data)
+}
+
+func (w *WindowInto) ProcessElement(ctx context.Context, elm FullValue, values ...ReStream) error {
+   windowed := FullValue{
+   Windows:   assignWindows(w.Fn, elm.Timestamp),
+   Timestamp: elm.Timestamp,
+   Elm:   elm.Elm,
+   Elm2:  elm.Elm2,
+   }
+   return w.Out.ProcessElement(ctx, windowed, values...)
+}
+
+func assignWindows(wfn *window.Fn, ts typex.EventTime) []typex.Window {
+   switch wfn.Kind {
+   case window.GlobalWindows:
+   return window.SingleGlobalWindow
+
+   case window.FixedWindows:
+   start := ts - (ts.Add(wfn.Size) % mtime.FromDuration(wfn.Size))
+   end := mtime.Min(start.Add(wfn.Size), mtime.EndOfGlobalWindowTime.Add(time.Millisecond))
+   return []typex.Window{window.IntervalWindow{Start: start, End: end}}
+
+   case window.SlidingWindows:
+   var ret []typex.Window
+
+   period := mtime.FromDuration(wfn.Period)
+   lastStart := ts - (ts.Add(wfn.Size) % period)
+   for start := lastStart; start > ts.Subtract(wfn.Size); start -= period {
+   ret = append(ret, window.IntervalWindow{Start: start, End: start.Add(wfn.Size)})
+   }
+   return ret
+
+   default:
 
 Review comment:
   
   Is there a TODO/JIRA for the Session window support?
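
To make the fixed and sliding window arithmetic in the diff above concrete, here is a standalone sketch that mirrors it with plain int64 milliseconds. The names `window`, `fixedWindow` and `slidingWindows` are hypothetical and not part of the PR. The sketch assumes non-negative timestamps and omits the clamping against EndOfGlobalWindowTime that the diff applies to fixed windows.

```go
package main

import "fmt"

// window is a half-open interval [start, end) in milliseconds since the epoch.
type window struct{ start, end int64 }

// fixedWindow returns the single window of the given size containing ts.
func fixedWindow(ts, size int64) window {
	start := ts - (ts+size)%size
	return window{start, start + size}
}

// slidingWindows returns every window of the given size, started at
// multiples of period, that contains ts. The latest window comes first.
func slidingWindows(ts, size, period int64) []window {
	var ret []window
	lastStart := ts - (ts+size)%period
	for start := lastStart; start > ts-size; start -= period {
		ret = append(ret, window{start, start + size})
	}
	return ret
}

func main() {
	fmt.Println(fixedWindow(25, 10))
	fmt.Println(slidingWindows(25, 10, 5))
}
```

With a size of 10 and a period of 5, a timestamp of 25 falls into two sliding windows, [25, 35) and [20, 30), while the fixed window of size 10 is [20, 30).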




Issue Time Tracking
---

Worklog Id: (was: 92906)
Time Spent: 1h  (was: 50m)



[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92437=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92437
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 19/Apr/18 07:28
Start Date: 19/Apr/18 07:28
Worklog Time Spent: 10m 
  Work Description: herohde commented on issue #5179: [BEAM-3303] Add Go 
Windowing support
URL: https://github.com/apache/beam/pull/5179#issuecomment-382638125
 
 
   R: @wcn3 @lostluck 
   
   Sorry for the massive PR.




Issue Time Tracking
---

Worklog Id: (was: 92437)
Time Spent: 20m  (was: 10m)



[jira] [Work logged] (BEAM-3303) Go windowing support

2018-04-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3303?focusedWorklogId=92436=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92436
 ]

ASF GitHub Bot logged work on BEAM-3303:


Author: ASF GitHub Bot
Created on: 19/Apr/18 07:25
Start Date: 19/Apr/18 07:25
Worklog Time Spent: 10m 
  Work Description: herohde opened a new pull request #5179: [BEAM-3303] 
Add Go Windowing support
URL: https://github.com/apache/beam/pull/5179
 
 
* Added WindowInto and supporting types and handling. Made EventTime align with Beam.
* Added windowed_wordcount. Runs on direct + dataflow. Flink doesn't seem to support the urn yet.
* Fixed Dataflow encoding issue and made it use model pipeline references for ParDo [BEAM-3448].




Issue Time Tracking
---

Worklog Id: (was: 92436)
Time Spent: 10m
Remaining Estimate: 0h
