[GitHub] [beam] pabloem commented on issue #11202: asdletmedah

2020-03-23 Thread GitBox
pabloem commented on issue #11202: asdletmedah
URL: https://github.com/apache/beam/pull/11202#issuecomment-603028147
 
 
   Run Portable_Python PreCommit


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [beam] youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK

2020-03-23 Thread GitBox
youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#discussion_r396853348
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/reshuffle.go
 ##
 @@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+   "bytes"
+   "context"
+   "fmt"
+   "io"
+   "math/rand"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+   "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+)
+
+// ReshuffleInput is a Node.
+type ReshuffleInput struct {
+   UID   UnitID
+   SID   StreamID
+   Coder *coder.Coder // Coder for the input PCollection.
+   Seed  int64
+   Out   Node
+
+   r    *rand.Rand
+   enc  ElementEncoder
+   wEnc WindowEncoder
+   b    bytes.Buffer
+   // ret is a cached allocations for passing to the next Unit. Units never modify the passed in FullValue.
+   ret FullValue
+}
+
+// ID returns the unit debug id.
+func (n *ReshuffleInput) ID() UnitID {
+   return n.UID
+}
+
+// Up initializes the value and window encoders, and the random source.
+func (n *ReshuffleInput) Up(ctx context.Context) error {
+   n.enc = MakeElementEncoder(coder.SkipW(n.Coder))
+   n.wEnc = MakeWindowEncoder(n.Coder.Window)
+   n.r = rand.New(rand.NewSource(n.Seed))
+   return nil
+}
+
+// StartBundle is a no-op.
+func (n *ReshuffleInput) StartBundle(ctx context.Context, id string, data DataContext) error {
+   return MultiStartBundle(ctx, id, data, n.Out)
+}
+
+func (n *ReshuffleInput) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error {
+   n.b.Reset()
+   if err := EncodeWindowedValueHeader(n.wEnc, value.Windows, value.Timestamp, ); err != nil {
+       return err
+   }
+   if err := n.enc.Encode(value, ); err != nil {
+       return errors.WithContextf(err, "encoding element %v with coder %v", value, n.Coder)
+   }
+   n.ret = FullValue{Elm: n.r.Int(), Elm2: n.b.Bytes(), Timestamp: value.Timestamp}
+   if err := n.Out.ProcessElement(ctx, ); err != nil {
+       return err
+   }
+   return nil
 
 Review comment:
   ```suggestion
return n.Out.ProcessElement(ctx, )
   ```
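
   To illustrate why the suggestion is behavior-preserving, here is a minimal, self-contained sketch. The `process`, `verbose`, and `direct` names are hypothetical stand-ins and are not part of the Beam SDK; `process` only plays the role of the downstream `ProcessElement` call.

   ```go
package main

import (
	"errors"
	"fmt"
)

// process is a hypothetical stand-in for a downstream call such as
// n.Out.ProcessElement; it fails when fail is true.
func process(fail bool) error {
	if fail {
		return errors.New("downstream failure")
	}
	return nil
}

// verbose mirrors the original shape: check the error, return it,
// otherwise fall through to an explicit return nil.
func verbose(fail bool) error {
	if err := process(fail); err != nil {
		return err
	}
	return nil
}

// direct mirrors the suggested shape: return the call's result as-is.
func direct(fail bool) error {
	return process(fail)
}

func main() {
	for _, fail := range []bool{false, true} {
		fmt.Println(verbose(fail), direct(fail))
	}
}
   ```

   Both forms return the same error value for the same input, so the shorter one is only a readability change when nothing else happens between the call and the final return.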




[GitHub] [beam] youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK

2020-03-23 Thread GitBox
youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#discussion_r396887810
 
 

 ##
 File path: sdks/go/pkg/beam/gbk.go
 ##
 @@ -95,3 +95,52 @@ func TryCoGroupByKey(s Scope, cols ...PCollection) (PCollection, error) {
ret.SetCoder(NewCoder(ret.Type()))
return ret, nil
 }
+
+// Reshuffle copies a PCollection of the same kind and using the same element
+// coder, and maintains the same windowing information. Importantly, it allows
+// the result PCollection to be processed with a different sharding, in a
+// different stage than the input PCollection.
+//
+// For example, if a computation needs a lot of parallelism but
+// produces only a small amount of output data, then the computation
+// producing the data can run with as much parallelism as needed,
+// while the output file is written with a smaller amount of
+// parallelism, using the following pattern:
+//
+//   pc := bigHairyComputationNeedingParallelism(scope) // PCollection
+//   resharded := beam.Reshard(scope, pc)               // PCollection
 
 Review comment:
   Here and elsewhere in this comment, Reshuffle is referred to as "Reshard". I 
think it's fine to refer to it as a reshard informally, since that's what it 
functionally is, but the places where it's used as a proper noun should be 
switched to Reshuffle.




[GitHub] [beam] youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK

2020-03-23 Thread GitBox
youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#discussion_r396823199
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/coder.go
 ##
 @@ -259,59 +298,82 @@ type customDecoder struct {
dec Decoder
 }
 
-func (c *customDecoder) Decode(r io.Reader) (*FullValue, error) {
+func (c *customDecoder) DecodeTo(r io.Reader, fv *FullValue) error {
// (1) Read length-prefixed encoded data
 
size, err := coder.DecodeVarInt(r)
if err != nil {
-   return nil, err
+   return err
}
data, err := ioutilx.ReadN(r, (int)(size))
if err != nil {
-   return nil, err
+   return err
}
 
// (2) Call decode
 
val, err := c.dec.Decode(c.t, data)
if err != nil {
+   return err
+   }
+   *fv = FullValue{Elm: val}
+   return err
 
 Review comment:
   I know this is just preserving the existing behavior, but it seems weird to 
`return err` here instead of `return nil`, even if it is guaranteed to be `nil` 
at this point.
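
   As a rough illustration of the point, the sketch below reads a length-prefixed payload and ends with an explicit `return nil`. It uses only the Go standard library; the `value` type and the `decodeTo` and `encode` helpers are hypothetical, and the length prefix is assumed to be a Go uvarint, which may not match the SDK's `coder.DecodeVarInt` format.

   ```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// value is a hypothetical stand-in for FullValue.
type value struct {
	Elm string
}

// decodeTo reads a length-prefixed payload from r into v.
// Assumption: the prefix is a Go uvarint, not necessarily the SDK's varint.
func decodeTo(r *bufio.Reader, v *value) error {
	// (1) Read the length prefix, then exactly that many bytes.
	size, err := binary.ReadUvarint(r)
	if err != nil {
		return err
	}
	data := make([]byte, size)
	if _, err := io.ReadFull(r, data); err != nil {
		return err
	}
	// (2) "Decode" the payload; here it is simply treated as a string.
	*v = value{Elm: string(data)}
	// Every error path has already returned, so say so explicitly.
	return nil
}

// encode writes s with a uvarint length prefix.
func encode(s string) []byte {
	var buf bytes.Buffer
	var lenBuf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(lenBuf[:], uint64(len(s)))
	buf.Write(lenBuf[:n])
	buf.WriteString(s)
	return buf.Bytes()
}

func main() {
	var v value
	err := decodeTo(bufio.NewReader(bytes.NewReader(encode("hello"))), &v)
	fmt.Println(v.Elm, err)
}
   ```

   With `return nil` on the last line, a reader does not have to trace back through the function to confirm that `err` cannot be non-nil there.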




[GitHub] [beam] youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK

2020-03-23 Thread GitBox
youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#discussion_r396886566
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
 ##
 @@ -473,6 +479,164 @@ func (m *marshaller) addNode(n *graph.Node) string {
return m.makeNode(id, m.coders.Add(n.Coder), n)
 }
 
+// expandReshuffle translates resharding to a composite reshuffle
+// transform.
+//
+// With proper runner support, the SDK doesn't need to do anything.
+// However, we still need to provide a backup plan in terms of other
+// PTransforms in the event the runner doesn't have a native implementation.
+//
+// In particular, the "backup plan" needs to:
+//
+//  * Encode the windowed element, preserving timestamps.
+//  * Add random keys to the encoded windowed element []bytes
+//  * GroupByKey (in the global window).
+//  * Explode the resulting elements list.
+//  * Decode the windowed element []bytes.
+//
+// While a simple reshard can be written in user terms, (timestamps and windows
+// are accessible to user functions) there are some framework internal
+// optimizations that can be done if the framework is aware of the reshard, though
+// ideally this is handled on the runner side.
+//
+// User code is able to write reshards, but it's easier to access
+// the window coders framework side, which is critical for the reshard
+// to function with unbounded inputs.
+func (m *marshaller) expandReshuffle(edge NamedEdge) string {
+   id := edgeID(edge.Edge)
+   var kvCoderID, gbkCoderID string
+   {
+   kv := makeUnionCoder()
+   kvCoderID = m.coders.Add(kv)
+   gbkCoderID = m.coders.Add(coder.NewCoGBK(kv.Components))
+   }
+
+   var subtransforms []string
+
+   in := edge.Edge.Input[0]
+
+   origInput := m.addNode(in.From)
+   // We need to preserve the old windowing/triggering here
+   // for re-instatement after the GBK.
+   preservedWSId := m.pcollections[origInput].GetWindowingStrategyId()
+
+   // Get the windowing strategy from before:
+   postReify := fmt.Sprintf("%v_%v_reifyts", nodeID(in.From), id)
+   m.makeNode(postReify, kvCoderID, in.From)
+
+   // We need to replace postReify's windowing strategy with one appropriate
+   // for reshuffles.
+   {
+       wfn := window.NewGlobalWindows()
+       m.pcollections[postReify].WindowingStrategyId =
+           m.internWindowingStrategy({
+               // Not segregated by time...
+               WindowFn: makeWindowFn(wfn),
+               // ...output after every element is received...
+               Trigger: {
+                   // Should this be an Always trigger instead?
+                   Trigger: _ElementCount_{
+                       ElementCount: _ElementCount{
+                           ElementCount: 1,
+                       },
+                   },
+               },
+               // ...and after outputing, discard the output elements...
+               AccumulationMode: pb.AccumulationMode_DISCARDING,
+               // ...and since every pane should have 1 element,
+               // try to preserve the timestamp.
+               OutputTime: pb.OutputTime_EARLIEST_IN_PANE,
+               // Defaults copied from marshalWindowingStrategy.
+               // TODO(BEAM-3304): migrate to user side operations once trigger support is in.
+               EnvironmentId:   m.addDefaultEnv(),
+               MergeStatus:     pb.MergeStatus_NON_MERGING,
+               WindowCoderId:   m.coders.AddWindowCoder(makeWindowCoder(wfn)),
+               ClosingBehavior: pb.ClosingBehavior_EMIT_IF_NONEMPTY,
+               AllowedLateness: 0,
+               OnTimeBehavior:  pb.OnTimeBehavior_FIRE_ALWAYS,
+           })
+   }
+
+   // Inputs (i)
+
+   inputID := fmt.Sprintf("%v_reifyts", id)
+   payload := {
+       DoFn: {
+           Urn: URNReshuffleInput,
+           Payload: []byte(protox.MustEncodeBase64({
+               Urn: URNReshuffleInput,
+           })),
+       },
+   }
+   input := {
+       UniqueName: inputID,
+       Spec: {
+           Urn: URNParDo,
+           Payload: protox.MustEncode(payload),
+       },
+       Inputs:    map[string]string{"i0": nodeID(in.From)},
+   Outputs:   
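
   The five "backup plan" steps listed in the `expandReshuffle` comment above (encode, add random keys, group by key, explode, decode) can be sketched in memory as follows. This is only an illustration under simplifying assumptions: `elem`, `encode`, `decode`, and `reshuffle` are hypothetical names, the "coder" is a toy fixed-width timestamp followed by the payload, and windows and triggers are not modeled at all.

   ```go
package main

import (
	"encoding/binary"
	"fmt"
	"math/rand"
	"sort"
)

// elem is a hypothetical element: a payload plus its event timestamp.
type elem struct {
	Value     string
	Timestamp int64
}

// encode is a toy stand-in for the windowed-value encoder:
// 8 bytes of big-endian timestamp followed by the raw payload.
func encode(e elem) []byte {
	b := make([]byte, 8, 8+len(e.Value))
	binary.BigEndian.PutUint64(b, uint64(e.Timestamp))
	return append(b, e.Value...)
}

// decode reverses encode.
func decode(b []byte) elem {
	return elem{
		Timestamp: int64(binary.BigEndian.Uint64(b[:8])),
		Value:     string(b[8:]),
	}
}

// reshuffle runs the five fallback steps over an in-memory slice.
func reshuffle(in []elem, seed int64) []elem {
	r := rand.New(rand.NewSource(seed))
	groups := map[int][][]byte{}
	for _, e := range in {
		enc := encode(e)                       // step 1: encode, keeping the timestamp
		key := r.Int()                         // step 2: attach a random key
		groups[key] = append(groups[key], enc) // step 3: group by key
	}
	var out []elem
	for _, vals := range groups { // step 4: explode each group
		for _, b := range vals {
			out = append(out, decode(b)) // step 5: decode
		}
	}
	return out
}

func main() {
	in := []elem{{Value: "a", Timestamp: 1}, {Value: "b", Timestamp: 2}, {Value: "c", Timestamp: 3}}
	out := reshuffle(in, 42)
	// Map iteration order is unspecified, so sort before printing.
	sort.Slice(out, func(i, j int) bool { return out[i].Timestamp < out[j].Timestamp })
	fmt.Println(out)
}
   ```

   The output is a reordering of the input with each timestamp carried through the encoded bytes, which is the property the real transform needs to keep when it regroups elements under random keys.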

[GitHub] [beam] youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK

2020-03-23 Thread GitBox
youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#discussion_r396887966
 
 

 ##
 File path: sdks/go/pkg/beam/gbk.go
 ##
 @@ -95,3 +95,52 @@ func TryCoGroupByKey(s Scope, cols ...PCollection) (PCollection, error) {
ret.SetCoder(NewCoder(ret.Type()))
return ret, nil
 }
+
+// Reshuffle copies a PCollection of the same kind and using the same element
+// coder, and maintains the same windowing information. Importantly, it allows
+// the result PCollection to be processed with a different sharding, in a
+// different stage than the input PCollection.
+//
+// For example, if a computation needs a lot of parallelism but
+// produces only a small amount of output data, then the computation
+// producing the data can run with as much parallelism as needed,
+// while the output file is written with a smaller amount of
+// parallelism, using the following pattern:
+//
+//   pc := bigHairyComputationNeedingParallelism(scope) // PCollection
+//   resharded := beam.Reshard(scope, pc)               // PCollection
+//
+// Another use case is when one has a non-deterministic DoFn followed by one
+// that performs externally-visible side effects. Inserting a Reshard
+// between these DoFns ensures that retries of the second DoFn will always be
+// the same, which is necessary to make side effects idempotent.
+//
+// A Reshuffle will force a break in the optimized pipeline. Consequently,
+// this operation should be used sparingly, only after determining that the
+// pipeline without reshard is broken in some way and performing an extra
+// operation is worth the cost.
+func Reshuffle(s Scope, col PCollection) PCollection {
+   return Must(TryReshuffle(s, col))
+}
+
+// TryReshuffle inserts a Reshard into the pipeline, and returns an error if
 
 Review comment:
   Same as previous comment, using Reshard instead of Reshuffle. The error 
message a few lines below also does that.




[GitHub] [beam] youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK

2020-03-23 Thread GitBox
youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#discussion_r396855047
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/reshuffle.go
 ##
 @@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+   "bytes"
+   "context"
+   "fmt"
+   "io"
+   "math/rand"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+   "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+)
+
+// ReshuffleInput is a Node.
+type ReshuffleInput struct {
+   UID   UnitID
+   SID   StreamID
+   Coder *coder.Coder // Coder for the input PCollection.
+   Seed  int64
+   Out   Node
+
+   r    *rand.Rand
+   enc  ElementEncoder
+   wEnc WindowEncoder
+   b    bytes.Buffer
+   // ret is a cached allocations for passing to the next Unit. Units never modify the passed in FullValue.
+   ret FullValue
+}
+
+// ID returns the unit debug id.
+func (n *ReshuffleInput) ID() UnitID {
+   return n.UID
+}
+
+// Up initializes the value and window encoders, and the random source.
+func (n *ReshuffleInput) Up(ctx context.Context) error {
+   n.enc = MakeElementEncoder(coder.SkipW(n.Coder))
+   n.wEnc = MakeWindowEncoder(n.Coder.Window)
+   n.r = rand.New(rand.NewSource(n.Seed))
+   return nil
+}
+
+// StartBundle is a no-op.
+func (n *ReshuffleInput) StartBundle(ctx context.Context, id string, data DataContext) error {
+   return MultiStartBundle(ctx, id, data, n.Out)
+}
+
+func (n *ReshuffleInput) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error {
+   n.b.Reset()
+   if err := EncodeWindowedValueHeader(n.wEnc, value.Windows, value.Timestamp, ); err != nil {
+       return err
+   }
+   if err := n.enc.Encode(value, ); err != nil {
+       return errors.WithContextf(err, "encoding element %v with coder %v", value, n.Coder)
+   }
+   n.ret = FullValue{Elm: n.r.Int(), Elm2: n.b.Bytes(), Timestamp: value.Timestamp}
+   if err := n.Out.ProcessElement(ctx, ); err != nil {
+       return err
+   }
+   return nil
+}
+
+// FinishBundle propagates finish bundle, and clears cached state.
+func (n *ReshuffleInput) FinishBundle(ctx context.Context) error {
+   n.b = bytes.Buffer{}
+   n.ret = FullValue{}
+   return MultiFinishBundle(ctx, n.Out)
+}
+
+// Down is a no-op.
+func (n *ReshuffleInput) Down(ctx context.Context) error {
+   return nil
+}
+
+func (n *ReshuffleInput) String() string {
+   return fmt.Sprintf("ReshuffleInput[%v] Coder:%v", n.SID, n.Coder)
+}
+
+// ReshuffleOutput is a Node.
+type ReshuffleOutput struct {
+   UID   UnitID
+   SID   StreamID
+   Coder *coder.Coder // Coder for the receiving PCollection.
+   Out   Node
+
+   b    bytes.Buffer
+   dec  ElementDecoder
+   wDec WindowDecoder
+   ret  FullValue
+}
+
+// ID returns the unit debug id.
+func (n *ReshuffleOutput) ID() UnitID {
+   return n.UID
+}
+
+// Up initializes the value and window encoders, and the random source.
+func (n *ReshuffleOutput) Up(ctx context.Context) error {
+   n.dec = MakeElementDecoder(coder.SkipW(n.Coder))
+   n.wDec = MakeWindowDecoder(n.Coder.Window)
+   return nil
+}
+
+// StartBundle is a no-op.
+func (n *ReshuffleOutput) StartBundle(ctx context.Context, id string, data DataContext) error {
+   return MultiStartBundle(ctx, id, data, n.Out)
+}
+
+func (n *ReshuffleOutput) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error {
+   // Marshal the pieces into a temporary buffer since they must be transmitted on FnAPI as a single
+   // unit.
+   vs, err := values[0].Open()
+   if err != nil {
+       return errors.WithContextf(err, "decoding values for %v with coder %v", value, n.Coder)
+   }
+   defer vs.Close()
+   for {
+       v, err := vs.Read()
+       if err != nil {
+           if err == io.EOF {
+               return nil
+           }
+  

[GitHub] [beam] youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK

2020-03-23 Thread GitBox
youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#discussion_r396856077
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/exec/reshuffle.go
 ##
 @@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+   "bytes"
+   "context"
+   "fmt"
+   "io"
+   "math/rand"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+   "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+)
+
+// ReshuffleInput is a Node.
+type ReshuffleInput struct {
+   UID   UnitID
+   SID   StreamID
+   Coder *coder.Coder // Coder for the input PCollection.
+   Seed  int64
+   Out   Node
+
+   r    *rand.Rand
+   enc  ElementEncoder
+   wEnc WindowEncoder
+   b    bytes.Buffer
+   // ret is a cached allocations for passing to the next Unit. Units never modify the passed in FullValue.
+   ret FullValue
+}
+
+// ID returns the unit debug id.
+func (n *ReshuffleInput) ID() UnitID {
+   return n.UID
+}
+
+// Up initializes the value and window encoders, and the random source.
+func (n *ReshuffleInput) Up(ctx context.Context) error {
+   n.enc = MakeElementEncoder(coder.SkipW(n.Coder))
+   n.wEnc = MakeWindowEncoder(n.Coder.Window)
+   n.r = rand.New(rand.NewSource(n.Seed))
+   return nil
+}
+
+// StartBundle is a no-op.
+func (n *ReshuffleInput) StartBundle(ctx context.Context, id string, data DataContext) error {
+   return MultiStartBundle(ctx, id, data, n.Out)
+}
+
+func (n *ReshuffleInput) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error {
+   n.b.Reset()
+   if err := EncodeWindowedValueHeader(n.wEnc, value.Windows, value.Timestamp, ); err != nil {
+       return err
+   }
+   if err := n.enc.Encode(value, ); err != nil {
+       return errors.WithContextf(err, "encoding element %v with coder %v", value, n.Coder)
+   }
+   n.ret = FullValue{Elm: n.r.Int(), Elm2: n.b.Bytes(), Timestamp: value.Timestamp}
+   if err := n.Out.ProcessElement(ctx, ); err != nil {
+       return err
+   }
+   return nil
+}
+
+// FinishBundle propagates finish bundle, and clears cached state.
+func (n *ReshuffleInput) FinishBundle(ctx context.Context) error {
+   n.b = bytes.Buffer{}
+   n.ret = FullValue{}
+   return MultiFinishBundle(ctx, n.Out)
+}
+
+// Down is a no-op.
+func (n *ReshuffleInput) Down(ctx context.Context) error {
+   return nil
+}
+
+func (n *ReshuffleInput) String() string {
+   return fmt.Sprintf("ReshuffleInput[%v] Coder:%v", n.SID, n.Coder)
+}
+
+// ReshuffleOutput is a Node.
+type ReshuffleOutput struct {
+   UID   UnitID
+   SID   StreamID
+   Coder *coder.Coder // Coder for the receiving PCollection.
+   Out   Node
+
+   b    bytes.Buffer
+   dec  ElementDecoder
+   wDec WindowDecoder
+   ret  FullValue
+}
+
+// ID returns the unit debug id.
+func (n *ReshuffleOutput) ID() UnitID {
+   return n.UID
+}
+
+// Up initializes the value and window encoders, and the random source.
+func (n *ReshuffleOutput) Up(ctx context.Context) error {
+   n.dec = MakeElementDecoder(coder.SkipW(n.Coder))
+   n.wDec = MakeWindowDecoder(n.Coder.Window)
+   return nil
+}
+
+// StartBundle is a no-op.
+func (n *ReshuffleOutput) StartBundle(ctx context.Context, id string, data DataContext) error {
+   return MultiStartBundle(ctx, id, data, n.Out)
+}
+
+func (n *ReshuffleOutput) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error {
+   // Marshal the pieces into a temporary buffer since they must be transmitted on FnAPI as a single
+   // unit.
+   vs, err := values[0].Open()
 
 Review comment:
   It's strange to me that `values[]` can have multiple elements, but this 
method ends up actually reading all the values from the first element of it. 
Could you explain why that happens? Are there sometimes multiple ReStreams 
representing different things?



[GitHub] [beam] chrisgorgo commented on issue #11204: [BEAM-9579] Fix numpy logic operators

2020-03-23 Thread GitBox
chrisgorgo commented on issue #11204: [BEAM-9579] Fix numpy logic operators
URL: https://github.com/apache/beam/pull/11204#issuecomment-603002452
 
 
   R: @aaltay @charlesccychen




[GitHub] [beam] chrisgorgo opened a new pull request #11204: [BEAM-9579] Fix numpy logic operators

2020-03-23 Thread GitBox
chrisgorgo opened a new pull request #11204: [BEAM-9579] Fix numpy logic operators
URL: https://github.com/apache/beam/pull/11204
 
 
   Replacing deprecated '-' logical operators.
   
   
   

[GitHub] [beam] robertwb opened a new pull request #11203: [BEAM-9577] Define and implement dependency-aware artifact staging service.

2020-03-23 Thread GitBox
robertwb opened a new pull request #11203: [BEAM-9577] Define and implement dependency-aware artifact staging service.
URL: https://github.com/apache/beam/pull/11203
 
 
   This is not yet used anywhere (and even the legacy service is not yet used 
in Dataflow) but it is good to define what we want this service to look like.
   
   R: @lukecwik 
   CC: @ihji 
   
   
   

[beam] branch master updated (913c9f8 -> fc6cef9)

2020-03-23 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 913c9f8  Merge pull request #11198 from [BEAM-7923] Obfuscates display 
ids
 add fc6cef9  Merge pull request #11074: Store logical type values in Row 
instead of base values

No new revisions were added by this update.

Summary of changes:
 .../apache/beam/sdk/coders/RowCoderGenerator.java  | 219 ---
 .../beam/sdk/schemas/FromRowUsingCreator.java  |  15 +-
 .../org/apache/beam/sdk/schemas/SchemaCoder.java   |  60 +--
 .../beam/sdk/schemas/SchemaCoderHelpers.java   | 168 +
 .../sdk/schemas/logicaltypes/EnumerationType.java  |  30 +-
 .../beam/sdk/schemas/logicaltypes/OneOfType.java   |  35 +-
 .../apache/beam/sdk/schemas/transforms/Group.java  | 252 +++--
 .../sdk/schemas/transforms/SchemaAggregateFn.java  |  48 ++-
 .../apache/beam/sdk/schemas/utils/AvroUtils.java   |   3 +-
 .../beam/sdk/schemas/utils/ByteBuddyUtils.java |  27 +-
 .../beam/sdk/schemas/utils/ConvertHelpers.java |   3 +-
 .../apache/beam/sdk/schemas/utils/POJOUtils.java   |   2 +-
 .../main/java/org/apache/beam/sdk/values/Row.java  |  61 +++-
 .../org/apache/beam/sdk/values/RowWithGetters.java |   9 +-
 .../apache/beam/sdk/values/SchemaVerification.java |   3 +-
 .../org/apache/beam/sdk/coders/RowCoderTest.java   |  70 
 .../apache/beam/sdk/schemas/AvroSchemaTest.java|   2 +-
 .../beam/sdk/schemas/JavaFieldSchemaTest.java  |  57 ++-
 .../sdk/schemas/logicaltypes/LogicalTypesTest.java |  12 +-
 .../beam/sdk/schemas/transforms/GroupTest.java | 406 ++---
 .../beam/sdk/schemas/utils/SchemaTestUtils.java| 138 ++-
 .../apache/beam/sdk/schemas/utils/TestPOJOs.java   |  13 +-
 .../protobuf/ProtoDynamicMessageSchema.java|  19 +-
 .../sql/impl/rel/BeamAggregationRel.java   |  17 +-
 .../sdk/extensions/sql/impl/rel/BeamCalcRel.java   |  78 ++--
 .../sql/impl/rel/BeamEnumerableConverter.java  |   2 +-
 .../sdk/extensions/sql/impl/rel/BeamSortRel.java   |   4 +-
 .../extensions/sql/impl/rel/BeamUncollectRel.java  |   2 +-
 .../sdk/extensions/sql/impl/rel/BeamUnnestRel.java |   6 +-
 .../extensions/sql/impl/schema/BeamTableUtils.java |   2 +-
 .../sql/impl/transform/BeamJoinTransforms.java |   8 +-
 .../sql/impl/transform/agg/CovarianceFn.java   |   4 +-
 .../sdk/extensions/sql/BeamComplexTypeTest.java|   3 +-
 .../sql/impl/schema/BeamSqlRowCoderTest.java   |   4 +-
 .../extensions/sql/zetasql/BeamZetaSqlCalcRel.java |   2 +-
 .../sdk/extensions/sql/zetasql/ZetaSqlUtils.java   |   4 +-
 36 files changed, 1200 insertions(+), 588 deletions(-)
 create mode 100644 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoderHelpers.java



[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

2020-03-23 Thread GitBox
reuvenlax commented on issue #11074: Store logical type values in Row instead 
of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-602935452
 
 
   After 8 runs, the only Java Precommit failures have been random flakes (e.g. 
in Flink tests). 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [beam] reuvenlax merged pull request #11074: Store logical type values in Row instead of base values

2020-03-23 Thread GitBox
reuvenlax merged pull request #11074: Store logical type values in Row instead 
of base values
URL: https://github.com/apache/beam/pull/11074
 
 
   




[GitHub] [beam] TheNeuralBit commented on issue #10055: [BEAM-8603] Add Python SqlTransform example script

2020-03-23 Thread GitBox
TheNeuralBit commented on issue #10055: [BEAM-8603] Add Python SqlTransform 
example script
URL: https://github.com/apache/beam/pull/10055#issuecomment-602931763
 
 
   Run XVR_Flink PostCommit




[GitHub] [beam] TheNeuralBit commented on issue #10055: [BEAM-8603] Add Python SqlTransform example script

2020-03-23 Thread GitBox
TheNeuralBit commented on issue #10055: [BEAM-8603] Add Python SqlTransform 
example script
URL: https://github.com/apache/beam/pull/10055#issuecomment-602931816
 
 
   Run XVR_Spark PostCommit




[GitHub] [beam] stale[bot] closed pull request #8801: Make temp_location an attribute of the StandardOptions class

2020-03-23 Thread GitBox
stale[bot] closed pull request #8801: Make temp_location an attribute of the 
StandardOptions class
URL: https://github.com/apache/beam/pull/8801
 
 
   




[GitHub] [beam] stale[bot] commented on issue #8884: Add a PCollectionCache ABC and several file-based implementations

2020-03-23 Thread GitBox
stale[bot] commented on issue #8884: Add a PCollectionCache ABC and several 
file-based implementations
URL: https://github.com/apache/beam/pull/8884#issuecomment-602929552
 
 
   This pull request has been closed due to lack of activity. If you think that 
is incorrect, or the pull request requires review, you can revive the PR at any 
time.
   




[GitHub] [beam] stale[bot] closed pull request #8884: Add a PCollectionCache ABC and several file-based implementations

2020-03-23 Thread GitBox
stale[bot] closed pull request #8884: Add a PCollectionCache ABC and several 
file-based implementations
URL: https://github.com/apache/beam/pull/8884
 
 
   




[GitHub] [beam] stale[bot] commented on issue #8801: Make temp_location an attribute of the StandardOptions class

2020-03-23 Thread GitBox
stale[bot] commented on issue #8801: Make temp_location an attribute of the 
StandardOptions class
URL: https://github.com/apache/beam/pull/8801#issuecomment-602929558
 
 
   This pull request has been closed due to lack of activity. If you think that 
is incorrect, or the pull request requires review, you can revive the PR at any 
time.
   




[GitHub] [beam] pabloem opened a new pull request #11202: asdletmedah

2020-03-23 Thread GitBox
pabloem opened a new pull request #11202: asdletmedah
URL: https://github.com/apache/beam/pull/11202
 
 
   **Please** add a meaningful description for your change here
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)[![Build
 

[GitHub] [beam] pabloem commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController

2020-03-23 Thread GitBox
pabloem commented on a change in pull request #11163: [BEAM-9548] Add better 
error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#discussion_r396767924
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
 ##
 @@ -202,14 +207,24 @@ def _emit_from_file(self, fh, tail):
 # The first line at pos = 0 is always the header. Read the line without
 # the new line.
 to_decode = line[:-1]
-if pos == 0:
-  header = TestStreamFileHeader()
-  header.ParseFromString(self._coder.decode(to_decode))
-  yield header
+proto_cls = TestStreamFileHeader if pos == 0 else TestStreamFileRecord
+msg = self._try_parse_as(proto_cls, to_decode)
+if msg:
+  yield msg
 else:
-  record = TestStreamFileRecord()
-  record.ParseFromString(self._coder.decode(to_decode))
-  yield record
+  break
+
+  def _try_parse_as(self, proto_cls, to_decode):
+try:
+  msg = proto_cls()
+  msg.ParseFromString(self._coder.decode(to_decode))
+except DecodeError:
+  _LOGGER.error(
+  'Could not parse as %s. This can indicate that the cache is '
+  'corrupted. Please restart the kernel. '
+  '\nfile: %s \nmessage: %s', proto_cls, self._path, to_decode)
+  msg = None
 
 Review comment:
   Do we just skip? This may mean that the file is corrupted? Should we stop 
consuming (i.e. rethrow the exception)?
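
   For reference, a minimal sketch of the "stop consuming" alternative raised
   above: log the failure, then re-raise so the reader stops at the first
   corrupt line instead of skipping it. `DecodeError`, `parse_line`, and
   `emit_records` are hypothetical stand-ins defined only for this sketch (they
   are not the names used in streaming_cache.py), and the integer "records" are
   purely illustrative.

```python
import logging

_LOGGER = logging.getLogger(__name__)


class DecodeError(Exception):
    """Hypothetical stand-in for the DecodeError caught in the diff."""


def parse_line(line):
    # Hypothetical parser: a "record" is just an integer in this sketch.
    try:
        return int(line)
    except ValueError as e:
        raise DecodeError('not a valid record: %r' % line) from e


def emit_records(lines, path='<cache file>'):
    """Yields parsed records; logs and re-raises on the first corrupt line."""
    for line in lines:
        try:
            record = parse_line(line)
        except DecodeError:
            _LOGGER.error(
                'Could not parse a record in %s; the cache may be corrupted.',
                path)
            raise  # Stop consuming instead of silently skipping.
        yield record
```

   With this shape the caller sees the exception and can decide whether to
   restart, rather than receiving a silently truncated stream.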




[GitHub] [beam] pabloem commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController

2020-03-23 Thread GitBox
pabloem commented on a change in pull request #11163: [BEAM-9548] Add better 
error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#discussion_r396810340
 
 

 ##
 File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
 ##
 @@ -170,8 +170,13 @@ def run_pipeline(self, pipeline, options):
   user_pipeline)):
 streaming_cache_manager = ie.current_env().cache_manager()
 if streaming_cache_manager:
+
+  def exception_handler(e):
+_LOGGER.error(str(e))
 
 Review comment:
   Same as above. Do we just log and not stop processing?
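
   One possible middle ground, sketched under assumptions: the handler still
   logs, but it also records the first error so that the code driving the
   pipeline can check for it and stop. `FailureSignal` and its methods are
   hypothetical names for illustration, not part of interactive_runner.py.

```python
import logging
import threading

_LOGGER = logging.getLogger(__name__)


class FailureSignal:
    """Hypothetical helper: remembers the first error so the caller can stop."""

    def __init__(self):
        self._failed = threading.Event()
        self.error = None

    def handle(self, e):
        # Log as in the diff, but also keep the first exception around.
        _LOGGER.error(str(e))
        if self.error is None:
            self.error = e
        self._failed.set()

    def raise_if_failed(self):
        # Called by the consumer between steps; re-raises the recorded error.
        if self._failed.is_set():
            raise self.error
```

   The handler passed to the controller would be `signal.handle`, and the
   consuming loop would call `signal.raise_if_failed()` between elements.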




[GitHub] [beam] pabloem commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController

2020-03-23 Thread GitBox
pabloem commented on a change in pull request #11163: [BEAM-9548] Add better 
error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#discussion_r396766986
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
 ##
 @@ -166,13 +169,15 @@ def _wait_until_file_exists(self, timeout_secs=30):
 
 # Wait for up to `timeout_secs` for the file to be available.
 start = time.time()
-path = os.path.join(self._cache_dir, *self._labels)
-while not os.path.exists(path):
+while not os.path.exists(self._path):
   time.sleep(1)
   if time.time() - start > timeout_timestamp_secs:
+from apache_beam.runners.interactive.pipeline_instrument import CacheKey
+pcollection_var = CacheKey.from_str(self._labels[-1]).var
 
 Review comment:
   I hadn't stopped to think that labels are a file name too, huh? I guess the 
final file name is the PCollection variable name? If so, users may name their 
PCollections something that is not supported by the OS? (or maybe not since 
they have to be Python variable names?)
   Anyway this is not for this PR. But just to think about.
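
   A rough sketch of the check this question implies, assuming the label
   really is just the variable name (the actual CacheKey format may carry more
   than that). Python identifiers cannot contain path separators, but a few of
   them collide with device names that Windows reserves, so those are rejected
   too. `is_safe_file_label` is a hypothetical helper.

```python
import keyword

# Device names that Windows reserves as file names, regardless of case.
_WINDOWS_RESERVED = {'CON', 'PRN', 'AUX', 'NUL'} | {
    '%s%d' % (prefix, i) for prefix in ('COM', 'LPT') for i in range(1, 10)
}


def is_safe_file_label(name):
    """True if `name` is a usable Python variable name and a portable file name."""
    if not name.isidentifier() or keyword.iskeyword(name):
        return False
    return name.upper() not in _WINDOWS_RESERVED
```

   So `pcoll_1` passes, while `a/b` (not an identifier) and `con` (reserved on
   Windows) do not.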




[GitHub] [beam] lostluck commented on a change in pull request #11188: [BEAM-3301] Adding restriction trackers and validation.

2020-03-23 Thread GitBox
lostluck commented on a change in pull request #11188: [BEAM-3301] Adding 
restriction trackers and validation.
URL: https://github.com/apache/beam/pull/11188#discussion_r396803249
 
 

 ##
 File path: sdks/go/pkg/beam/core/graph/fn_test.go
 ##
 @@ -562,7 +595,13 @@ func (fn *GoodSdf) RestrictionSize(int, RestT) float64 {
return 0
 }
 
-// TODO(BEAM-3301): Add ProcessElement impl. when restriction trackers are in.
+func (fn *GoodSdf) CreateTracker(RestT) *RTrackerT {
+   return &RTrackerT{}
+}
+
+func (fn *GoodSdf) ProcessElement(*RTrackerT, int) int {
 
 Review comment:
   What do you think of having ProcessElement actually just have an 
sdf.RTracker value? 
   Having it as the interface simplifies our wrapping approach for dynamic 
splitting, and means the framework can do it all the time, for safety etc.
   
   CreateTracker would still need the actual implementation type, and check 
that it implements sdf.RTracker of course.
   
   We can always extend things to allow a user to "unwrap" the interface if 
they need direct access to their RTracker implementation for whatever reason.




[GitHub] [beam] lostluck commented on a change in pull request #11188: [BEAM-3301] Adding restriction trackers and validation.

2020-03-23 Thread GitBox
lostluck commented on a change in pull request #11188: [BEAM-3301] Adding 
restriction trackers and validation.
URL: https://github.com/apache/beam/pull/11188#discussion_r396795769
 
 

 ##
 File path: sdks/go/pkg/beam/core/graph/fn_test.go
 ##
 @@ -676,39 +737,77 @@ func (fn *BadSdfElementTRestSize) RestrictionSize(float32, RestT) float64 {
 type BadRestT struct{}
 
 type BadSdfRestTSplitRestParam struct {
-   *GoodDoFn
+   *GoodSdf
 }
 
 func (fn *BadSdfRestTSplitRestParam) SplitRestriction(int, BadRestT) []RestT {
return []RestT{}
 }
 
 type BadSdfRestTSplitRestReturn struct {
-   *GoodDoFn
+   *GoodSdf
 }
 
 func (fn *BadSdfRestTSplitRestReturn) SplitRestriction(int, RestT) []BadRestT {
return []BadRestT{}
 }
 
 type BadSdfRestTRestSize struct {
-   *GoodDoFn
+   *GoodSdf
 }
 
 func (fn *BadSdfRestTRestSize) RestrictionSize(int, BadRestT) float64 {
return 0
 }
 
+type BadSdfRestTCreateTracker struct {
+   *GoodSdf
+}
+
+func (fn *BadSdfRestTCreateTracker) CreateTracker(BadRestT) *RTrackerT {
+   return &RTrackerT{}
+}
+
 // Examples of other type validation that needs to be done.
 
 type BadSdfRestSizeReturn struct {
-   *GoodDoFn
+   *GoodSdf
 }
 
 func (fn *BadSdfRestSizeReturn) BadSdfRestSizeReturn(int, RestT) int {
return 0
 }
 
+type BadRTrackerT struct{}
 
 Review comment:
   Consider commenting that this "RTracker" isn't implementing the RTracker 
interface.




[GitHub] [beam] lostluck commented on a change in pull request #11188: [BEAM-3301] Adding restriction trackers and validation.

2020-03-23 Thread GitBox
lostluck commented on a change in pull request #11188: [BEAM-3301] Adding 
restriction trackers and validation.
URL: https://github.com/apache/beam/pull/11188#discussion_r396804143
 
 

 ##
 File path: sdks/go/pkg/beam/core/sdf/sdf.go
 ##
 @@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package sdf is experimental, incomplete, and not yet meant for general usage.
+package sdf
+
+// RTracker is an interface used to interact with restrictions while processing elements in
+// SplittableDoFns. Each implementation of RTracker is expected to be used for tracking a single
+// restriction type, which is the type that should be used to create the RTracker, and output by
+// TrySplit.
+type RTracker interface {
+   // TryClaim attempts to claim the block of work in the current restriction located at a given
+   // position. This method must be used in the ProcessElement method of Splittable DoFns to claim
+   // work before performing it. If no work is claimed, the ProcessElement is not allowed to perform
+   // work or emit outputs. If the claim is successful, the DoFn must process the entire block. If
+   // the claim is unsuccessful the ProcessElement method of the DoFn must return without performing
+   // any additional work or emitting any outputs.
+   //
+   // TryClaim accepts an arbitrary value that can be interpreted as the position of a block, and
+   // returns a boolean indicating whether the claim succeeded.
+   //
+   // If the claim fails due to an error, that error can be retrieved with GetError.
+   //
+   // For SDFs to work properly, claims must always be monotonically increasing in reference to the
+   // restriction's start and end points, and every block of work in a restriction must be claimed.
+   //
+   // This pseudocode example illustrates the typical usage of TryClaim:
+   //
+   //  pos = position of first block after restriction.start
+   //  for TryClaim(pos) == true {
+   //      // Do all work in the claimed block and emit outputs.
+   //      pos = position of next block
+   //  }
+   //  return
+   TryClaim(pos interface{}) (ok bool)
+
+   // GetError returns the error that made this RTracker stop executing, and it returns null if no
 
 Review comment:
   returns nil*
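
   To make the TryClaim contract in the quoted doc comment concrete, here is a
   toy sketch of the claim loop over an integer offset range. It is an
   illustration only, not the Go SDK's implementation: `OffsetTracker`,
   `try_claim`, `get_error`, and `process` are hypothetical names, and the
   "no error" value is Python's `None`, standing in for Go's `nil`.

```python
class OffsetTracker:
    """Toy tracker over the half-open offset range [start, end)."""

    def __init__(self, start, end):
        self._start = start
        self._end = end
        self._last_claimed = None
        self._error = None

    def try_claim(self, pos):
        # Claims must start inside the restriction and increase monotonically.
        behind = self._last_claimed is not None and pos <= self._last_claimed
        if pos < self._start or behind:
            self._error = ValueError('invalid claim at %d' % pos)
            return False
        if pos >= self._end:
            return False  # Past the end: a normal stop, not an error.
        self._last_claimed = pos
        return True

    def get_error(self):
        # None when the tracker stopped without an error.
        return self._error


def process(tracker, start):
    """Claims each block before doing its work, as the doc comment requires."""
    out = []
    pos = start
    while tracker.try_claim(pos):
        out.append(pos)  # "Work" for the claimed block.
        pos += 1
    return out
```

   A failed claim that simply ran off the end of the restriction leaves
   `get_error()` empty; only an out-of-order or out-of-range claim sets it.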




[GitHub] [beam] mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp

2020-03-23 Thread GitBox
mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold 
for timer output timestamp
URL: https://github.com/apache/beam/pull/11200#issuecomment-602890165
 
 
   Run Java PreCommit




[GitHub] [beam] TheNeuralBit commented on a change in pull request #10529: [BEAM-9044] Protobuf options to Schema options

2020-03-23 Thread GitBox
TheNeuralBit commented on a change in pull request #10529: [BEAM-9044] Protobuf 
options to Schema options
URL: https://github.com/apache/beam/pull/10529#discussion_r396791822
 
 

 ##
 File path: 
sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java
 ##
 @@ -205,10 +205,12 @@ static Schema getSchema(Descriptors.Descriptor descriptor) {
 // Store proto field number in metadata.
 FieldType fieldType =
 withMetaData(beamFieldTypeFromProtoField(fieldDescriptor), fieldDescriptor);
 
 Review comment:
   I think Reuven was referring to the metadata that's added in `withMetaData`, 
since that's the "special" proto metadata




[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

2020-03-23 Thread GitBox
reuvenlax commented on issue #11074: Store logical type values in Row instead 
of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-602887176
 
 
   Run Java PreCommit




[GitHub] [beam] rohdesamuel opened a new pull request #11201: test

2020-03-23 Thread GitBox
rohdesamuel opened a new pull request #11201: test
URL: https://github.com/apache/beam/pull/11201
 
 
   Change-Id: I18a1a3c4589e1fdc90d919a223ef67465f495374
   

[GitHub] [beam] boyuanzz commented on issue #11199: [BEAM-9562] Update Timer encoding to V2

2020-03-23 Thread GitBox
boyuanzz commented on issue #11199: [BEAM-9562] Update Timer encoding to V2
URL: https://github.com/apache/beam/pull/11199#issuecomment-602876259
 
 
   I'll update the coder implementation once the proto looks good.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [beam] pabloem merged pull request #11198: [BEAM-7923] Obfuscates display ids

2020-03-23 Thread GitBox
pabloem merged pull request #11198: [BEAM-7923] Obfuscates display ids
URL: https://github.com/apache/beam/pull/11198
 
 
   




[beam] branch master updated (8083c8f -> 913c9f8)

2020-03-23 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 8083c8f  [BEAM-9305] Allow value provider query strings in _CustomBigQuerySource (#11040)
 add 913c9f8  Merge pull request #11198 from [BEAM-7923] Obfuscates display ids

No new revisions were added by this update.

Summary of changes:
 .../runners/interactive/display/pcoll_visualization.py | 10 +-
 sdks/python/apache_beam/runners/interactive/utils.py   | 10 ++
 2 files changed, 15 insertions(+), 5 deletions(-)



[GitHub] [beam] mxm removed a comment on issue #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp

2020-03-23 Thread GitBox
mxm removed a comment on issue #11200: [BEAM-9573] Correct computing of 
watermark hold for timer output timestamp
URL: https://github.com/apache/beam/pull/11200#issuecomment-602869001
 
 
   Flink Runner Nexmark Tests




[GitHub] [beam] mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp

2020-03-23 Thread GitBox
mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold 
for timer output timestamp
URL: https://github.com/apache/beam/pull/11200#issuecomment-602869187
 
 
   Run Flink Runner Nexmark Tests




[GitHub] [beam] mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp

2020-03-23 Thread GitBox
mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold 
for timer output timestamp
URL: https://github.com/apache/beam/pull/11200#issuecomment-602869001
 
 
   Flink Runner Nexmark Tests




[GitHub] [beam] mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp

2020-03-23 Thread GitBox
mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold 
for timer output timestamp
URL: https://github.com/apache/beam/pull/11200#issuecomment-602866857
 
 
   Run Java Flink PortableValidatesRunner Streaming




[GitHub] [beam] mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp

2020-03-23 Thread GitBox
mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold 
for timer output timestamp
URL: https://github.com/apache/beam/pull/11200#issuecomment-602866787
 
 
   Run Flink ValidatesRunner




[GitHub] [beam] mxm opened a new pull request #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp

2020-03-23 Thread GitBox
mxm opened a new pull request #11200: [BEAM-9573] Correct computing of 
watermark hold for timer output timestamp
URL: https://github.com/apache/beam/pull/11200
 
 
   This PR contains two related changes which would be hard to review in separate PRs:
   
   ### BEAM-9573 Correct computing of watermark hold for timer output timestamp
   
   With the introduction of timer output timestamps, a new watermark hold had been
   added to the Flink Runner. The watermark computation works on the keyed state
   backend which computes a key-scoped watermark hold and not the desired
   operator-wide watermark hold.
   
   Computation: 
https://github.com/apache/beam/blob/b564239081e9351c56fb0e7d263495b95dd3f8f3/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1140
   
   Key-scoped state: 
https://github.com/apache/beam/blob/b564239081e9351c56fb0e7d263495b95dd3f8f3/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1130
   
   The solution is to iterate over all available state backend keys.
   
   ### BEAM-9566 Mitigate performance issue with watermark hold computation
   
   Benchmarks have shown that the watermark computation over all keys is very
   expensive. This introduces a cache which stores and updates the lowest
   timestamps for watermark holds due to timer output timestamps.
   
   In most cases only the cache should be hit. Only when a large number of timers
   are added and then removed one after another might the cache have to be
   reloaded with data from the state backend.
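
The caching idea described above can be sketched in plain Python. This is only an illustration under stated assumptions, not the Flink Runner code: `WatermarkHoldCache`, `state_backend`, `add_timer`, `remove_timer` and `full_scans` are hypothetical names, the state backend is modelled as a dict from key to pending timer output timestamps, and the cache keeps a single minimum rather than several lowest timestamps.

```python
from typing import Dict, Hashable, List, Optional


class WatermarkHoldCache:
    """Tracks the lowest timer output timestamp across all keys.

    `state_backend` is a hypothetical stand-in for keyed state: it maps each
    key to the output timestamps of that key's pending timers.
    """

    def __init__(self) -> None:
        self.state_backend: Dict[Hashable, List[int]] = {}
        self.cached_min: Optional[int] = None  # None means "no hold"
        self.full_scans = 0  # counts the expensive scans over all keys

    def add_timer(self, key: Hashable, output_timestamp: int) -> None:
        self.state_backend.setdefault(key, []).append(output_timestamp)
        # Adding can only lower the minimum, so the cache is updated in O(1).
        if self.cached_min is None or output_timestamp < self.cached_min:
            self.cached_min = output_timestamp

    def remove_timer(self, key: Hashable, output_timestamp: int) -> None:
        self.state_backend[key].remove(output_timestamp)
        if not self.state_backend[key]:
            del self.state_backend[key]
        # Only removing the current minimum invalidates the cache.
        if output_timestamp == self.cached_min:
            self._reload()

    def _reload(self) -> None:
        # Expensive path: iterate over every key, not just the current one.
        self.full_scans += 1
        all_timestamps = [
            ts for stamps in self.state_backend.values() for ts in stamps]
        self.cached_min = min(all_timestamps) if all_timestamps else None

    def watermark_hold(self) -> Optional[int]:
        return self.cached_min
```

In this sketch the hold is operator-wide because `_reload` looks at every key, while the common add/remove path never touches more than the cached value.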
   
   

[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

2020-03-23 Thread GitBox
reuvenlax commented on issue #11074: Store logical type values in Row instead 
of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-602864720
 
 
   Run Java PreCommit




[GitHub] [beam] lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK

2020-03-23 Thread GitBox
lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#issuecomment-602862121
 
 
   Post commits run and pass which is a good sign! 




[GitHub] [beam] chamikaramj commented on issue #11039: [BEAM-9383] Staging Dataflow artifacts from environment

2020-03-23 Thread GitBox
chamikaramj commented on issue #11039: [BEAM-9383] Staging Dataflow artifacts 
from environment
URL: https://github.com/apache/beam/pull/11039#issuecomment-602855911
 
 
   Retest this please




[GitHub] [beam] chamikaramj commented on issue #11185: [BEAM-8019] Some generalizations to support cross-language transforms.

2020-03-23 Thread GitBox
chamikaramj commented on issue #11185: [BEAM-8019] Some generalizations to 
support cross-language transforms.
URL: https://github.com/apache/beam/pull/11185#issuecomment-602855159
 
 
   All tests pass now. Thanks.




[GitHub] [beam] aaltay commented on issue #11174: [BEAM-7923] Pop failed transform when error is raised

2020-03-23 Thread GitBox
aaltay commented on issue #11174: [BEAM-7923] Pop failed transform when error 
is raised
URL: https://github.com/apache/beam/pull/11174#issuecomment-602854440
 
 
   retest this please




[GitHub] [beam] robertwb commented on a change in pull request #11148: [BEAM-8335] Adds a streaming wordcount integration test

2020-03-23 Thread GitBox
robertwb commented on a change in pull request #11148: [BEAM-8335] Adds a 
streaming wordcount integration test
URL: https://github.com/apache/beam/pull/11148#discussion_r396752081
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
 ##
 @@ -147,6 +150,97 @@ def process(self, element):
 ]
 self.assertEqual(actual_reified, expected_reified)
 
+  def test_streaming_wordcount(self):
+    class WordExtractingDoFn(beam.DoFn):
+      def process(self, element):
+        text_line = element.strip()
+        words = text_line.split()
+        return words
+
+    # Add the TestStream so that it can be cached.
+    ib.options.capturable_sources.add(TestStream)
+    ib.options.capture_duration = timedelta(seconds=1)
 
 Review comment:
   Why is this needed?




[GitHub] [beam] robertwb commented on a change in pull request #11148: [BEAM-8335] Adds a streaming wordcount integration test

2020-03-23 Thread GitBox
robertwb commented on a change in pull request #11148: [BEAM-8335] Adds a 
streaming wordcount integration test
URL: https://github.com/apache/beam/pull/11148#discussion_r396751796
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
 ##
 @@ -147,6 +150,97 @@ def process(self, element):
 ]
 self.assertEqual(actual_reified, expected_reified)
 
+  def test_streaming_wordcount(self):
+    class WordExtractingDoFn(beam.DoFn):
+      def process(self, element):
+        text_line = element.strip()
+        words = text_line.split()
+        return words
+
+    # Add the TestStream so that it can be cached.
+    ib.options.capturable_sources.add(TestStream)
+    ib.options.capture_duration = timedelta(seconds=1)
+
+    p = beam.Pipeline(
+        runner=interactive_runner.InteractiveRunner(),
+        options=StandardOptions(streaming=True))
+
+    data = (
+        p
+        | TestStream()
+            .advance_watermark_to(0)
+            .advance_processing_time(1)
+            .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
+            .advance_watermark_to(20)
+            .advance_processing_time(1)
+            .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
+            .advance_watermark_to(40)
+            .advance_processing_time(1)
 
 Review comment:
   Does this not trigger the capture duration? 




[GitHub] [beam] robertwb commented on a change in pull request #11148: [BEAM-8335] Adds a streaming wordcount integration test

2020-03-23 Thread GitBox
robertwb commented on a change in pull request #11148: [BEAM-8335] Adds a 
streaming wordcount integration test
URL: https://github.com/apache/beam/pull/11148#discussion_r396752506
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
 ##
 @@ -147,6 +150,97 @@ def process(self, element):
 ]
 self.assertEqual(actual_reified, expected_reified)
 
+  def test_streaming_wordcount(self):
+    class WordExtractingDoFn(beam.DoFn):
+      def process(self, element):
+        text_line = element.strip()
+        words = text_line.split()
+        return words
+
+    # Add the TestStream so that it can be cached.
+    ib.options.capturable_sources.add(TestStream)
+    ib.options.capture_duration = timedelta(seconds=1)
+
+    p = beam.Pipeline(
+        runner=interactive_runner.InteractiveRunner(),
+        options=StandardOptions(streaming=True))
+
+    data = (
+        p
+        | TestStream()
+            .advance_watermark_to(0)
+            .advance_processing_time(1)
+            .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
+            .advance_watermark_to(20)
+            .advance_processing_time(1)
+            .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
+            .advance_watermark_to(40)
+            .advance_processing_time(1)
+            .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
+        | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable
+
+    counts = (
+        data
+        | 'split' >> beam.ParDo(WordExtractingDoFn())
+        | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+        | 'group' >> beam.GroupByKey()
+        | 'count' >> beam.Map(lambda wordones: (wordones[0], sum(wordones[1]))))
+
+    # Watch the local scope for Interactive Beam so that referenced PCollections
+    # will be cached.
+    ib.watch(locals())
+
+    # This is normally done in the interactive_utils when a transform is
+    # applied but needs an IPython environment. So we manually run this here.
+    ie.current_env().track_user_pipelines()
+
+    # This tests that the data was correctly cached.
+    pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0)
+    expected_data_df = pd.DataFrame(
+        [('to', 0, [beam.window.IntervalWindow(0, 10)], pane_info),
 
 Review comment:
   It'd be easier to understand the test if there were less data. 
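
As a reading aid for the quoted test, here is a plain-Python sketch of how fixed windows of size 10 bucket the words before counting. It does not use Beam; `fixed_window` and `windowed_counts` are hypothetical helpers, and it assumes each element takes the timestamp the watermark was last advanced to, as the expected row `('to', 0, [IntervalWindow(0, 10)], ...)` suggests.

```python
from collections import Counter
from typing import Iterable, Tuple

Window = Tuple[int, int]  # half-open interval [start, end)


def fixed_window(timestamp: int, size: int) -> Window:
    """Returns the fixed window of length `size` that contains `timestamp`."""
    start = timestamp - timestamp % size
    return (start, start + size)


def windowed_counts(
    timestamped_words: Iterable[Tuple[int, str]], size: int) -> Counter:
    """Counts words separately per (window, word) pair."""
    counts: Counter = Counter()
    for timestamp, word in timestamped_words:
        counts[(fixed_window(timestamp, size), word)] += 1
    return counts


# A deliberately small input: three words at timestamp 0, one at timestamp 20.
small_input = [(0, 'to'), (0, 'be'), (0, 'to'), (20, 'to')]
small_counts = windowed_counts(small_input, size=10)
```

With an input this small, each window holds at most two distinct words, which is the kind of reduction the review comment asks for.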




[GitHub] [beam] KevinGG commented on issue #11174: [BEAM-7923] Pop failed transform when error is raised

2020-03-23 Thread GitBox
KevinGG commented on issue #11174: [BEAM-7923] Pop failed transform when error 
is raised
URL: https://github.com/apache/beam/pull/11174#issuecomment-602852406
 
 
   Fixed a failed test.
   
   TL;DR: the test was wrong and passed in the past only because it got lucky.
   
   The test used to pass and started failing with the try-append-finally-pop change, because:
   1. The test tried to append two transforms without a name / label to the same pipeline. Both generate the same label and raise errors, and because the test asserted for those errors, pipeline construction was allowed to continue even after it ran into fatal errors;
   2. In the past, the pipeline was left in a broken state where the current_transform was not popped when the 1st error was raised. The full label generated was `WriteToBigQuery`;
   3. In the past, appending the second transform wrongly added parts to the broken current_transform `WriteToBigQuery`, so the transform node was appended under the wrong, failed parent node and luckily got a unique full label `WriteToBigQuery/WriteToBigQuery`. The test therefore didn't fail due to duplicated labels, but the pipeline was constructed as `WriteToBigQuery`->`WriteToBigQuery`, which was completely messed up;
   4. The test still didn't fail because it was testing for errors, even though it built a broken pipeline with broken usages.
   
   Once the try-append-finally-pop change is applied, the test functions as:
   1. The first transform still fails to be appended and has side effects, including an applied transform label, but it no longer leaves the pipeline in a broken state: the current transform is still the right node;
   2. The second transform still fails, but it is now appended to the correct node;
   3. As long as the 2 transforms have different labels (or are executed in different cells if in an interactive environment), the test still passes, the pipeline is not broken and can be used for future development, and side effects are ruled out when data-centric APIs such as `show` and `collect` are invoked.
   
   The reason we keep the applied label is that we never know what the side effects are when the error is raised.
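
The try-append-finally-pop behaviour described above can be illustrated with a toy model. This is a hedged sketch, not the code in `pipeline.py`: `ToyPipeline`, `DuplicateLabelError`, `applied_labels` and `failing_expand` are hypothetical names, and the label logic is reduced to the minimum needed to show the stack handling.

```python
class DuplicateLabelError(Exception):
    """Raised when a full label has already been applied (hypothetical)."""


class ToyPipeline:
    """Hypothetical stand-in for a pipeline that tracks a transform stack."""

    def __init__(self):
        self.transforms_stack = ["root"]
        self.applied_labels = set()

    def apply(self, label, expand):
        parent = self.transforms_stack[-1]
        full_label = label if parent == "root" else parent + "/" + label
        if full_label in self.applied_labels:
            raise DuplicateLabelError(full_label)
        # The label is recorded before expansion and kept even if it fails.
        self.applied_labels.add(full_label)
        try:
            self.transforms_stack.append(full_label)
            return expand()
        finally:
            # Always restore the stack so that a failed transform does not
            # become the parent of whatever is applied next.
            self.transforms_stack.pop()


def failing_expand():
    raise ValueError("expand failed")
```

After a failed `apply`, the stack in this model is back at `["root"]` while the failed label stays in `applied_labels`, so a second transform with the same label is rejected but one with a different label attaches to the correct parent.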




[GitHub] [beam] lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK

2020-03-23 Thread GitBox
lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#issuecomment-602851450
 
 
   Run Go Postcommit




[GitHub] [beam] aaltay commented on issue #11174: [BEAM-7923] Pop failed transform when error is raised

2020-03-23 Thread GitBox
aaltay commented on issue #11174: [BEAM-7923] Pop failed transform when error 
is raised
URL: https://github.com/apache/beam/pull/11174#issuecomment-602851060
 
 
   retest this please




[GitHub] [beam] boyuanzz commented on a change in pull request #11060: [BEAM-9454] Create Deduplication transform based on user timer/state

2020-03-23 Thread GitBox
boyuanzz commented on a change in pull request #11060: [BEAM-9454] Create 
Deduplication transform based on user timer/state
URL: https://github.com/apache/beam/pull/11060#discussion_r396747769
 
 

 ##
 File path: sdks/python/apache_beam/transforms/deduplicate.py
 ##
 @@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+"""a collection of ptransforms for deduplicating elements."""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import typing
+
+from apache_beam import typehints
+from apache_beam.coders.coders import BooleanCoder
+from apache_beam.transforms import core
+from apache_beam.transforms import ptransform
+from apache_beam.transforms import userstate
+from apache_beam.transforms.timeutil import TimeDomain
+from apache_beam.utils import timestamp
+
+__all__ = [
+    'Deduplicate',
+    'DeduplicatePerKey',
+]
+
+K = typing.TypeVar('K')
+V = typing.TypeVar('V')
+
+
+@typehints.with_input_types(typing.Tuple[K, V])
+@typehints.with_output_types(typing.Tuple[K, V])
+class DeduplicatePerKey(ptransform.PTransform):
+  """ A PTransform which deduplicates <key, value> pair over a time domain and
+  threshold. Values in different windows will NOT be considered duplicates of
+  each other. Deduplication is best effort.
+
+  The durations specified may impose memory and/or storage requirements within
+  a runner and care might need to be used to ensure that the deduplication time
+  limit is long enough to remove duplicates but short enough to not cause
+  performance problems within a runner. Each runner may provide an optimized
+  implementation of their choice using the deduplication time domain and
+  threshold specified.
+
+  Does not preserve any order the input PCollection might have had.
+  """
+  def __init__(self, processing_time_duration=None, event_time_duration=None):
+    if processing_time_duration is None and event_time_duration is None:
+      raise ValueError(
+          'DeduplicatePerKey requires at least one of '
+          'processing_time_duration or event_time_duration.')
+    self.processing_time_duration = processing_time_duration
+    self.event_time_duration = event_time_duration
+
+  def _create_deduplicate_fn(self):
+    processing_timer_spec = userstate.TimerSpec(
+        'processing_timer', TimeDomain.REAL_TIME)
+    event_timer_spec = userstate.TimerSpec('event_timer', TimeDomain.WATERMARK)
+    state_spec = userstate.BagStateSpec('seen', BooleanCoder())
 
 Review comment:
   The `seen_state` is only set once per key during that duration. I'm not sure 
whether combining state is more suitable here. What do you think?
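
For context on the "set once per key during that duration" point, a plain-Python sketch of the behaviour may help. It is an assumption-laden model, not the transform under review: it uses no Beam state or timer API, ignores windows, handles a single time domain, and `deduplicate_per_key` and `expires_at` are hypothetical names.

```python
from typing import Dict, Hashable, Iterable, Iterator, Tuple


def deduplicate_per_key(
    elements: Iterable[Tuple[int, Hashable, object]],
    duration: int) -> Iterator[Tuple[Hashable, object]]:
    """Yields the first (key, value) seen per key within `duration`.

    `elements` is an iterable of (timestamp, key, value) in timestamp order.
    """
    # key -> time at which the "seen" flag for that key is cleared
    expires_at: Dict[Hashable, int] = {}
    for timestamp, key, value in elements:
        # Simulates the timer firing: forget keys whose duration has elapsed.
        if key in expires_at and timestamp >= expires_at[key]:
            del expires_at[key]
        if key not in expires_at:
            # The flag is written exactly once per key per duration.
            expires_at[key] = timestamp + duration
            yield key, value


events = [(0, 'a', 1), (1, 'a', 2), (2, 'b', 3), (10, 'a', 4), (11, 'a', 5)]
deduplicated = list(deduplicate_per_key(events, duration=10))
```

Because the flag is written once and only read afterwards, the model needs nothing more than a single boolean-like marker per key, which is the property the question about state type turns on.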




[GitHub] [beam] pabloem commented on issue #11040: [BEAM-9305] Allow value provider query strings in _CustomBigQuerySource

2020-03-23 Thread GitBox
pabloem commented on issue #11040: [BEAM-9305] Allow value provider query 
strings in _CustomBigQuerySource
URL: https://github.com/apache/beam/pull/11040#issuecomment-602847544
 
 
   Thanks @EDjur for the contribution! Thanks @kamilwu for reviewing




[beam] branch master updated (ec8846b -> 8083c8f)

2020-03-23 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from ec8846b  optionally import grpc (#11187)
 add 8083c8f  [BEAM-9305] Allow value provider query strings in _CustomBigQuerySource (#11040)

No new revisions were added by this update.

Summary of changes:
 CHANGES.md |  2 ++
 sdks/python/apache_beam/io/gcp/bigquery.py | 22 ++
 .../apache_beam/io/gcp/bigquery_read_it_test.py|  6 --
 3 files changed, 20 insertions(+), 10 deletions(-)



[GitHub] [beam] pabloem merged pull request #11040: [BEAM-9305] Allow value provider query strings in _CustomBigQuerySource

2020-03-23 Thread GitBox
pabloem merged pull request #11040: [BEAM-9305] Allow value provider query 
strings in _CustomBigQuerySource
URL: https://github.com/apache/beam/pull/11040
 
 
   




[beam] branch master updated (1c35224 -> ec8846b)

2020-03-23 Thread hannahjiang
This is an automated email from the ASF dual-hosted git repository.

hannahjiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 1c35224  Merge pull request #11192 from lukecwik/splittabledofn
 add ec8846b  optionally import grpc (#11187)

No new revisions were added by this update.

Summary of changes:
 .../apache_beam/runners/direct/test_stream_impl.py   | 16 +---
 1 file changed, 13 insertions(+), 3 deletions(-)



[GitHub] [beam] Hannah-Jiang merged pull request #11187: optionally import grpc

2020-03-23 Thread GitBox
Hannah-Jiang merged pull request #11187: optionally import grpc
URL: https://github.com/apache/beam/pull/11187
 
 
   




[GitHub] [beam] KevinGG commented on a change in pull request #11174: [BEAM-7923] Pop failed transform when error is raised

2020-03-23 Thread GitBox
KevinGG commented on a change in pull request #11174: [BEAM-7923] Pop failed 
transform when error is raised
URL: https://github.com/apache/beam/pull/11174#discussion_r396742548
 
 

 ##
 File path: sdks/python/apache_beam/pipeline.py
 ##
 @@ -307,58 +307,61 @@ def _replace_if_needed(self, original_transform_node):
   elif len(inputs) == 0:
 input_node = pvalue.PBegin(self.pipeline)
 
-  # We have to add the new AppliedTransform to the stack before expand()
-  # and pop it out later to make sure that parts get added correctly.
-  self.pipeline.transforms_stack.append(replacement_transform_node)
-
-  # Keeping the same label for the replaced node but recursively
-  # removing labels of child transforms of original transform since they
-  # will be replaced during the expand below. This is needed in case
-  # the replacement contains children that have labels that conflicts
-  # with labels of the children of the original.
-  self.pipeline._remove_labels_recursively(original_transform_node)
-
-  new_output = replacement_transform.expand(input_node)
-  assert isinstance(
-  new_output, (dict, pvalue.PValue, pvalue.DoOutputsTuple))
-
-  if isinstance(new_output, pvalue.PValue):
-new_output.element_type = None
-self.pipeline._infer_result_type(
-replacement_transform, inputs, new_output)
-
-  if isinstance(new_output, dict):
-for new_tag, new_pcoll in new_output.items():
-  replacement_transform_node.add_output(new_pcoll, new_tag)
-  elif isinstance(new_output, pvalue.DoOutputsTuple):
-replacement_transform_node.add_output(
-new_output, new_output._main_tag)
-  else:
-replacement_transform_node.add_output(new_output, new_output.tag)
-
-  # Recording updated outputs. This cannot be done in the same visitor
-  # since if we dynamically update output type here, we'll run into
-  # errors when visiting child nodes.
-  #
-  # NOTE: When replacing multiple outputs, the replacement PCollection
-  # tags must have a matching tag in the original transform.
-  if isinstance(new_output, pvalue.PValue):
-if not new_output.producer:
-  new_output.producer = replacement_transform_node
-output_map[original_transform_node.outputs[new_output.tag]] = \
-new_output
-  elif isinstance(new_output, (pvalue.DoOutputsTuple, tuple)):
-for pcoll in new_output:
-  if not pcoll.producer:
-pcoll.producer = replacement_transform_node
-  output_map[original_transform_node.outputs[pcoll.tag]] = pcoll
-  elif isinstance(new_output, dict):
-for tag, pcoll in new_output.items():
-  if not pcoll.producer:
-pcoll.producer = replacement_transform_node
-  output_map[original_transform_node.outputs[tag]] = pcoll
-
-  self.pipeline.transforms_stack.pop()
+  try:
+# We have to add the new AppliedTransform to the stack before
+# expand() and pop it out later to make sure that parts get added
+# correctly.
+self.pipeline.transforms_stack.append(replacement_transform_node)
 
 Review comment:
   The `append` wouldn't raise an error as long as `transforms_stack` is still 
a list on that line. If it is no longer a list (it has become `None` or some 
other object) and the `append` raises, the `pop()` in the finally block would 
raise too. So it's not necessary to exclude it from the try block.
   
   Putting it inside the try also makes the code a little more self-explanatory.
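
A minimal sketch of the push/pop pattern under discussion, for illustration only. `StackOwner` and `expand_with_stack` are hypothetical names standing in for the pipeline object and the replacement logic; this is not the code from the PR.

```python
class StackOwner(object):
  """Hypothetical stand-in for the object that owns transforms_stack."""
  def __init__(self):
    self.transforms_stack = ['root']


def expand_with_stack(owner, node, expand):
  """Pushes node, runs expand(), and always pops the node again."""
  try:
    # The push sits inside the try. If transforms_stack is not a list
    # here, both this line and the pop() below raise.
    owner.transforms_stack.append(node)
    return expand()
  finally:
    # Runs whether expand() returned or raised, so a failed transform
    # does not stay on the stack.
    owner.transforms_stack.pop()
```

With this shape, an exception raised by `expand()` still propagates to the caller, but the stack is back to its previous contents afterwards.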




[GitHub] [beam] boyuanzz commented on a change in pull request #11060: [BEAM-9454] Create Deduplication transform based on user timer/state

2020-03-23 Thread GitBox
boyuanzz commented on a change in pull request #11060: [BEAM-9454] Create 
Deduplication transform based on user timer/state
URL: https://github.com/apache/beam/pull/11060#discussion_r396742097
 
 

 ##
 File path: sdks/python/apache_beam/transforms/deduplicate_test.py
 ##
 @@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+"""Unit tests for deduplicate transform by using TestStream."""
+
+from __future__ import absolute_import
+
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.coders import coders
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.testing.util import equal_to_per_window
+from apache_beam.transforms import deduplicate
+from apache_beam.transforms import window
+from apache_beam.utils.timestamp import Duration
+from apache_beam.utils.timestamp import Timestamp
+
+
+# TestStream is only supported in streaming pipeline. The Deduplicate transform
+# also requires Timer support. Sickbaying this testsuite until dataflow runner
+# supports both TestStream and user timer.
+@attr('ValidatesRunner', 'sickbay-batch', 'sickbay-streaming')
 
 Review comment:
   The 'sickbay-batch' and 'sickbay-streaming' attributes are only used by the 
Dataflow suite now. Unfortunately, I don't think we have any runners that 
support these Python tests now.




[GitHub] [beam] youngoli commented on issue #11188: [BEAM-3301] Adding restriction trackers and validation.

2020-03-23 Thread GitBox
youngoli commented on issue #11188: [BEAM-3301] Adding restriction trackers and 
validation.
URL: https://github.com/apache/beam/pull/11188#issuecomment-602838126
 
 
   Whoops, forgot reviewers.
   R: @lostluck 




[GitHub] [beam] pabloem commented on issue #11198: [BEAM-7923] Obfuscates display ids

2020-03-23 Thread GitBox
pabloem commented on issue #11198: [BEAM-7923] Obfuscates display ids
URL: https://github.com/apache/beam/pull/11198#issuecomment-602837778
 
 
   retest this please




[GitHub] [beam] lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK

2020-03-23 Thread GitBox
lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#issuecomment-602835398
 
 
   Retest this please




[GitHub] [beam] pabloem commented on issue #10291: [BEAM-7516][BEAM-8823] FnApiRunner works with work queues, and a primitive watermark manager

2020-03-23 Thread GitBox
pabloem commented on issue #10291: [BEAM-7516][BEAM-8823] FnApiRunner works 
with work queues, and a primitive watermark manager
URL: https://github.com/apache/beam/pull/10291#issuecomment-602830331
 
 
   I can't reproduce any of the failures locally via 
`tox -e py35-cloud,py35-cython`.




[GitHub] [beam] boyuanzz opened a new pull request #11199: [BEAM-9562] Update Timer encoding to V2

2020-03-23 Thread GitBox
boyuanzz opened a new pull request #11199: [BEAM-9562] Update Timer encoding to 
V2
URL: https://github.com/apache/beam/pull/11199
 
 
   **Please** add a meaningful description for your change here
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   

[GitHub] [beam] rohdesamuel commented on issue #11148: [BEAM-8335] Adds a streaming wordcount integration test

2020-03-23 Thread GitBox
rohdesamuel commented on issue #11148: [BEAM-8335] Adds a streaming wordcount 
integration test
URL: https://github.com/apache/beam/pull/11148#issuecomment-602825559
 
 
   R: @robertwb 




[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values

2020-03-23 Thread GitBox
reuvenlax commented on issue #11074: Store logical type values in Row instead 
of base values
URL: https://github.com/apache/beam/pull/11074#issuecomment-602824816
 
 
   Run Java PreCommit




[GitHub] [beam] udim commented on issue #10914: [BEAM-8078] streaming_wordcount_debugging.py is missing a test

2020-03-23 Thread GitBox
udim commented on issue #10914: [BEAM-8078] streaming_wordcount_debugging.py is 
missing a test
URL: https://github.com/apache/beam/pull/10914#issuecomment-602823166
 
 
   Trying again




[GitHub] [beam] udim commented on issue #10914: [BEAM-8078] streaming_wordcount_debugging.py is missing a test

2020-03-23 Thread GitBox
udim commented on issue #10914: [BEAM-8078] streaming_wordcount_debugging.py is 
missing a test
URL: https://github.com/apache/beam/pull/10914#issuecomment-602823472
 
 
   run python 3.7 postcommit




[GitHub] [beam] chamikaramj commented on issue #11039: [BEAM-9383] Staging Dataflow artifacts from environment

2020-03-23 Thread GitBox
chamikaramj commented on issue #11039: [BEAM-9383] Staging Dataflow artifacts 
from environment
URL: https://github.com/apache/beam/pull/11039#issuecomment-602822540
 
 
   Retest this please




[GitHub] [beam] chamikaramj commented on issue #11185: [BEAM-8019] Some generalizations to support cross-language transforms.

2020-03-23 Thread GitBox
chamikaramj commented on issue #11185: [BEAM-8019] Some generalizations to 
support cross-language transforms.
URL: https://github.com/apache/beam/pull/11185#issuecomment-602821407
 
 
   Run Python2_PVR_Flink PreCommit




[GitHub] [beam] davidyan74 commented on a change in pull request #11198: [BEAM-7923] Obfuscates display ids

2020-03-23 Thread GitBox
davidyan74 commented on a change in pull request #11198: [BEAM-7923] Obfuscates 
display ids
URL: https://github.com/apache/beam/pull/11198#discussion_r396717186
 
 

 ##
 File path: sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
 ##
 @@ -246,11 +247,11 @@ def __init__(self, pcoll, include_window_info=False, display_facets=False):
 if not self._pcoll_var:
   self._pcoll_var = 'Value'
 self._cache_key = self._pin.cache_key(self._pcoll)
-self._dive_display_id = 'facets_dive_{}_{}'.format(
-self._cache_key, id(self))
-self._overview_display_id = 'facets_overview_{}_{}'.format(
-self._cache_key, id(self))
-self._df_display_id = 'df_{}_{}'.format(self._cache_key, id(self))
+self._dive_display_id = 'facets_dive_{}'.format(
+obfuscate(self._cache_key, id(self)))
 
 Review comment:
   nit: assign the obfuscate return value to a variable so it can be reused.
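
A rough sketch of what this nit suggests, assuming the same obfuscated value feeds every display id. The `obfuscate` below is a stand-in written for this example, `Visualization` is a hypothetical class, and the `df_` prefix is taken from the removed lines of the diff rather than from the new code.

```python
import hashlib


def obfuscate(*inputs):
  # Stand-in helper for illustration only; the PR's own implementation
  # is not shown in this thread.
  merged = '_'.join(str(i) for i in inputs)
  return hashlib.md5(merged.encode('utf-8')).hexdigest()


class Visualization(object):
  """Hypothetical holder of the three display ids."""
  def __init__(self, cache_key):
    self._cache_key = cache_key
    # Compute the obfuscated value once and reuse it for every id.
    obfuscated_id = obfuscate(self._cache_key, id(self))
    self._dive_display_id = 'facets_dive_{}'.format(obfuscated_id)
    self._overview_display_id = 'facets_overview_{}'.format(obfuscated_id)
    self._df_display_id = 'df_{}'.format(obfuscated_id)
```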




[GitHub] [beam] pabloem commented on issue #11198: [BEAM-7923] Obfuscates display ids

2020-03-23 Thread GitBox
pabloem commented on issue #11198: [BEAM-7923] Obfuscates display ids
URL: https://github.com/apache/beam/pull/11198#issuecomment-602819811
 
 
   retest this please




[GitHub] [beam] lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK

2020-03-23 Thread GitBox
lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#issuecomment-602817069
 
 
   I'm definitely not merging this until both the PostCommit runs, and someone 
more familiar with windowing/trigger semantics looks over the configuration I 
copied over from python:
   
https://github.com/apache/beam/pull/11197/files#diff-ef420fdb9afbce0674282b4ed4481042R530




[GitHub] [beam] KevinGG opened a new pull request #11198: [BEAM-7923] Obfuscates display ids

2020-03-23 Thread GitBox
KevinGG opened a new pull request #11198: [BEAM-7923] Obfuscates display ids
URL: https://github.com/apache/beam/pull/11198
 
 
   1. Use md5 to hash and digest any inputs into a hexadecimal string.
   2. The obfuscation is applied to all display ids in notebooks. Note the
   ids will start with letters, such as `facets_dive_` or `table_df_`, to
   be compatible with `document.querySelector()`. Otherwise, hexadecimal
   strings are compatible with `jQuery()` already.
   3. Research on performance and hash collisions can be found here:
   https://www.peterbe.com/plog/best-hashing-function-in-python.
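
The three points above can be sketched roughly as follows. This is an illustration under assumptions, not the code in the PR: the signature of `obfuscate`, the `'_'` separator, and the `display_id` helper are all made up for the example.

```python
import hashlib


def obfuscate(*inputs):
  """Hashes any inputs into a hexadecimal string (illustrative sketch)."""
  # Assumption: inputs are stringified and joined before hashing.
  merged = '_'.join(str(i) for i in inputs)
  return hashlib.md5(merged.encode('utf-8')).hexdigest()


def display_id(prefix, *inputs):
  """Builds a display id that starts with a letter (hypothetical helper)."""
  # A bare hex digest may begin with a digit, which is not valid as an
  # unescaped id selector in document.querySelector(), hence the prefix.
  return '{}{}'.format(prefix, obfuscate(*inputs))
```

For example, `display_id('facets_dive_', cache_key, id(obj))` yields an id with the `facets_dive_` prefix followed by 32 hexadecimal characters.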
   

[GitHub] [beam] KevinGG commented on issue #11198: [BEAM-7923] Obfuscates display ids

2020-03-23 Thread GitBox
KevinGG commented on issue #11198: [BEAM-7923] Obfuscates display ids
URL: https://github.com/apache/beam/pull/11198#issuecomment-602817070
 
 
   yapf formatted.
   Lint passed locally.
   
   R: @pabloem 
   R: @davidyan74 
   R: @rohdesamuel 
   
   PTAL, thx!




[GitHub] [beam] lukecwik merged pull request #11192: [BEAM-9430] Fix coder sent to Dataflow service for non-portable pipelines due to WatermarkEstimators migration change

2020-03-23 Thread GitBox
lukecwik merged pull request #11192: [BEAM-9430] Fix coder sent to Dataflow 
service for non-portable pipelines due to WatermarkEstimators migration change
URL: https://github.com/apache/beam/pull/11192
 
 
   




[beam] branch master updated: [BEAM-9430] Fix coder sent to Dataflow service for non-portable pipelines due to WatermarkEstimators migration change

2020-03-23 Thread lcwik
This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 6912011  [BEAM-9430] Fix coder sent to Dataflow service for 
non-portable pipelines due to WatermarkEstimators migration change
 new 1c35224  Merge pull request #11192 from lukecwik/splittabledofn
6912011 is described below

commit 6912011bd7b9745d5a3fd195165482bcc39ffa32
Author: Luke Cwik 
AuthorDate: Mon Mar 23 08:24:58 2020 -0700

[BEAM-9430] Fix coder sent to Dataflow service for non-portable pipelines 
due to WatermarkEstimators migration change
---
 .../runners/dataflow/DataflowPipelineTranslator.java | 20 ++--
 .../dataflow/DataflowPipelineTranslatorTest.java |  3 ++-
 2 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 5f7ec26..8be785e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -978,12 +978,16 @@ public class DataflowPipelineTranslator {
 if (context.isFnApi()) {
   DoFnSignature signature = DoFnSignatures.signatureForDoFn(transform.getFn());
   if (signature.processElement().isSplittable()) {
-Coder restrictionCoder =
-DoFnInvokers.invokerFor(transform.getFn())
-.invokeGetRestrictionCoder(
-context.getInput(transform).getPipeline().getCoderRegistry());
+DoFnInvoker doFnInvoker = DoFnInvokers.invokerFor(transform.getFn());
+Coder restrictionAndWatermarkStateCoder =
+KvCoder.of(
+doFnInvoker.invokeGetRestrictionCoder(
+context.getInput(transform).getPipeline().getCoderRegistry()),
+doFnInvoker.invokeGetWatermarkEstimatorStateCoder(
+context.getInput(transform).getPipeline().getCoderRegistry()));
 stepContext.addInput(
-PropertyNames.RESTRICTION_ENCODING, translateCoder(restrictionCoder, context));
+PropertyNames.RESTRICTION_ENCODING,
+translateCoder(restrictionAndWatermarkStateCoder, context));
   }
 }
   }
@@ -1190,7 +1194,11 @@ public class DataflowPipelineTranslator {
 
 stepContext.addInput(
 PropertyNames.RESTRICTION_CODER,
-translateCoder(transform.getRestrictionCoder(), context));
+translateCoder(
+KvCoder.of(
+transform.getRestrictionCoder(),
+transform.getWatermarkEstimatorStateCoder()),
+context));
   }
 });
   }
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index d48f060..775c782 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -733,7 +733,8 @@ public class DataflowPipelineTranslatorTest implements Serializable {
 Structs.getObject(
 processKeyedStep.getProperties(), PropertyNames.RESTRICTION_CODER));
 
-assertEquals(SerializableCoder.of(OffsetRange.class), restrictionCoder);
+assertEquals(
+KvCoder.of(SerializableCoder.of(OffsetRange.class), VoidCoder.of()), restrictionCoder);
   }
 
   /** Smoke test to fail fast if translation of a splittable ParDo in FnAPI. */



[beam] branch master updated (7310ec2 -> fb59b6a)

2020-03-23 Thread reuvenlax
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 7310ec2  Merge pull request #11190 from lukecwik/beam9565
 add fb59b6a  Merge pull request #10990: [BEAM-9569] disable coder 
inference for rows

No new revisions were added by this update.

Summary of changes:
 .../org/apache/beam/sdk/coders/CoderRegistry.java  |  5 ++
 .../apache/beam/sdk/schemas/transforms/Group.java  | 24 ---
 .../sql/impl/rel/BeamSetOperatorRelBase.java   | 55 +++
 .../impl/transform/BeamSetOperatorsTransforms.java | 78 +-
 .../meta/provider/pubsub/PubsubMessageToRow.java   | 43 ++--
 .../sql/meta/provider/test/TestTableProvider.java  | 16 ++---
 .../beam/sdk/nexmark/queries/sql/SqlQuery0.java|  1 +
 7 files changed, 106 insertions(+), 116 deletions(-)



[GitHub] [beam] reuvenlax merged pull request #10990: [BEAM-9569] disable coder inference for rows

2020-03-23 Thread GitBox
reuvenlax merged pull request #10990: [BEAM-9569] disable coder inference for 
rows
URL: https://github.com/apache/beam/pull/10990
 
 
   




[GitHub] [beam] ibzib commented on issue #11189: [BEAM-9446] Retain unknown arguments when using uber jar job server.

2020-03-23 Thread GitBox
ibzib commented on issue #11189: [BEAM-9446] Retain unknown arguments when 
using uber jar job server.
URL: https://github.com/apache/beam/pull/11189#issuecomment-602808467
 
 
   @mxm thanks for the feedback, I addressed your comments.
   
   Unrelated: I wasn't aware that the Flink portable jar postcommit has been 
failing. I filed and self-assigned BEAM-9575.




[GitHub] [beam] Hannah-Jiang commented on issue #11067: [BEAM-9136]Add licenses for dependencies

2020-03-23 Thread GitBox
Hannah-Jiang commented on issue #11067: [BEAM-9136]Add licenses for dependencies
URL: https://github.com/apache/beam/pull/11067#issuecomment-602807464
 
 
   R: @alanmyrvold , do you have time to take a look for Go and Java?




[GitHub] [beam] chadrik commented on a change in pull request #11038: [BEAM-7746] More typing fixes

2020-03-23 Thread GitBox
chadrik commented on a change in pull request #11038: [BEAM-7746] More typing 
fixes
URL: https://github.com/apache/beam/pull/11038#discussion_r396700454
 
 

 ##
 File path: sdks/python/apache_beam/coders/coders.py
 ##
 @@ -387,8 +387,11 @@ def register_structured_urn(urn, cls):
 """Register a coder that's completely defined by its urn and its
 component(s), if any, which are passed to construct the instance.
 """
-cls.to_runner_api_parameter = (
-lambda self, unused_context: (urn, None, self._get_component_coders()))
+setattr(
 
 Review comment:
   mypy raises an error when overwriting a method: `Cannot assign to a method`. 
Using `setattr` avoids the error. A similar situation arises when setting or 
getting dynamic attributes.
   
   We need to decide on a policy for these cases:
   
   1) avoid the error using `setattr` and `getattr`
   2) silence the error with a `# type: ignore[xxx]` comment
   
   The second option provides more context, but could hinge on how you feel 
about the aesthetics of `type ignore` comments.
   
   Either way, I agree a comment should be made on the preceding line to 
clarify.
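
A minimal sketch of the two options, using a hypothetical `Coder` class and two hypothetical `register_*` helpers rather than the real Beam code. The error code `assignment` in the ignore comment is an assumption; use whichever code mypy reports when `show_error_codes` is enabled.

```python
from typing import Any, Tuple, Type


class Coder(object):
  """Hypothetical stand-in for a coder class; not the real Beam Coder."""
  def to_runner_api_parameter(self, unused_context):
    # type: (Any) -> Tuple[str, Any]
    raise NotImplementedError


def register_with_setattr(urn, cls):
  # type: (str, Type[Coder]) -> None
  # Option 1: setattr() hides the assignment from mypy, so no error is raised.
  setattr(
      cls,
      'to_runner_api_parameter',
      lambda self, unused_context: (urn, None))


def register_with_ignore(urn, cls):
  # type: (str, Type[Coder]) -> None
  # Option 2: assign directly and silence mypy on this statement only.
  cls.to_runner_api_parameter = (  # type: ignore[assignment]
      lambda self, unused_context: (urn, None))
```

Both helpers behave identically at runtime; the difference is only in what the type checker and the reader see.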
   


[GitHub] [beam] lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK

2020-03-23 Thread GitBox
lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#issuecomment-602805178
 
 
   R: @youngoli 


[GitHub] [beam] chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python

2020-03-23 Thread GitBox
chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r396695008
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio_read_it_test.py
 ##
 @@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
 Review comment:
   Is this test working now? (If so, I can try triggering it from the PR.)


[GitHub] [beam] chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python

2020-03-23 Thread GitBox
chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r396693125
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -141,3 +144,123 @@ def expand(self, pvalue):
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A DoFn to parallelize reading from a Bigtable table
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_})
+
+  def _initialize(self, options):
+""" The defaults initializer, to assist with pickling
+
+:return: None
+"""
+self._options = options
+self._table = None
+self._counter = Metrics.counter(self.__class__, 'Rows Read')
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def start_bundle(self):
+# from google.cloud.bigtable import Client
+if self._table is None:
+  # noinspection PyAttributeOutsideInit
+  self._table = Client(project=self._options['project_id'])\
+.instance(self._options['instance_id'])\
+.table(self._options['table_id'])
+
+  def process(self, source_bundle):
+_start_key = source_bundle.start_position
+_end_key = source_bundle.stop_position
+for row in self._table.read_rows(_start_key, _end_key):
+  self._counter.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._options['table_id'],
+   label='Bigtable Table Id')}
+
+
+class ReadFromBigtable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A PTransform wrapper for parallel reading rows from a Bigtable table.
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row. If none is provided, all rows are read by default.
+"""
+super(self.__class__, self).__init__()
+self._options = {'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_}
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._options = options
+
+  def expand(self, pbegin):
+table = Client(project=self._options['project_id'], admin=True) \
+  .instance(instance_id=self._options['instance_id']) \
+  .table(table_id=self._options['table_id'])
+
+keys = list(table.sample_row_keys())
+
+SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
+keys.insert(0, SampleRowKey(b'', 0))
+
+def chunks():
+  for i in range(1, len(keys)):
+key_1 = keys[i - 1].row_key
+key_2 = keys[i].row_key
+size = keys[i].offset_bytes - keys[i - 1].offset_bytes
+yield iobase.SourceBundle(size, None, key_1, key_2)
 
 Review comment:
   You don't need to use `SourceBundle` if you are not using the Read transform. 
Please use a separate data structure (or just a tuple) here to avoid confusion.
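
One possible shape for such a data structure, as a sketch only. `KeyRange` and `key_ranges` are hypothetical names that do not appear in the PR, and `SampleRowKey` mirrors the namedtuple defined in the diff above.

```python
from collections import namedtuple

# Hypothetical names; the connector may choose different ones.
SampleRowKey = namedtuple('SampleRowKey', 'row_key offset_bytes')
KeyRange = namedtuple('KeyRange', 'start_key end_key size_bytes')


def key_ranges(sample_keys):
  """Yields one KeyRange per pair of adjacent sampled keys."""
  # Prepend an empty start key so the first range begins at the table start.
  keys = [SampleRowKey(b'', 0)] + list(sample_keys)
  for previous, current in zip(keys, keys[1:]):
    yield KeyRange(
        previous.row_key,
        current.row_key,
        current.offset_bytes - previous.offset_bytes)
```

A plain tuple would work as well; the namedtuple only makes the field access in the DoFn easier to read.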


[GitHub] [beam] chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python

2020-03-23 Thread GitBox
chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r396691673
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -141,3 +144,123 @@ def expand(self, pvalue):
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A DoFn to parallelize reading from a Bigtable table
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_})
+
+  def _initialize(self, options):
+""" The defaults initializer, to assist with pickling
+
+:return: None
+"""
+self._options = options
+self._table = None
+self._counter = Metrics.counter(self.__class__, 'Rows Read')
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def start_bundle(self):
+# from google.cloud.bigtable import Client
+if self._table is None:
+  # noinspection PyAttributeOutsideInit
+  self._table = Client(project=self._options['project_id'])\
+.instance(self._options['instance_id'])\
+.table(self._options['table_id'])
+
+  def process(self, source_bundle):
+_start_key = source_bundle.start_position
+_end_key = source_bundle.stop_position
+for row in self._table.read_rows(_start_key, _end_key):
+  self._counter.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._options['table_id'],
+   label='Bigtable Table Id')}
+
+
+class ReadFromBigtable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A PTransform wrapper for parallel reading rows from a Bigtable table.
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row. If none is provided, all rows are read by default.
+"""
+super(self.__class__, self).__init__()
+self._options = {'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_}
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._options = options
+
+  def expand(self, pbegin):
+table = Client(project=self._options['project_id'], admin=True) \
+  .instance(instance_id=self._options['instance_id']) \
+  .table(table_id=self._options['table_id'])
+
+keys = list(table.sample_row_keys())
+
+SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
+keys.insert(0, SampleRowKey(b'', 0))
+
+def chunks():
+  for i in range(1, len(keys)):
 
 Review comment:
   Do we expect to always get at least two keys here? If so, add an assertion 
before this statement so that we don't fall through trivially.
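
A sketch of the kind of guard meant here, assuming the keys are held in a list. `adjacent_pairs` is a hypothetical helper, not part of the PR.

```python
def adjacent_pairs(keys):
  """Returns (previous, current) pairs, failing loudly on too few keys."""
  # With fewer than two keys, range(1, len(keys)) is empty and the loop
  # would silently produce no bundles.
  assert len(keys) >= 2, 'expected at least two keys, got %d' % len(keys)
  return [(keys[i - 1], keys[i]) for i in range(1, len(keys))]
```

Note that `assert` statements are skipped when Python runs with `-O`, so an explicit `raise ValueError` may be preferable if the check must always run.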


[GitHub] [beam] chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python

2020-03-23 Thread GitBox
chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r396690605
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -141,3 +144,123 @@ def expand(self, pvalue):
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A DoFn to parallelize reading from a Bigtable table
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_})
+
+  def _initialize(self, options):
+""" The defaults initializer, to assist with pickling
+
+:return: None
+"""
+self._options = options
+self._table = None
+self._counter = Metrics.counter(self.__class__, 'Rows Read')
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def start_bundle(self):
+# from google.cloud.bigtable import Client
+if self._table is None:
+  # noinspection PyAttributeOutsideInit
+  self._table = Client(project=self._options['project_id'])\
+.instance(self._options['instance_id'])\
+.table(self._options['table_id'])
+
+  def process(self, source_bundle):
+_start_key = source_bundle.start_position
+_end_key = source_bundle.stop_position
+for row in self._table.read_rows(_start_key, _end_key):
+  self._counter.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._options['table_id'],
+   label='Bigtable Table Id')}
+
+
+class ReadFromBigtable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A PTransform wrapper for parallel reading rows from a Bigtable table.
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row. If none is provided, all rows are read by default.
+"""
+super(self.__class__, self).__init__()
+self._options = {'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_}
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._options = options
+
+  def expand(self, pbegin):
+table = Client(project=self._options['project_id'], admin=True) \
+  .instance(instance_id=self._options['instance_id']) \
+  .table(table_id=self._options['table_id'])
+
+keys = list(table.sample_row_keys())
+
+SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
+keys.insert(0, SampleRowKey(b'', 0))
 
 Review comment:
   Please add a comment to explain why this is needed.


[GitHub] [beam] chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python

2020-03-23 Thread GitBox
chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r396694342
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -141,3 +144,123 @@ def expand(self, pvalue):
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A DoFn to parallelize reading from a Bigtable table
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_})
+
+  def _initialize(self, options):
+""" The defaults initializer, to assist with pickling
+
+:return: None
+"""
+self._options = options
+self._table = None
+self._counter = Metrics.counter(self.__class__, 'Rows Read')
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def start_bundle(self):
+# from google.cloud.bigtable import Client
+if self._table is None:
+  # noinspection PyAttributeOutsideInit
+  self._table = Client(project=self._options['project_id'])\
+.instance(self._options['instance_id'])\
+.table(self._options['table_id'])
+
+  def process(self, source_bundle):
+_start_key = source_bundle.start_position
+_end_key = source_bundle.stop_position
+for row in self._table.read_rows(_start_key, _end_key):
+  self._counter.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._options['table_id'],
+   label='Bigtable Table Id')}
+
+
+class ReadFromBigtable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A PTransform wrapper for parallel reading rows from a Bigtable table.
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row. If none is provided, all rows are read by default.
+"""
+super(self.__class__, self).__init__()
+self._options = {'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_}
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._options = options
+
+  def expand(self, pbegin):
+table = Client(project=self._options['project_id'], admin=True) \
+  .instance(instance_id=self._options['instance_id']) \
+  .table(table_id=self._options['table_id'])
+
+keys = list(table.sample_row_keys())
+
+SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
+keys.insert(0, SampleRowKey(b'', 0))
+
+def chunks():
+  for i in range(1, len(keys)):
+key_1 = keys[i - 1].row_key
+key_2 = keys[i].row_key
+size = keys[i].offset_bytes - keys[i - 1].offset_bytes
+yield iobase.SourceBundle(size, None, key_1, key_2)
+
+return (pbegin
+| 'Bundles' >> beam.Create(iter(chunks()))
 
 Review comment:
   Please add unit tests to cover splitting, reading, etc. See the following for 
some example unit tests.
   
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio_test.py


[GitHub] [beam] chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python

2020-03-23 Thread GitBox
chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r396695591
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio_read_it_test.py
 ##
 @@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 
 Review comment:
   Can you try triggering this for a large dataset to make sure that there's no 
severe perf regression?


[GitHub] [beam] chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python

2020-03-23 Thread GitBox
chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a 
Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r396659933
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##
 @@ -141,3 +144,123 @@ def expand(self, pvalue):
 | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
   beam_options['instance_id'],
   beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+""" A DoFn to parallelize reading from a Bigtable table
+
+:type project_id: str
+:param project_id: The ID of the project used for Bigtable access
+
+:type instance_id: str
+:param instance_id: The ID of the instance that owns the table.
+
+:type table_id: str
+:param table_id: The ID of the table.
+
+:type filter_: :class:`.RowFilter`
+:param filter_: (Optional) The filter to apply to the contents of the
+specified row(s). If unset, reads every column in
+each row.
+"""
+super(self.__class__, self).__init__()
+self._initialize({'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id,
+ 'filter_': filter_})
+
+  def _initialize(self, options):
+""" The defaults initializer, to assist with pickling
+
+:return: None
+"""
+self._options = options
+self._table = None
+self._counter = Metrics.counter(self.__class__, 'Rows Read')
+
+  def __getstate__(self):
+return self._options
+
+  def __setstate__(self, options):
+self._initialize(options)
+
+  def start_bundle(self):
+# from google.cloud.bigtable import Client
+if self._table is None:
+  # noinspection PyAttributeOutsideInit
+  self._table = Client(project=self._options['project_id'])\
+.instance(self._options['instance_id'])\
+.table(self._options['table_id'])
+
+  def process(self, source_bundle):
+_start_key = source_bundle.start_position
+_end_key = source_bundle.stop_position
+for row in self._table.read_rows(_start_key, _end_key):
+  self._counter.inc()
+  yield row
+
+  def display_data(self):
+return {'projectId': DisplayDataItem(self._options['project_id'],
+ label='Bigtable Project Id'),
+'instanceId': DisplayDataItem(self._options['instance_id'],
+  label='Bigtable Instance Id'),
+'tableId': DisplayDataItem(self._options['table_id'],
+   label='Bigtable Table Id')}
+
+
+class ReadFromBigtable(beam.PTransform):
 
 Review comment:
   Please add docs and annotations to denote that this module and its PTransforms 
(both source and sink) are experimental, similar to the following.
   
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py#L52


[GitHub] [beam] chadrik commented on a change in pull request #11038: [BEAM-7746] More typing fixes

2020-03-23 Thread GitBox
chadrik commented on a change in pull request #11038: [BEAM-7746] More typing 
fixes
URL: https://github.com/apache/beam/pull/11038#discussion_r396693751
 
 

 ##
 File path: sdks/python/mypy.ini
 ##
 @@ -17,16 +17,21 @@
 
 [mypy]
 python_version = 3.6
+files = apache_beam
 ignore_missing_imports = true
-follow_imports = true
-warn_no_return = true
 no_implicit_optional = true
+# warnings:
+warn_no_return = true
 warn_redundant_casts = true
 warn_unused_ignores = true
+# formatting:
 show_error_codes = true
-files = apache_beam
 color_output = true
-# uncomment this to see how close we are to being complete
+# required setting for dmypy:
+follow_imports = error
 
 Review comment:
   mypy picks up the site-packages directory of the python that it's installed 
into.  Only PEP 561 compliant packages will inform the type analysis (i.e. the 
package must have a `py.typed` file).  Alternately, you can specify the 
interpreter for finding site-packages with `--python-executable`:
   
   ```
 --python-executable EXECUTABLE
   Python executable used for finding PEP 561
   compliant installed packages and stubs
   ```
   
   This config change did not generate any additional errors for me.  
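
For reference, a rough way to check whether an installed package ships the `py.typed` marker. `is_pep561_typed` is a hypothetical helper written for this comment; it only looks for the marker file in regular packages and ignores stub-only `*-stubs` distributions.

```python
import importlib.util
import os


def is_pep561_typed(package_name):
  """Returns True if the installed package ships a py.typed marker."""
  spec = importlib.util.find_spec(package_name)
  if spec is None or not spec.submodule_search_locations:
    # Not installed, or a single-file module that cannot carry py.typed.
    return False
  return any(
      os.path.isfile(os.path.join(location, 'py.typed'))
      for location in spec.submodule_search_locations)
```

Run it with the same interpreter that mypy uses (or the one passed via `--python-executable`), since the answer depends on that interpreter's site-packages.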


[GitHub] [beam] chadrik commented on a change in pull request #11038: [BEAM-7746] More typing fixes

2020-03-23 Thread GitBox
chadrik commented on a change in pull request #11038: [BEAM-7746] More typing 
fixes
URL: https://github.com/apache/beam/pull/11038#discussion_r396690657
 
 

 ##
 File path: sdks/python/apache_beam/transforms/core.py
 ##
 @@ -1300,12 +1300,13 @@ def to_runner_api_parameter(self, context):
   common_urns.requirements.REQUIRES_STATEFUL_PROCESSING.urn)
 from apache_beam.runners.common import DoFnSignature
 sig = DoFnSignature(self.fn)
-is_splittable = sig.is_splittable_dofn()
 
 Review comment:
   > Not sure if checking get_restriction_coder() return type instead of 
is_splittable_dofn() is future proof.
   
   `get_restriction_coder()` calls `is_splittable_dofn()` and returns `None` if 
it's not splittable.  So I interpreted a `None` result from this method to mean 
"is not splittable". 
   
   ```python
   def get_restriction_coder(self):
     # type: () -> Optional[TupleCoder]

     """Get coder for a restriction when processing an SDF. """
     if self.is_splittable_dofn():
       return TupleCoder([
           (self.get_restriction_provider().restriction_coder()),
           (self.get_watermark_estimator_provider().estimator_state_coder())
       ])
     else:
       return None
   ```
   
   > I don't understand the change, from a mypy correctness perspective.
   
   Here's the problem:
   
   ```python
   if is_splittable:
     # returns Optional[TupleCoder]
     restriction_coder = sig.get_restriction_coder()
     # does not accept Optional!
     restriction_coder_id = context.coders.get_id(restriction_coder)
   else:
     restriction_coder_id = None
   ```
   
   With my changes, we naturally drop the optionality before passing the value 
to `context.coders.get_id()`.  We also avoid a redundant call to 
`is_splittable_dofn()`, FWIW.
   
   I see two options:
   
   1) keep my changes and update the documentation of `get_restriction_coder()` 
to clarify that a `None` result indicates "is not splittable"
   2) revert my changes and add `assert restriction_coder is not None` before the 
call to `context.coders.get_id()`
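
To illustrate the narrowing behind option 1, here is a self-contained sketch. The three functions are hypothetical stand-ins that use strings in place of coders; they are not the Beam implementations.

```python
from typing import Optional


def get_restriction_coder(is_splittable):
  # type: (bool) -> Optional[str]
  """Hypothetical stand-in: returns a coder name, or None if not splittable."""
  return 'restriction_coder' if is_splittable else None


def get_id(coder):
  # type: (str) -> str
  """Hypothetical stand-in for a lookup that does not accept None."""
  return 'id_for_' + coder


def restriction_coder_id(is_splittable):
  # type: (bool) -> Optional[str]
  restriction_coder = get_restriction_coder(is_splittable)
  if restriction_coder is not None:
    # mypy narrows Optional[str] to str inside this branch.
    return get_id(restriction_coder)
  return None
```

Testing the returned value directly gives mypy the information it needs, whereas a separate boolean such as `is_splittable` tells it nothing about the `Optional`.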
   
   


[GitHub] [beam] ibzib commented on a change in pull request #11189: [BEAM-9446] Retain unknown arguments when using uber jar job server.

2020-03-23 Thread GitBox
ibzib commented on a change in pull request #11189: [BEAM-9446] Retain unknown 
arguments when using uber jar job server.
URL: https://github.com/apache/beam/pull/11189#discussion_r396684486
 
 

 ##
 File path: sdks/python/apache_beam/options/pipeline_options.py
 ##
 @@ -285,10 +289,25 @@ def get_all_options(
       cls._add_argparse_args(parser)  # pylint: disable=protected-access
     if add_extra_args_fn:
       add_extra_args_fn(parser)
+
     known_args, unknown_args = parser.parse_known_args(self._flags)
-    if unknown_args:
-      _LOGGER.warning("Discarding unparseable args: %s", unknown_args)
-    result = vars(known_args)
+    if retain_unknown_options:
+      i = 0
+      while i < len(unknown_args):
+        # Treat all unary flags as booleans, and all binary argument values as
+        # strings.
+        if i + 1 >= len(unknown_args) or unknown_args[i + 1].startswith('--'):
 
 Review comment:
   I guess I should change it to `-`, not `--`. `-` is the default prefix for 
argparse, and AFAICT that is true for Beam as well, even though most (all?) 
pipeline options use `--`.
   https://docs.python.org/3/library/argparse.html#prefix-chars
   
   So then the question becomes "Should we always check a parameter name starts 
with `-`?" And the answer is we don't have to, because the call to parse_args 
will do that for us. `test_retain_unknown_options_unary_missing_prefix` tests 
that.
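
The retention logic in the diff can be sketched with plain `argparse`. This is a simplified illustration under stated assumptions, not the PR's implementation: the function name `parse_retaining_unknown` and the `--known` option are hypothetical, and the check uses the `-` prefix proposed in this comment.

```python
import argparse


def parse_retaining_unknown(flags):
  """Parses flags, keeping options the parser was never told about."""
  parser = argparse.ArgumentParser()
  parser.add_argument('--known', default=None)  # hypothetical known option
  _, unknown_args = parser.parse_known_args(flags)
  i = 0
  while i < len(unknown_args):
    # A flag followed by another flag (or by nothing) is unary.
    if i + 1 >= len(unknown_args) or unknown_args[i + 1].startswith('-'):
      split = unknown_args[i].split('=', 1)
      if len(split) == 1:
        # Unary flag: treat as a boolean.
        parser.add_argument(unknown_args[i], action='store_true')
      else:
        # '--name=value' form: register only the name, as a string.
        parser.add_argument(split[0], type=str)
      i += 1
    else:
      # Binary form '--name value': treat the value as a string.
      parser.add_argument(unknown_args[i], type=str)
      i += 2
  # Re-parse now that every unknown option has been registered.
  return vars(parser.parse_args(flags))
```

One consequence of checking for `-` is that a value which itself starts with `-` (for example a negative number) is read as the start of the next flag, so the preceding option is treated as unary.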




[GitHub] [beam] chadrik commented on a change in pull request #11038: [BEAM-7746] More typing fixes

2020-03-23 Thread GitBox
chadrik commented on a change in pull request #11038: [BEAM-7746] More typing 
fixes
URL: https://github.com/apache/beam/pull/11038#discussion_r396684644
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
 ##
 @@ -353,6 +359,8 @@ def release(self, instruction_id):
     self.cached_bundle_processors[descriptor_id].append(processor)
 
   def shutdown(self):
+    # type: (...) -> None
 
 Review comment:
   Correct. Looks like I got a little sloppy. Will fix in my next push.




[GitHub] [beam] chadrik commented on a change in pull request #11038: [BEAM-7746] More typing fixes

2020-03-23 Thread GitBox
chadrik commented on a change in pull request #11038: [BEAM-7746] More typing 
fixes
URL: https://github.com/apache/beam/pull/11038#discussion_r396683616
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/operations.py
 ##
 @@ -569,7 +580,7 @@ def __init__(self,
     self.tagged_receivers = None  # type: Optional[_TaggedReceivers]
     # A mapping of timer tags to the input "PCollections" they come in on.
     self.timer_inputs = timer_inputs or {}
-    self.input_info = None  # type: Optional[Tuple[str, str, coders.WindowedValueCoder, MutableMapping[str, str]]]
 
 Review comment:
   Correct. I discovered that the code only iterates over the keys (using 
`for x in y`), so the type only needs to be `Iterable[str]`.
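
A small sketch of why the narrower annotation is enough. The function `collect_timer_tags` is hypothetical and only mirrors the `for x in y` access pattern described in this comment.

```python
from typing import Iterable, List


def collect_timer_tags(timer_inputs):
  # type: (Iterable[str]) -> List[str]
  # Only iteration is used, so any iterable of strings is acceptable;
  # iterating over a mapping yields its keys.
  return [tag for tag in timer_inputs]
```

Declaring the weakest type the code actually needs lets callers pass a dict, a list, or a set without a cast.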
   




[GitHub] [beam] udim commented on issue #10914: [BEAM-8078] streaming_wordcount_debugging.py is missing a test

2020-03-23 Thread GitBox
udim commented on issue #10914: [BEAM-8078] streaming_wordcount_debugging.py is 
missing a test
URL: https://github.com/apache/beam/pull/10914#issuecomment-602789494
 
 
   retest this please




[GitHub] [beam] robertwb commented on a change in pull request #11060: [BEAM-9454] Create Deduplication transform based on user timer/state

2020-03-23 Thread GitBox
robertwb commented on a change in pull request #11060: [BEAM-9454] Create 
Deduplication transform based on user timer/state
URL: https://github.com/apache/beam/pull/11060#discussion_r396675954
 
 

 ##
 File path: sdks/python/apache_beam/transforms/deduplicate.py
 ##
 @@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+"""a collection of ptransforms for deduplicating elements."""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import typing
+
+from apache_beam import typehints
+from apache_beam.coders.coders import BooleanCoder
+from apache_beam.transforms import core
+from apache_beam.transforms import ptransform
+from apache_beam.transforms import userstate
+from apache_beam.transforms.timeutil import TimeDomain
+from apache_beam.utils import timestamp
+
+__all__ = [
+    'Deduplicate',
+    'DeduplicatePerKey',
+]
+
+K = typing.TypeVar('K')
+V = typing.TypeVar('V')
+
+
+@typehints.with_input_types(typing.Tuple[K, V])
+@typehints.with_output_types(typing.Tuple[K, V])
+class DeduplicatePerKey(ptransform.PTransform):
+  """ A PTransform which deduplicates <key, value> pair over a time domain and
+  threshold. Values in different windows will NOT be considered duplicates of
+  each other. Deduplication is best effort.
+
+  The durations specified may impose memory and/or storage requirements within
+  a runner and care might need to be used to ensure that the deduplication time
+  limit is long enough to remove duplicates but short enough to not cause
+  performance problems within a runner. Each runner may provide an optimized
+  implementation of their choice using the deduplication time domain and
+  threshold specified.
+
+  Does not preserve any order the input PCollection might have had.
+  """
+  def __init__(self, processing_time_duration=None, event_time_duration=None):
+    if processing_time_duration is None and event_time_duration is None:
+      raise ValueError(
+          'DeduplicatePerKey requires at least one of '
+          'processing_time_duration or event_time_duration.')
+    self.processing_time_duration = processing_time_duration
+    self.event_time_duration = event_time_duration
+
+  def _create_deduplicate_fn(self):
+    processing_timer_spec = userstate.TimerSpec(
+        'processing_timer', TimeDomain.REAL_TIME)
+    event_timer_spec = userstate.TimerSpec('event_timer', TimeDomain.WATERMARK)
+    state_spec = userstate.BagStateSpec('seen', BooleanCoder())
 
 Review comment:
   Perhaps this should be a combining state? 




[GitHub] [beam] robertwb commented on a change in pull request #11060: [BEAM-9454] Create Deduplication transform based on user timer/state

2020-03-23 Thread GitBox
robertwb commented on a change in pull request #11060: [BEAM-9454] Create 
Deduplication transform based on user timer/state
URL: https://github.com/apache/beam/pull/11060#discussion_r396677254
 
 

 ##
 File path: sdks/python/apache_beam/transforms/deduplicate_test.py
 ##
 @@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+"""Unit tests for deduplicate transform by using TestStream."""
+
+from __future__ import absolute_import
+
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.coders import coders
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.testing.util import equal_to_per_window
+from apache_beam.transforms import deduplicate
+from apache_beam.transforms import window
+from apache_beam.utils.timestamp import Duration
+from apache_beam.utils.timestamp import Timestamp
+
+
+# TestStream is only supported in streaming pipeline. The Deduplicate transform
+# also requires Timer support. Sickbaying this testsuite until dataflow runner
+# supports both TestStream and user timer.
+@attr('ValidatesRunner', 'sickbay-batch', 'sickbay-streaming')
 
 Review comment:
   Is it not possible to sickbay it for only dataflow?




[GitHub] [beam] robertwb commented on a change in pull request #11060: [BEAM-9454] Create Deduplication transform based on user timer/state

2020-03-23 Thread GitBox
robertwb commented on a change in pull request #11060: [BEAM-9454] Create 
Deduplication transform based on user timer/state
URL: https://github.com/apache/beam/pull/11060#discussion_r396680622
 
 

 ##
 File path: sdks/python/apache_beam/transforms/deduplicate.py
 ##
 @@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+"""a collection of ptransforms for deduplicating elements."""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import typing
+
+from apache_beam import typehints
+from apache_beam.coders.coders import BooleanCoder
+from apache_beam.transforms import core
+from apache_beam.transforms import ptransform
+from apache_beam.transforms import userstate
+from apache_beam.transforms.timeutil import TimeDomain
+from apache_beam.utils import timestamp
+
+__all__ = [
+    'Deduplicate',
+    'DeduplicatePerKey',
+]
+
+K = typing.TypeVar('K')
+V = typing.TypeVar('V')
+
+
+@typehints.with_input_types(typing.Tuple[K, V])
+@typehints.with_output_types(typing.Tuple[K, V])
+class DeduplicatePerKey(ptransform.PTransform):
+  """ A PTransform which deduplicates <key, value> pair over a time domain and
+  threshold. Values in different windows will NOT be considered duplicates of
+  each other. Deduplication is best effort.
+
+  The durations specified may impose memory and/or storage requirements within
 
 Review comment:
   I might phrase this as "Time durations are required so as to avoid unbounded 
memory and/or storage requirements on the runner..."



