[GitHub] [beam] stale[bot] commented on issue #10038: Simplify Python test process
stale[bot] commented on issue #10038: Simplify Python test process URL: https://github.com/apache/beam/pull/10038#issuecomment-603041838 This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
[GitHub] [beam] stale[bot] closed pull request #10038: Simplify Python test process
stale[bot] closed pull request #10038: Simplify Python test process URL: https://github.com/apache/beam/pull/10038
[GitHub] [beam] pabloem commented on issue #11202: asdletmedah
pabloem commented on issue #11202: asdletmedah URL: https://github.com/apache/beam/pull/11202#issuecomment-603028147 Run Portable_Python PreCommit
[GitHub] [beam] youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#discussion_r396853348

## File path: sdks/go/pkg/beam/core/runtime/exec/reshuffle.go

## @@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+    "bytes"
+    "context"
+    "fmt"
+    "io"
+    "math/rand"
+
+    "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+    "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+)
+
+// ReshuffleInput is a Node.
+type ReshuffleInput struct {
+    UID   UnitID
+    SID   StreamID
+    Coder *coder.Coder // Coder for the input PCollection.
+    Seed  int64
+    Out   Node
+
+    r    *rand.Rand
+    enc  ElementEncoder
+    wEnc WindowEncoder
+    b    bytes.Buffer
+    // ret is a cached allocation for passing to the next Unit. Units never modify the passed-in FullValue.
+    ret FullValue
+}
+
+// ID returns the unit debug id.
+func (n *ReshuffleInput) ID() UnitID {
+    return n.UID
+}
+
+// Up initializes the value and window encoders, and the random source.
+func (n *ReshuffleInput) Up(ctx context.Context) error {
+    n.enc = MakeElementEncoder(coder.SkipW(n.Coder))
+    n.wEnc = MakeWindowEncoder(n.Coder.Window)
+    n.r = rand.New(rand.NewSource(n.Seed))
+    return nil
+}
+
+// StartBundle is a no-op.
+func (n *ReshuffleInput) StartBundle(ctx context.Context, id string, data DataContext) error {
+    return MultiStartBundle(ctx, id, data, n.Out)
+}
+
+func (n *ReshuffleInput) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error {
+    n.b.Reset()
+    if err := EncodeWindowedValueHeader(n.wEnc, value.Windows, value.Timestamp, &n.b); err != nil {
+        return err
+    }
+    if err := n.enc.Encode(value, &n.b); err != nil {
+        return errors.WithContextf(err, "encoding element %v with coder %v", value, n.Coder)
+    }
+    n.ret = FullValue{Elm: n.r.Int(), Elm2: n.b.Bytes(), Timestamp: value.Timestamp}
+    if err := n.Out.ProcessElement(ctx, &n.ret); err != nil {
+        return err
+    }
+    return nil

Review comment:
```suggestion
    return n.Out.ProcessElement(ctx, &n.ret)
```
[GitHub] [beam] youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#discussion_r396887810

## File path: sdks/go/pkg/beam/gbk.go

## @@ -95,3 +95,52 @@ func TryCoGroupByKey(s Scope, cols ...PCollection) (PCollection, error) {
     ret.SetCoder(NewCoder(ret.Type()))
     return ret, nil
 }
+
+// Reshuffle copies a PCollection of the same kind and using the same element
+// coder, and maintains the same windowing information. Importantly, it allows
+// the result PCollection to be processed with a different sharding, in a
+// different stage than the input PCollection.
+//
+// For example, if a computation needs a lot of parallelism but
+// produces only a small amount of output data, then the computation
+// producing the data can run with as much parallelism as needed,
+// while the output file is written with a smaller amount of
+// parallelism, using the following pattern:
+//
+//    pc := bigHairyComputationNeedingParallelism(scope) // PCollection
+//    resharded := beam.Reshard(scope, pc)               // PCollection

Review comment: Here and elsewhere in this comment, Reshuffle is referred to as "Reshard". I think it's fine to refer to it as a reshard informally, since that's what it functionally is, but the places where it's used as a proper noun should be switched to Reshuffle.
[GitHub] [beam] youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#discussion_r396823199

## File path: sdks/go/pkg/beam/core/runtime/exec/coder.go

## @@ -259,59 +298,82 @@ type customDecoder struct {
     dec Decoder
 }
 
-func (c *customDecoder) Decode(r io.Reader) (*FullValue, error) {
+func (c *customDecoder) DecodeTo(r io.Reader, fv *FullValue) error {
     // (1) Read length-prefixed encoded data
     size, err := coder.DecodeVarInt(r)
     if err != nil {
-        return nil, err
+        return err
     }
     data, err := ioutilx.ReadN(r, (int)(size))
     if err != nil {
-        return nil, err
+        return err
     }
     // (2) Call decode
     val, err := c.dec.Decode(c.t, data)
     if err != nil {
+        return err
+    }
+    *fv = FullValue{Elm: val}
+    return err

Review comment: I know this is just preserving the existing behavior, but it seems weird to `return err` here instead of `return nil`, even if it is guaranteed to be `nil` at this point.
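[Editor's note] A minimal standalone sketch of the style point above, using a hypothetical decoder rather than the Beam code itself: once the error branch has returned, writing the literal `return nil` on the success path saves the reader from verifying that `err` is still nil there.

```go
package main

import "fmt"

// decode is a hypothetical decoder, used only to illustrate the style point.
func decode(data []byte) (string, error) {
	if len(data) == 0 {
		return "", fmt.Errorf("empty input")
	}
	// Prefer the explicit literal over `return string(data), err`: the
	// reader need not check that err is guaranteed nil on this path.
	return string(data), nil
}

func main() {
	v, err := decode([]byte("hello"))
	fmt.Println(v, err) // hello <nil>
}
```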
[GitHub] [beam] youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#discussion_r396886566

## File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go

## @@ -473,6 +479,164 @@ func (m *marshaller) addNode(n *graph.Node) string {
     return m.makeNode(id, m.coders.Add(n.Coder), n)
 }
 
+// expandReshuffle translates resharding to a composite reshuffle
+// transform.
+//
+// With proper runner support, the SDK doesn't need to do anything.
+// However, we still need to provide a backup plan in terms of other
+// PTransforms in the event the runner doesn't have a native implementation.
+//
+// In particular, the "backup plan" needs to:
+//
+//  * Encode the windowed element, preserving timestamps.
+//  * Add random keys to the encoded windowed element []bytes.
+//  * GroupByKey (in the global window).
+//  * Explode the resulting elements list.
+//  * Decode the windowed element []bytes.
+//
+// While a simple reshard can be written in user terms (timestamps and windows
+// are accessible to user functions), there are some framework-internal
+// optimizations that can be done if the framework is aware of the reshard, though
+// ideally this is handled on the runner side.
+//
+// User code is able to write reshards, but it's easier to access
+// the window coders framework side, which is critical for the reshard
+// to function with unbounded inputs.
+func (m *marshaller) expandReshuffle(edge NamedEdge) string {
+    id := edgeID(edge.Edge)
+    var kvCoderID, gbkCoderID string
+    {
+        kv := makeUnionCoder()
+        kvCoderID = m.coders.Add(kv)
+        gbkCoderID = m.coders.Add(coder.NewCoGBK(kv.Components))
+    }
+
+    var subtransforms []string
+
+    in := edge.Edge.Input[0]
+
+    origInput := m.addNode(in.From)
+    // We need to preserve the old windowing/triggering here
+    // for reinstatement after the GBK.
+    preservedWSId := m.pcollections[origInput].GetWindowingStrategyId()
+
+    // Get the windowing strategy from before:
+    postReify := fmt.Sprintf("%v_%v_reifyts", nodeID(in.From), id)
+    m.makeNode(postReify, kvCoderID, in.From)
+
+    // We need to replace postReify's windowing strategy with one appropriate
+    // for reshuffles.
+    {
+        wfn := window.NewGlobalWindows()
+        m.pcollections[postReify].WindowingStrategyId =
+            m.internWindowingStrategy(&pb.WindowingStrategy{
+                // Not segregated by time...
+                WindowFn: makeWindowFn(wfn),
+                // ...output after every element is received...
+                Trigger: &pb.Trigger{
+                    // Should this be an Always trigger instead?
+                    Trigger: &pb.Trigger_ElementCount_{
+                        ElementCount: &pb.Trigger_ElementCount{
+                            ElementCount: 1,
+                        },
+                    },
+                },
+                // ...and after outputting, discard the output elements...
+                AccumulationMode: pb.AccumulationMode_DISCARDING,
+                // ...and since every pane should have 1 element,
+                // try to preserve the timestamp.
+                OutputTime: pb.OutputTime_EARLIEST_IN_PANE,
+                // Defaults copied from marshalWindowingStrategy.
+                // TODO(BEAM-3304): migrate to user side operations once trigger support is in.
+                EnvironmentId:   m.addDefaultEnv(),
+                MergeStatus:     pb.MergeStatus_NON_MERGING,
+                WindowCoderId:   m.coders.AddWindowCoder(makeWindowCoder(wfn)),
+                ClosingBehavior: pb.ClosingBehavior_EMIT_IF_NONEMPTY,
+                AllowedLateness: 0,
+                OnTimeBehavior:  pb.OnTimeBehavior_FIRE_ALWAYS,
+            })
+    }
+
+    // Inputs (i)
+
+    inputID := fmt.Sprintf("%v_reifyts", id)
+    payload := &pb.ParDoPayload{
+        DoFn: &pb.FunctionSpec{
+            Urn: URNReshuffleInput,
+            Payload: []byte(protox.MustEncodeBase64(&v1.TransformPayload{
+                Urn: URNReshuffleInput,
+            })),
+        },
+    }
+    input := &pb.PTransform{
+        UniqueName: inputID,
+        Spec: &pb.FunctionSpec{
+            Urn:     URNParDo,
+            Payload: protox.MustEncode(payload),
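[Editor's note] For orientation, the five "backup plan" steps listed in the comment above correspond roughly to the following user-level pipeline shape. This is a hedged sketch only: encodeWindowed and decodeWindowed are hypothetical stand-ins for the SDK-internal reify/expand ParDos, since the real expansion encodes the full windowed-value header, which user code cannot reach directly.

```go
package main

import (
	"context"
	"math/rand"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
)

// encodeWindowed attaches a random key to each element (step: add random keys).
// A real reshuffle would first encode the windowed element with its timestamp.
func encodeWindowed(v string) (int, string) { return rand.Int(), v }

// decodeWindowed explodes the grouped values and drops the key
// (steps: explode the elements list, then decode).
func decodeWindowed(_ int, iter func(*string) bool, emit func(string)) {
	var v string
	for iter(&v) {
		emit(v)
	}
}

// reshuffleShape mirrors the composite expansion: encode, key, GBK, explode, decode.
func reshuffleShape(s beam.Scope, in beam.PCollection) beam.PCollection {
	keyed := beam.ParDo(s, encodeWindowed, in)
	grouped := beam.GroupByKey(s, keyed) // in the global window
	return beam.ParDo(s, decodeWindowed, grouped)
}

func drop(string) {}

func main() {
	p := beam.NewPipeline()
	s := p.Root()
	out := reshuffleShape(s, beam.Create(s, "a", "b", "c"))
	beam.ParDo0(s, drop, out)
	_ = beamx.Run(context.Background(), p)
}
```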
[GitHub] [beam] youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#discussion_r396887966

## File path: sdks/go/pkg/beam/gbk.go

## @@ -95,3 +95,52 @@ func TryCoGroupByKey(s Scope, cols ...PCollection) (PCollection, error) {
     ret.SetCoder(NewCoder(ret.Type()))
     return ret, nil
 }
+
+// Reshuffle copies a PCollection of the same kind and using the same element
+// coder, and maintains the same windowing information. Importantly, it allows
+// the result PCollection to be processed with a different sharding, in a
+// different stage than the input PCollection.
+//
+// For example, if a computation needs a lot of parallelism but
+// produces only a small amount of output data, then the computation
+// producing the data can run with as much parallelism as needed,
+// while the output file is written with a smaller amount of
+// parallelism, using the following pattern:
+//
+//    pc := bigHairyComputationNeedingParallelism(scope) // PCollection
+//    resharded := beam.Reshard(scope, pc)               // PCollection
+//
+// Another use case is when one has a non-deterministic DoFn followed by one
+// that performs externally-visible side effects. Inserting a Reshard
+// between these DoFns ensures that retries of the second DoFn will always be
+// the same, which is necessary to make side effects idempotent.
+//
+// A Reshuffle will force a break in the optimized pipeline. Consequently,
+// this operation should be used sparingly, only after determining that the
+// pipeline without reshard is broken in some way and performing an extra
+// operation is worth the cost.
+func Reshuffle(s Scope, col PCollection) PCollection {
+    return Must(TryReshuffle(s, col))
+}
+
+// TryReshuffle inserts a Reshard into the pipeline, and returns an error if

Review comment: Same as previous comment, using Reshard instead of Reshuffle. The error message a few lines below also does that.
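[Editor's note] A minimal runnable sketch of the usage pattern described in the doc comment above, assuming the Reshuffle name this review asks for; expand and write are hypothetical DoFns standing in for the high-parallelism computation and the low-parallelism side-effecting write.

```go
package main

import (
	"context"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
)

// expand stands in for the "big hairy computation needing parallelism".
func expand(line string, emit func(string)) {
	for _, r := range line {
		emit(string(r))
	}
}

// write stands in for a DoFn with externally-visible side effects.
func write(ctx context.Context, v string) {
	// side effect would go here; after a Reshuffle, retries see stable input
}

func main() {
	p := beam.NewPipeline()
	s := p.Root()

	pc := beam.ParDo(s, expand, beam.Create(s, "abc", "def")) // high parallelism
	resharded := beam.Reshuffle(s, pc)                        // forces a stage break
	beam.ParDo0(s, write, resharded)                          // different sharding

	_ = beamx.Run(context.Background(), p)
}
```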
[GitHub] [beam] youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#discussion_r396855047

## File path: sdks/go/pkg/beam/core/runtime/exec/reshuffle.go

## @@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+    "bytes"
+    "context"
+    "fmt"
+    "io"
+    "math/rand"
+
+    "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+    "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+)
+
+// ReshuffleInput is a Node.
+type ReshuffleInput struct {
+    UID   UnitID
+    SID   StreamID
+    Coder *coder.Coder // Coder for the input PCollection.
+    Seed  int64
+    Out   Node
+
+    r    *rand.Rand
+    enc  ElementEncoder
+    wEnc WindowEncoder
+    b    bytes.Buffer
+    // ret is a cached allocation for passing to the next Unit. Units never modify the passed-in FullValue.
+    ret FullValue
+}
+
+// ID returns the unit debug id.
+func (n *ReshuffleInput) ID() UnitID {
+    return n.UID
+}
+
+// Up initializes the value and window encoders, and the random source.
+func (n *ReshuffleInput) Up(ctx context.Context) error {
+    n.enc = MakeElementEncoder(coder.SkipW(n.Coder))
+    n.wEnc = MakeWindowEncoder(n.Coder.Window)
+    n.r = rand.New(rand.NewSource(n.Seed))
+    return nil
+}
+
+// StartBundle is a no-op.
+func (n *ReshuffleInput) StartBundle(ctx context.Context, id string, data DataContext) error {
+    return MultiStartBundle(ctx, id, data, n.Out)
+}
+
+func (n *ReshuffleInput) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error {
+    n.b.Reset()
+    if err := EncodeWindowedValueHeader(n.wEnc, value.Windows, value.Timestamp, &n.b); err != nil {
+        return err
+    }
+    if err := n.enc.Encode(value, &n.b); err != nil {
+        return errors.WithContextf(err, "encoding element %v with coder %v", value, n.Coder)
+    }
+    n.ret = FullValue{Elm: n.r.Int(), Elm2: n.b.Bytes(), Timestamp: value.Timestamp}
+    if err := n.Out.ProcessElement(ctx, &n.ret); err != nil {
+        return err
+    }
+    return nil
+}
+
+// FinishBundle propagates finish bundle, and clears cached state.
+func (n *ReshuffleInput) FinishBundle(ctx context.Context) error {
+    n.b = bytes.Buffer{}
+    n.ret = FullValue{}
+    return MultiFinishBundle(ctx, n.Out)
+}
+
+// Down is a no-op.
+func (n *ReshuffleInput) Down(ctx context.Context) error {
+    return nil
+}
+
+func (n *ReshuffleInput) String() string {
+    return fmt.Sprintf("ReshuffleInput[%v] Coder:%v", n.SID, n.Coder)
+}
+
+// ReshuffleOutput is a Node.
+type ReshuffleOutput struct {
+    UID   UnitID
+    SID   StreamID
+    Coder *coder.Coder // Coder for the receiving PCollection.
+    Out   Node
+
+    b    bytes.Buffer
+    dec  ElementDecoder
+    wDec WindowDecoder
+    ret  FullValue
+}
+
+// ID returns the unit debug id.
+func (n *ReshuffleOutput) ID() UnitID {
+    return n.UID
+}
+
+// Up initializes the value and window encoders, and the random source.
+func (n *ReshuffleOutput) Up(ctx context.Context) error {
+    n.dec = MakeElementDecoder(coder.SkipW(n.Coder))
+    n.wDec = MakeWindowDecoder(n.Coder.Window)
+    return nil
+}
+
+// StartBundle is a no-op.
+func (n *ReshuffleOutput) StartBundle(ctx context.Context, id string, data DataContext) error {
+    return MultiStartBundle(ctx, id, data, n.Out)
+}
+
+func (n *ReshuffleOutput) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error {
+    // Marshal the pieces into a temporary buffer since they must be transmitted on FnAPI as a single
+    // unit.
+    vs, err := values[0].Open()
+    if err != nil {
+        return errors.WithContextf(err, "decoding values for %v with coder %v", value, n.Coder)
+    }
+    defer vs.Close()
+    for {
+        v, err := vs.Read()
+        if err != nil {
+            if err == io.EOF {
+                return nil
+            }
+
[GitHub] [beam] youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#discussion_r396856077

## File path: sdks/go/pkg/beam/core/runtime/exec/reshuffle.go

## @@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+    "bytes"
+    "context"
+    "fmt"
+    "io"
+    "math/rand"
+
+    "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+    "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+)
+
+// ReshuffleInput is a Node.
+type ReshuffleInput struct {
+    UID   UnitID
+    SID   StreamID
+    Coder *coder.Coder // Coder for the input PCollection.
+    Seed  int64
+    Out   Node
+
+    r    *rand.Rand
+    enc  ElementEncoder
+    wEnc WindowEncoder
+    b    bytes.Buffer
+    // ret is a cached allocation for passing to the next Unit. Units never modify the passed-in FullValue.
+    ret FullValue
+}
+
+// ID returns the unit debug id.
+func (n *ReshuffleInput) ID() UnitID {
+    return n.UID
+}
+
+// Up initializes the value and window encoders, and the random source.
+func (n *ReshuffleInput) Up(ctx context.Context) error {
+    n.enc = MakeElementEncoder(coder.SkipW(n.Coder))
+    n.wEnc = MakeWindowEncoder(n.Coder.Window)
+    n.r = rand.New(rand.NewSource(n.Seed))
+    return nil
+}
+
+// StartBundle is a no-op.
+func (n *ReshuffleInput) StartBundle(ctx context.Context, id string, data DataContext) error {
+    return MultiStartBundle(ctx, id, data, n.Out)
+}
+
+func (n *ReshuffleInput) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error {
+    n.b.Reset()
+    if err := EncodeWindowedValueHeader(n.wEnc, value.Windows, value.Timestamp, &n.b); err != nil {
+        return err
+    }
+    if err := n.enc.Encode(value, &n.b); err != nil {
+        return errors.WithContextf(err, "encoding element %v with coder %v", value, n.Coder)
+    }
+    n.ret = FullValue{Elm: n.r.Int(), Elm2: n.b.Bytes(), Timestamp: value.Timestamp}
+    if err := n.Out.ProcessElement(ctx, &n.ret); err != nil {
+        return err
+    }
+    return nil
+}
+
+// FinishBundle propagates finish bundle, and clears cached state.
+func (n *ReshuffleInput) FinishBundle(ctx context.Context) error {
+    n.b = bytes.Buffer{}
+    n.ret = FullValue{}
+    return MultiFinishBundle(ctx, n.Out)
+}
+
+// Down is a no-op.
+func (n *ReshuffleInput) Down(ctx context.Context) error {
+    return nil
+}
+
+func (n *ReshuffleInput) String() string {
+    return fmt.Sprintf("ReshuffleInput[%v] Coder:%v", n.SID, n.Coder)
+}
+
+// ReshuffleOutput is a Node.
+type ReshuffleOutput struct {
+    UID   UnitID
+    SID   StreamID
+    Coder *coder.Coder // Coder for the receiving PCollection.
+    Out   Node
+
+    b    bytes.Buffer
+    dec  ElementDecoder
+    wDec WindowDecoder
+    ret  FullValue
+}
+
+// ID returns the unit debug id.
+func (n *ReshuffleOutput) ID() UnitID {
+    return n.UID
+}
+
+// Up initializes the value and window encoders, and the random source.
+func (n *ReshuffleOutput) Up(ctx context.Context) error {
+    n.dec = MakeElementDecoder(coder.SkipW(n.Coder))
+    n.wDec = MakeWindowDecoder(n.Coder.Window)
+    return nil
+}
+
+// StartBundle is a no-op.
+func (n *ReshuffleOutput) StartBundle(ctx context.Context, id string, data DataContext) error {
+    return MultiStartBundle(ctx, id, data, n.Out)
+}
+
+func (n *ReshuffleOutput) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error {
+    // Marshal the pieces into a temporary buffer since they must be transmitted on FnAPI as a single
+    // unit.
+    vs, err := values[0].Open()

Review comment: It's strange to me that `values[]` can have multiple elements, but this method ends up actually reading all the values from the first element of it. Could you explain why that happens? Are there sometimes multiple ReStreams representing different things?
[GitHub] [beam] chrisgorgo commented on issue #11204: [BEAM-9579] Fix numpy logic operators
chrisgorgo commented on issue #11204: [BEAM-9579] Fix numpy logic operators URL: https://github.com/apache/beam/pull/11204#issuecomment-603002452 R: @aaltay @charlesccychen
[GitHub] [beam] chrisgorgo opened a new pull request #11204: [BEAM-9579] Fix numpy logic operators
chrisgorgo opened a new pull request #11204: [BEAM-9579] Fix numpy logic operators URL: https://github.com/apache/beam/pull/11204 Replacing deprecated '-' logical operators.
[GitHub] [beam] robertwb opened a new pull request #11203: [BEAM-9577] Define and implement dependency-aware artifact staging service.
robertwb opened a new pull request #11203: [BEAM-9577] Define and implement dependency-aware artifact staging service. URL: https://github.com/apache/beam/pull/11203 This is not yet used anywhere (and even the legacy service is not yet used in Dataflow) but it is good to define what we want this service to look like. R: @lukecwik CC: @ihji
[beam] branch master updated (913c9f8 -> fc6cef9)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git.

  from 913c9f8 Merge pull request #11198 from [BEAM-7923] Obfuscates display ids
   add fc6cef9 Merge pull request #11074: Store logical type values in Row instead of base values

No new revisions were added by this update.

Summary of changes:
 .../apache/beam/sdk/coders/RowCoderGenerator.java  | 219 ---
 .../beam/sdk/schemas/FromRowUsingCreator.java      |  15 +-
 .../org/apache/beam/sdk/schemas/SchemaCoder.java   |  60 +--
 .../beam/sdk/schemas/SchemaCoderHelpers.java       | 168 +
 .../sdk/schemas/logicaltypes/EnumerationType.java  |  30 +-
 .../beam/sdk/schemas/logicaltypes/OneOfType.java   |  35 +-
 .../apache/beam/sdk/schemas/transforms/Group.java  | 252 +++--
 .../sdk/schemas/transforms/SchemaAggregateFn.java  |  48 ++-
 .../apache/beam/sdk/schemas/utils/AvroUtils.java   |   3 +-
 .../beam/sdk/schemas/utils/ByteBuddyUtils.java     |  27 +-
 .../beam/sdk/schemas/utils/ConvertHelpers.java     |   3 +-
 .../apache/beam/sdk/schemas/utils/POJOUtils.java   |   2 +-
 .../main/java/org/apache/beam/sdk/values/Row.java  |  61 +++-
 .../org/apache/beam/sdk/values/RowWithGetters.java |   9 +-
 .../apache/beam/sdk/values/SchemaVerification.java |   3 +-
 .../org/apache/beam/sdk/coders/RowCoderTest.java   |  70
 .../apache/beam/sdk/schemas/AvroSchemaTest.java    |   2 +-
 .../beam/sdk/schemas/JavaFieldSchemaTest.java      |  57 ++-
 .../sdk/schemas/logicaltypes/LogicalTypesTest.java |  12 +-
 .../beam/sdk/schemas/transforms/GroupTest.java     | 406 ++---
 .../beam/sdk/schemas/utils/SchemaTestUtils.java    | 138 ++-
 .../apache/beam/sdk/schemas/utils/TestPOJOs.java   |  13 +-
 .../protobuf/ProtoDynamicMessageSchema.java        |  19 +-
 .../sql/impl/rel/BeamAggregationRel.java           |  17 +-
 .../sdk/extensions/sql/impl/rel/BeamCalcRel.java   |  78 ++--
 .../sql/impl/rel/BeamEnumerableConverter.java      |   2 +-
 .../sdk/extensions/sql/impl/rel/BeamSortRel.java   |   4 +-
 .../extensions/sql/impl/rel/BeamUncollectRel.java  |   2 +-
 .../sdk/extensions/sql/impl/rel/BeamUnnestRel.java |   6 +-
 .../extensions/sql/impl/schema/BeamTableUtils.java |   2 +-
 .../sql/impl/transform/BeamJoinTransforms.java     |   8 +-
 .../sql/impl/transform/agg/CovarianceFn.java       |   4 +-
 .../sdk/extensions/sql/BeamComplexTypeTest.java    |   3 +-
 .../sql/impl/schema/BeamSqlRowCoderTest.java       |   4 +-
 .../extensions/sql/zetasql/BeamZetaSqlCalcRel.java |   2 +-
 .../sdk/extensions/sql/zetasql/ZetaSqlUtils.java   |   4 +-
 36 files changed, 1200 insertions(+), 588 deletions(-)
 create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoderHelpers.java
[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values URL: https://github.com/apache/beam/pull/11074#issuecomment-602935452 After 8 runs, the only Java Precommit failures have been random flakes (e.g. in Flink tests).
[GitHub] [beam] reuvenlax merged pull request #11074: Store logical type values in Row instead of base values
reuvenlax merged pull request #11074: Store logical type values in Row instead of base values URL: https://github.com/apache/beam/pull/11074
[GitHub] [beam] TheNeuralBit commented on issue #10055: [BEAM-8603] Add Python SqlTransform example script
TheNeuralBit commented on issue #10055: [BEAM-8603] Add Python SqlTransform example script URL: https://github.com/apache/beam/pull/10055#issuecomment-602931763 Run XVR_Flink PostCommit
[GitHub] [beam] TheNeuralBit commented on issue #10055: [BEAM-8603] Add Python SqlTransform example script
TheNeuralBit commented on issue #10055: [BEAM-8603] Add Python SqlTransform example script URL: https://github.com/apache/beam/pull/10055#issuecomment-602931816 Run XVR_Spark PostCommit
[GitHub] [beam] stale[bot] closed pull request #8801: Make temp_location an attribute of the StandardOptions class
stale[bot] closed pull request #8801: Make temp_location an attribute of the StandardOptions class URL: https://github.com/apache/beam/pull/8801
[GitHub] [beam] stale[bot] commented on issue #8884: Add a PCollectionCache ABC and several file-based implementations
stale[bot] commented on issue #8884: Add a PCollectionCache ABC and several file-based implementations URL: https://github.com/apache/beam/pull/8884#issuecomment-602929552 This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
[GitHub] [beam] stale[bot] closed pull request #8884: Add a PCollectionCache ABC and several file-based implementations
stale[bot] closed pull request #8884: Add a PCollectionCache ABC and several file-based implementations URL: https://github.com/apache/beam/pull/8884
[GitHub] [beam] stale[bot] commented on issue #8801: Make temp_location an attribute of the StandardOptions class
stale[bot] commented on issue #8801: Make temp_location an attribute of the StandardOptions class URL: https://github.com/apache/beam/pull/8801#issuecomment-602929558 This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
[GitHub] [beam] pabloem opened a new pull request #11202: asdletmedah
pabloem opened a new pull request #11202: asdletmedah URL: https://github.com/apache/beam/pull/11202
[GitHub] [beam] pabloem commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
pabloem commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#discussion_r396767924

## File path: sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py

## @@ -202,14 +207,24 @@ def _emit_from_file(self, fh, tail):
       # The first line at pos = 0 is always the header. Read the line without
       # the new line.
       to_decode = line[:-1]
-      if pos == 0:
-        header = TestStreamFileHeader()
-        header.ParseFromString(self._coder.decode(to_decode))
-        yield header
+      proto_cls = TestStreamFileHeader if pos == 0 else TestStreamFileRecord
+      msg = self._try_parse_as(proto_cls, to_decode)
+      if msg:
+        yield msg
       else:
-        record = TestStreamFileRecord()
-        record.ParseFromString(self._coder.decode(to_decode))
-        yield record
+        break
+
+  def _try_parse_as(self, proto_cls, to_decode):
+    try:
+      msg = proto_cls()
+      msg.ParseFromString(self._coder.decode(to_decode))
+    except DecodeError:
+      _LOGGER.error(
+          'Could not parse as %s. This can indicate that the cache is '
+          'corrupted. Please restart the kernel. '
+          '\nfile: %s \nmessage: %s', proto_cls, self._path, to_decode)
+      msg = None

Review comment: Do we just skip? This may mean that the file is corrupted? Should we stop consuming (i.e. rethrow the exception)?
[GitHub] [beam] pabloem commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
pabloem commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#discussion_r396810340

## File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py

## @@ -170,8 +170,13 @@ def run_pipeline(self, pipeline, options):
         user_pipeline)):
       streaming_cache_manager = ie.current_env().cache_manager()
       if streaming_cache_manager:
+
+        def exception_handler(e):
+          _LOGGER.error(str(e))

Review comment: Same as above. Do we just log and not stop processing?
[GitHub] [beam] pabloem commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
pabloem commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#discussion_r396766986

## File path: sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py

## @@ -166,13 +169,15 @@ def _wait_until_file_exists(self, timeout_secs=30):
     # Wait for up to `timeout_secs` for the file to be available.
     start = time.time()
-    path = os.path.join(self._cache_dir, *self._labels)
-    while not os.path.exists(path):
+    while not os.path.exists(self._path):
       time.sleep(1)
       if time.time() - start > timeout_timestamp_secs:
+        from apache_beam.runners.interactive.pipeline_instrument import CacheKey
+        pcollection_var = CacheKey.from_str(self._labels[-1]).var

Review comment: I hadn't stopped to think that labels are a file name too, huh? I guess the final file name is the PCollection variable name? If so, users may name their PCollections something that is not supported by the OS? (Or maybe not, since they have to be Python variable names?) Anyway, this is not for this PR, but just to think about.
[GitHub] [beam] lostluck commented on a change in pull request #11188: [BEAM-3301] Adding restriction trackers and validation.
lostluck commented on a change in pull request #11188: [BEAM-3301] Adding restriction trackers and validation.
URL: https://github.com/apache/beam/pull/11188#discussion_r396795769

## File path: sdks/go/pkg/beam/core/graph/fn_test.go

## @@ -676,39 +737,77 @@ func (fn *BadSdfElementTRestSize) RestrictionSize(float32, RestT) float64 {
 
 type BadRestT struct{}
 
 type BadSdfRestTSplitRestParam struct {
-    *GoodDoFn
+    *GoodSdf
 }
 
 func (fn *BadSdfRestTSplitRestParam) SplitRestriction(int, BadRestT) []RestT {
     return []RestT{}
 }
 
 type BadSdfRestTSplitRestReturn struct {
-    *GoodDoFn
+    *GoodSdf
 }
 
 func (fn *BadSdfRestTSplitRestReturn) SplitRestriction(int, RestT) []BadRestT {
     return []BadRestT{}
 }
 
 type BadSdfRestTRestSize struct {
-    *GoodDoFn
+    *GoodSdf
 }
 
 func (fn *BadSdfRestTRestSize) RestrictionSize(int, BadRestT) float64 {
     return 0
 }
 
+type BadSdfRestTCreateTracker struct {
+    *GoodSdf
+}
+
+func (fn *BadSdfRestTCreateTracker) CreateTracker(BadRestT) *RTrackerT {
+    return &RTrackerT{}
+}
+
 // Examples of other type validation that needs to be done.
 type BadSdfRestSizeReturn struct {
-    *GoodDoFn
+    *GoodSdf
 }
 
 func (fn *BadSdfRestSizeReturn) BadSdfRestSizeReturn(int, RestT) int {
     return 0
 }
 
+type BadRTrackerT struct{}

Review comment: Consider commenting that this "RTracker" isn't implementing the RTracker interface.
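[Editor's note] A common Go idiom for making that intent explicit is a blank-identifier compile-time assertion, which breaks the build if the method set ever drifts. The sketch below defines its own one-method interface so it is self-contained; it is not the actual sdf.RTracker interface or the fn_test.go types.

```go
package main

// Claimer is a one-method stand-in for an interface like sdf.RTracker.
type Claimer interface {
	TryClaim(pos interface{}) bool
}

// goodTracker implements Claimer; the assertion below makes that explicit.
type goodTracker struct{}

func (goodTracker) TryClaim(interface{}) bool { return false }

// Compile-time check: fails to build if goodTracker stops satisfying Claimer.
var _ Claimer = goodTracker{}

// badTracker deliberately does not implement Claimer, mirroring BadRTrackerT;
// a comment, rather than an assertion, documents that intent.
type badTracker struct{}

func main() {}
```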
[GitHub] [beam] lostluck commented on a change in pull request #11188: [BEAM-3301] Adding restriction trackers and validation.
lostluck commented on a change in pull request #11188: [BEAM-3301] Adding restriction trackers and validation.
URL: https://github.com/apache/beam/pull/11188#discussion_r396803249

## File path: sdks/go/pkg/beam/core/graph/fn_test.go

## @@ -562,7 +595,13 @@ func (fn *GoodSdf) RestrictionSize(int, RestT) float64 {
     return 0
 }
 
-// TODO(BEAM-3301): Add ProcessElement impl. when restriction trackers are in.
+func (fn *GoodSdf) CreateTracker(RestT) *RTrackerT {
+    return &RTrackerT{}
+}
+
+func (fn *GoodSdf) ProcessElement(*RTrackerT, int) int {

Review comment: What do you think of having ProcessElement actually just have an sdf.RTracker value? Having it as the interface simplifies our wrapping approach for dynamic splitting, and means the framework can do it all the time, for safety etc. CreateTracker would still need the actual implementation type, and check that it implements sdf.RTracker of course. We can always extend things to allow a user to "unwrap" the interface if they need direct access to their RTracker implementation for whatever reason.
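[Editor's note] A minimal sketch of the signature split the comment proposes: CreateTracker returns the concrete type, while ProcessElement accepts the interface so the framework can substitute a wrapper (e.g. for dynamic splitting). The RTracker interface here is a trimmed local stand-in, not Beam's sdf.RTracker.

```go
package main

import "fmt"

// RTracker is a trimmed-down, local stand-in for sdf.RTracker.
type RTracker interface {
	TryClaim(pos interface{}) bool
}

// offsetTracker is a hypothetical concrete tracker over [pos, end).
type offsetTracker struct{ pos, end int64 }

func (t *offsetTracker) TryClaim(pos interface{}) bool {
	p, ok := pos.(int64)
	if !ok || p < t.pos || p >= t.end {
		return false
	}
	t.pos = p + 1
	return true
}

// CreateTracker still returns the concrete type, which the framework inspects.
func CreateTracker(start, end int64) *offsetTracker {
	return &offsetTracker{pos: start, end: end}
}

// ProcessElement accepts the interface, so the framework may pass a wrapped
// tracker without the user code noticing.
func ProcessElement(rt RTracker, elm int) int {
	for pos := int64(0); rt.TryClaim(pos); pos++ {
		// process the claimed block
	}
	return elm
}

func main() {
	fmt.Println(ProcessElement(CreateTracker(0, 3), 42))
}
```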
[GitHub] [beam] lostluck commented on a change in pull request #11188: [BEAM-3301] Adding restriction trackers and validation.
lostluck commented on a change in pull request #11188: [BEAM-3301] Adding restriction trackers and validation.
URL: https://github.com/apache/beam/pull/11188#discussion_r396804143

## File path: sdks/go/pkg/beam/core/sdf/sdf.go

## @@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package sdf is experimental, incomplete, and not yet meant for general usage.
+package sdf
+
+// RTracker is an interface used to interact with restrictions while processing elements in
+// SplittableDoFns. Each implementation of RTracker is expected to be used for tracking a single
+// restriction type, which is the type that should be used to create the RTracker, and output by
+// TrySplit.
+type RTracker interface {
+    // TryClaim attempts to claim the block of work in the current restriction located at a given
+    // position. This method must be used in the ProcessElement method of Splittable DoFns to claim
+    // work before performing it. If no work is claimed, the ProcessElement is not allowed to perform
+    // work or emit outputs. If the claim is successful, the DoFn must process the entire block. If
+    // the claim is unsuccessful the ProcessElement method of the DoFn must return without performing
+    // any additional work or emitting any outputs.
+    //
+    // TryClaim accepts an arbitrary value that can be interpreted as the position of a block, and
+    // returns a boolean indicating whether the claim succeeded.
+    //
+    // If the claim fails due to an error, that error can be retrieved with GetError.
+    //
+    // For SDFs to work properly, claims must always be monotonically increasing in reference to the
+    // restriction's start and end points, and every block of work in a restriction must be claimed.
+    //
+    // This pseudocode example illustrates the typical usage of TryClaim:
+    //
+    //    pos = position of first block after restriction.start
+    //    for TryClaim(pos) == true {
+    //        // Do all work in the claimed block and emit outputs.
+    //        pos = position of next block
+    //    }
+    //    return
+    TryClaim(pos interface{}) (ok bool)
+
+    // GetError returns the error that made this RTracker stop executing, and it returns null if no

Review comment: returns nil*
[GitHub] [beam] mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp
mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp URL: https://github.com/apache/beam/pull/11200#issuecomment-602890165 Run Java PreCommit
[GitHub] [beam] TheNeuralBit commented on a change in pull request #10529: [BEAM-9044] Protobuf options to Schema options
TheNeuralBit commented on a change in pull request #10529: [BEAM-9044] Protobuf options to Schema options URL: https://github.com/apache/beam/pull/10529#discussion_r396791822
## File path: sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java
## @@ -205,10 +205,12 @@ static Schema getSchema(Descriptors.Descriptor descriptor) {
       // Store proto field number in metadata.
       FieldType fieldType =
           withMetaData(beamFieldTypeFromProtoField(fieldDescriptor), fieldDescriptor);
Review comment: I think Reuven was referring to the metadata that's added in `withMetaData`, since that's the "special" proto metadata
[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values URL: https://github.com/apache/beam/pull/11074#issuecomment-602887176 Run Java PreCommit
[GitHub] [beam] rohdesamuel opened a new pull request #11201: test
rohdesamuel opened a new pull request #11201: test URL: https://github.com/apache/beam/pull/11201 Change-Id: I18a1a3c4589e1fdc90d919a223ef67465f495374
**Please** add a meaningful description for your change here
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
Post-Commit Tests Status (on master branch): [build-status badge table omitted]
[GitHub] [beam] boyuanzz commented on issue #11199: [BEAM-9562] Update Timer encoding to V2
boyuanzz commented on issue #11199: [BEAM-9562] Update Timer encoding to V2 URL: https://github.com/apache/beam/pull/11199#issuecomment-602876259 I'll update the coder implementation once the proto looks good.
[GitHub] [beam] pabloem merged pull request #11198: [BEAM-7923] Obfuscates display ids
pabloem merged pull request #11198: [BEAM-7923] Obfuscates display ids URL: https://github.com/apache/beam/pull/11198
[beam] branch master updated (8083c8f -> 913c9f8)
This is an automated email from the ASF dual-hosted git repository. pabloem pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git.
from 8083c8f  [BEAM-9305] Allow value provider query strings in _CustomBigQuerySource (#11040)
add 913c9f8  Merge pull request #11198 from [BEAM-7923] Obfuscates display ids
No new revisions were added by this update.
Summary of changes:
 .../runners/interactive/display/pcoll_visualization.py | 10 +-
 sdks/python/apache_beam/runners/interactive/utils.py   | 10 ++
 2 files changed, 15 insertions(+), 5 deletions(-)
[GitHub] [beam] mxm removed a comment on issue #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp
mxm removed a comment on issue #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp URL: https://github.com/apache/beam/pull/11200#issuecomment-602869001 Flink Runner Nexmark Tests
[GitHub] [beam] mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp
mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp URL: https://github.com/apache/beam/pull/11200#issuecomment-602869187 Run Flink Runner Nexmark Tests
[GitHub] [beam] mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp
mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp URL: https://github.com/apache/beam/pull/11200#issuecomment-602869001 Flink Runner Nexmark Tests
[GitHub] [beam] mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp
mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp URL: https://github.com/apache/beam/pull/11200#issuecomment-602866857 Run Java Flink PortableValidatesRunner Streaming
[GitHub] [beam] mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp
mxm commented on issue #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp URL: https://github.com/apache/beam/pull/11200#issuecomment-602866787 Run Flink ValidatesRunner
[GitHub] [beam] mxm opened a new pull request #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp
mxm opened a new pull request #11200: [BEAM-9573] Correct computing of watermark hold for timer output timestamp URL: https://github.com/apache/beam/pull/11200 This PR contains two related changes which would be hard to review in separate PRs:
### BEAM-9573 Correct computing of watermark hold for timer output timestamp
With the introduction of timer output timestamps, a new watermark hold had been added to the Flink Runner. The watermark computation works on the keyed state backend, which yields a key-scoped watermark hold rather than the desired operator-wide watermark hold.
Computation: https://github.com/apache/beam/blob/b564239081e9351c56fb0e7d263495b95dd3f8f3/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1140
Key-scoped state: https://github.com/apache/beam/blob/b564239081e9351c56fb0e7d263495b95dd3f8f3/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1130
The solution is to iterate over all available state backend keys.
### BEAM-9566 Mitigate performance issue with watermark hold computation
Benchmarks have shown that the watermark computation over all keys is very expensive. This change introduces a cache which stores and updates the lowest timestamps for watermark holds due to timer output timestamps. In most cases only the cache should be hit; only when a large number of timers are added and then removed one after another might the cache have to be reloaded with data from the state backend.
Post-Commit Tests Status (on master branch): [build-status badge table omitted]
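To make the caching idea concrete: the operator-wide hold is the minimum timer output timestamp across all keys, and the cache keeps the smallest holds in memory so the keyed state backend only has to be rescanned when removals drain the cache. Below is a rough Python sketch of that strategy; `KeyedTimerHolds` and the dict-backed "state backend" are illustrative stand-ins, not the Flink runner's actual classes.

```python
import heapq

class KeyedTimerHolds:
    """Illustrative min-hold cache: keeps the smallest timer output timestamps
    in memory and only falls back to a full scan of the (simulated) state
    backend when removals have drained the cache."""
    def __init__(self, state_backend, cache_size=100):
        self.state = state_backend   # dict: timer id -> output timestamp
        self.cache = []              # min-heap of (timestamp, timer_id)
        self.cache_size = cache_size
        self._reload()

    def _reload(self):
        # Expensive path: scan every hold in the state backend.
        self.cache = heapq.nsmallest(
            self.cache_size, ((ts, k) for k, ts in self.state.items()))
        heapq.heapify(self.cache)

    def add(self, timer_id, ts):
        self.state[timer_id] = ts
        heapq.heappush(self.cache, (ts, timer_id))

    def remove(self, timer_id):
        self.state.pop(timer_id, None)  # Stale heap entry dropped lazily below.

    def current_hold(self):
        """Smallest live hold across all keys, or None if no timers are set."""
        while True:
            if not self.cache:
                if not self.state:
                    return None      # No hold: the watermark may advance freely.
                self._reload()       # Cache drained by removals: rescan once.
            ts, timer_id = self.cache[0]
            if self.state.get(timer_id) == ts:
                return ts
            heapq.heappop(self.cache)  # Entry for a removed/updated timer.
```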
[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values URL: https://github.com/apache/beam/pull/11074#issuecomment-602864720 Run Java PreCommit
[GitHub] [beam] lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK
lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK URL: https://github.com/apache/beam/pull/11197#issuecomment-602862121 Post commits run and pass, which is a good sign!
[GitHub] [beam] chamikaramj commented on issue #11039: [BEAM-9383] Staging Dataflow artifacts from environment
chamikaramj commented on issue #11039: [BEAM-9383] Staging Dataflow artifacts from environment URL: https://github.com/apache/beam/pull/11039#issuecomment-602855911 Retest this please
[GitHub] [beam] chamikaramj commented on issue #11185: [BEAM-8019] Some generalizations to support cross-language transforms.
chamikaramj commented on issue #11185: [BEAM-8019] Some generalizations to support cross-language transforms. URL: https://github.com/apache/beam/pull/11185#issuecomment-602855159 All tests pass now. Thanks.
[GitHub] [beam] aaltay commented on issue #11174: [BEAM-7923] Pop failed transform when error is raised
aaltay commented on issue #11174: [BEAM-7923] Pop failed transform when error is raised URL: https://github.com/apache/beam/pull/11174#issuecomment-602854440 retest this please
[GitHub] [beam] robertwb commented on a change in pull request #11148: [BEAM-8335] Adds a streaming wordcount integration test
robertwb commented on a change in pull request #11148: [BEAM-8335] Adds a streaming wordcount integration test URL: https://github.com/apache/beam/pull/11148#discussion_r396752081
## File path: sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
## @@ -147,6 +150,97 @@ def process(self, element):
     ]
     self.assertEqual(actual_reified, expected_reified)
+
+  def test_streaming_wordcount(self):
+    class WordExtractingDoFn(beam.DoFn):
+      def process(self, element):
+        text_line = element.strip()
+        words = text_line.split()
+        return words
+
+    # Add the TestStream so that it can be cached.
+    ib.options.capturable_sources.add(TestStream)
+    ib.options.capture_duration = timedelta(seconds=1)
Review comment: Why is this needed?
[GitHub] [beam] robertwb commented on a change in pull request #11148: [BEAM-8335] Adds a streaming wordcount integration test
robertwb commented on a change in pull request #11148: [BEAM-8335] Adds a streaming wordcount integration test URL: https://github.com/apache/beam/pull/11148#discussion_r396751796
## File path: sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
## @@ -147,6 +150,97 @@ def process(self, element):
     ]
     self.assertEqual(actual_reified, expected_reified)
+
+  def test_streaming_wordcount(self):
+    class WordExtractingDoFn(beam.DoFn):
+      def process(self, element):
+        text_line = element.strip()
+        words = text_line.split()
+        return words
+
+    # Add the TestStream so that it can be cached.
+    ib.options.capturable_sources.add(TestStream)
+    ib.options.capture_duration = timedelta(seconds=1)
+
+    p = beam.Pipeline(
+        runner=interactive_runner.InteractiveRunner(),
+        options=StandardOptions(streaming=True))
+
+    data = (
+        p
+        | TestStream()
+            .advance_watermark_to(0)
+            .advance_processing_time(1)
+            .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
+            .advance_watermark_to(20)
+            .advance_processing_time(1)
+            .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
+            .advance_watermark_to(40)
+            .advance_processing_time(1)
Review comment: Does this not trigger the capture duration?
[GitHub] [beam] robertwb commented on a change in pull request #11148: [BEAM-8335] Adds a streaming wordcount integration test
robertwb commented on a change in pull request #11148: [BEAM-8335] Adds a streaming wordcount integration test URL: https://github.com/apache/beam/pull/11148#discussion_r396752506
## File path: sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
## @@ -147,6 +150,97 @@ def process(self, element):
     ]
     self.assertEqual(actual_reified, expected_reified)
+
+  def test_streaming_wordcount(self):
+    class WordExtractingDoFn(beam.DoFn):
+      def process(self, element):
+        text_line = element.strip()
+        words = text_line.split()
+        return words
+
+    # Add the TestStream so that it can be cached.
+    ib.options.capturable_sources.add(TestStream)
+    ib.options.capture_duration = timedelta(seconds=1)
+
+    p = beam.Pipeline(
+        runner=interactive_runner.InteractiveRunner(),
+        options=StandardOptions(streaming=True))
+
+    data = (
+        p
+        | TestStream()
+            .advance_watermark_to(0)
+            .advance_processing_time(1)
+            .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
+            .advance_watermark_to(20)
+            .advance_processing_time(1)
+            .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
+            .advance_watermark_to(40)
+            .advance_processing_time(1)
+            .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
+        | beam.WindowInto(beam.window.FixedWindows(10)))  # yapf: disable
+
+    counts = (
+        data
+        | 'split' >> beam.ParDo(WordExtractingDoFn())
+        | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+        | 'group' >> beam.GroupByKey()
+        | 'count' >> beam.Map(lambda wordones: (wordones[0], sum(wordones[1]))))
+
+    # Watch the local scope for Interactive Beam so that referenced
+    # PCollections will be cached.
+    ib.watch(locals())
+
+    # This is normally done in the interactive_utils when a transform is
+    # applied but needs an IPython environment. So we manually run this here.
+    ie.current_env().track_user_pipelines()
+
+    # This tests that the data was correctly cached.
+    pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0)
+    expected_data_df = pd.DataFrame(
+        [('to', 0, [beam.window.IntervalWindow(0, 10)], pane_info),
Review comment: It'd be easier to understand the test if there were less data.
[GitHub] [beam] KevinGG commented on issue #11174: [BEAM-7923] Pop failed transform when error is raised
KevinGG commented on issue #11174: [BEAM-7923] Pop failed transform when error is raised URL: https://github.com/apache/beam/pull/11174#issuecomment-602852406 Fixed a failed test. TL;DR, the test was wrong and only passed in the past because it got lucky.
The test used to pass and started failing with the try-append-finally-pop change, because:
1. The test tried to append two no-name/no-label transforms to the same pipeline; both of them generate the same label and raise errors, while the test asserted for errors, thus allowing pipeline construction to continue even after it ran into fatal errors;
2. In the past, the pipeline was left in a broken state where the current_transform was not popped when the 1st error was raised. The full label generated was `WriteToBigQuery`;
3. In the past, appending the second transform wrongly added parts into the broken current_transform `WriteToBigQuery`, causing the transform node to be appended under the wrong, failed parent node and luckily getting a unique transform full label `WriteToBigQuery/WriteToBigQuery`; thus the test didn't fail due to duplicated labels. But the pipeline was constructed as `WriteToBigQuery`->`WriteToBigQuery`, which was completely messed up;
4. Still the test didn't fail, because it was testing for errors even though it built a broken pipeline with broken usages.
Once the try-append-finally-pop change is applied, the test functions as:
1. The first transform still fails to be appended and has side effects, including an applied transform label, but it no longer leaves the pipeline in a broken state: the current transform is still the right node;
2. The second transform still fails but is now appended to the correct node;
3. As long as the two transforms have different labels (or are executed in different cells if in an interactive environment), the test still passes, the pipeline is not broken and can be used for future development, and side effects are ruled out when data-centric APIs such as `show` and `collect` are invoked.
The reason we keep the applied label is that we never know what the side effects are when the error is raised.
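The shape of the fix is easy to see in miniature. Below is a toy Python model of the try-append-finally-pop pattern; `MiniPipeline` and `AppliedPTransform` here are simplified stand-ins for the real classes in `pipeline.py`, not the PR's actual code.

```python
# Toy model of try-append-finally-pop (hypothetical names, for illustration).
class AppliedPTransform:
    def __init__(self, label, expand_fn):
        self.label, self.expand_fn = label, expand_fn
        self.parts = []

class MiniPipeline:
    def __init__(self):
        self.transforms_stack = []

    def apply(self, node):
        self.transforms_stack.append(node)  # Push before expand() so parts nest correctly.
        try:
            node.expand_fn()                # May raise mid-expansion, leaving side effects.
        finally:
            self.transforms_stack.pop()     # Always restore the stack, even on error.

p = MiniPipeline()
try:
    p.apply(AppliedPTransform('bad', lambda: 1 / 0))
except ZeroDivisionError:
    pass
assert p.transforms_stack == []             # Stack is clean; the next apply() is unaffected.
```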
[GitHub] [beam] lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK
lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK URL: https://github.com/apache/beam/pull/11197#issuecomment-602851450 Run Go Postcommit
[GitHub] [beam] aaltay commented on issue #11174: [BEAM-7923] Pop failed transform when error is raised
aaltay commented on issue #11174: [BEAM-7923] Pop failed transform when error is raised URL: https://github.com/apache/beam/pull/11174#issuecomment-602851060 retest this please
[GitHub] [beam] boyuanzz commented on a change in pull request #11060: [BEAM-9454] Create Deduplication transform based on user timer/state
boyuanzz commented on a change in pull request #11060: [BEAM-9454] Create Deduplication transform based on user timer/state URL: https://github.com/apache/beam/pull/11060#discussion_r396747769
## File path: sdks/python/apache_beam/transforms/deduplicate.py
## @@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+"""a collection of ptransforms for deduplicating elements."""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import typing
+
+from apache_beam import typehints
+from apache_beam.coders.coders import BooleanCoder
+from apache_beam.transforms import core
+from apache_beam.transforms import ptransform
+from apache_beam.transforms import userstate
+from apache_beam.transforms.timeutil import TimeDomain
+from apache_beam.utils import timestamp
+
+__all__ = [
+    'Deduplicate',
+    'DeduplicatePerKey',
+]
+
+K = typing.TypeVar('K')
+V = typing.TypeVar('V')
+
+
+@typehints.with_input_types(typing.Tuple[K, V])
+@typehints.with_output_types(typing.Tuple[K, V])
+class DeduplicatePerKey(ptransform.PTransform):
+  """A PTransform which deduplicates pairs over a time domain and
+  threshold. Values in different windows will NOT be considered duplicates of
+  each other. Deduplication is best effort.
+
+  The durations specified may impose memory and/or storage requirements within
+  a runner and care might need to be used to ensure that the deduplication time
+  limit is long enough to remove duplicates but short enough to not cause
+  performance problems within a runner. Each runner may provide an optimized
+  implementation of their choice using the deduplication time domain and
+  threshold specified.
+
+  Does not preserve any order the input PCollection might have had.
+  """
+  def __init__(self, processing_time_duration=None, event_time_duration=None):
+    if processing_time_duration is None and event_time_duration is None:
+      raise ValueError(
+          'DeduplicatePerKey requires at least one of '
+          'processing_time_duration or event_time_duration.')
+    self.processing_time_duration = processing_time_duration
+    self.event_time_duration = event_time_duration
+
+  def _create_deduplicate_fn(self):
+    processing_timer_spec = userstate.TimerSpec(
+        'processing_timer', TimeDomain.REAL_TIME)
+    event_timer_spec = userstate.TimerSpec('event_timer', TimeDomain.WATERMARK)
+    state_spec = userstate.BagStateSpec('seen', BooleanCoder())
Review comment: The `seen_state` is only set once per key during that duration. I'm not sure whether combining state is more suitable here. What do you think?
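For context, here is a sketch of how the transform under review would be used, based only on the names visible in the diff above (`DeduplicatePerKey` and its `processing_time_duration` parameter). The import path is the module this PR adds, and `Duration(seconds=...)` is assumed to be the usual Beam duration constructor; treat this as illustrative, not the PR's documented usage.

```python
import apache_beam as beam
from apache_beam.transforms.deduplicate import DeduplicatePerKey  # Added by this PR.
from apache_beam.utils.timestamp import Duration

with beam.Pipeline() as p:
    deduped = (
        p
        | beam.Create([('k', 1), ('k', 1), ('k', 2)])
        # Best-effort: keeps the first element seen for each key within the
        # processing-time duration; later elements with the same key (and
        # window) are dropped while the 'seen' state is held.
        | DeduplicatePerKey(processing_time_duration=Duration(seconds=600)))
```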
[GitHub] [beam] pabloem commented on issue #11040: [BEAM-9305] Allow value provider query strings in _CustomBigQuerySource
pabloem commented on issue #11040: [BEAM-9305] Allow value provider query strings in _CustomBigQuerySource URL: https://github.com/apache/beam/pull/11040#issuecomment-602847544 Thanks @EDjur for the contribution! Thanks @kamilwu for reviewing
[beam] branch master updated (ec8846b -> 8083c8f)
This is an automated email from the ASF dual-hosted git repository. pabloem pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git.
from ec8846b  optionally import grpc (#11187)
add 8083c8f  [BEAM-9305] Allow value provider query strings in _CustomBigQuerySource (#11040)
No new revisions were added by this update.
Summary of changes:
 CHANGES.md                                         |  2 ++
 sdks/python/apache_beam/io/gcp/bigquery.py         | 22 ++
 .../apache_beam/io/gcp/bigquery_read_it_test.py    |  6 --
 3 files changed, 20 insertions(+), 10 deletions(-)
[GitHub] [beam] pabloem merged pull request #11040: [BEAM-9305] Allow value provider query strings in _CustomBigQuerySource
pabloem merged pull request #11040: [BEAM-9305] Allow value provider query strings in _CustomBigQuerySource URL: https://github.com/apache/beam/pull/11040
[beam] branch master updated (1c35224 -> ec8846b)
This is an automated email from the ASF dual-hosted git repository. hannahjiang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git.
from 1c35224  Merge pull request #11192 from lukecwik/splittabledofn
add ec8846b  optionally import grpc (#11187)
No new revisions were added by this update.
Summary of changes:
 .../apache_beam/runners/direct/test_stream_impl.py | 16 +---
 1 file changed, 13 insertions(+), 3 deletions(-)
[GitHub] [beam] Hannah-Jiang merged pull request #11187: optionally import grpc
Hannah-Jiang merged pull request #11187: optionally import grpc URL: https://github.com/apache/beam/pull/11187
[GitHub] [beam] KevinGG commented on a change in pull request #11174: [BEAM-7923] Pop failed transform when error is raised
KevinGG commented on a change in pull request #11174: [BEAM-7923] Pop failed transform when error is raised URL: https://github.com/apache/beam/pull/11174#discussion_r396742548
## File path: sdks/python/apache_beam/pipeline.py
## @@ -307,58 +307,61 @@ def _replace_if_needed(self, original_transform_node):
         elif len(inputs) == 0:
           input_node = pvalue.PBegin(self.pipeline)
-        # We have to add the new AppliedTransform to the stack before expand()
-        # and pop it out later to make sure that parts get added correctly.
-        self.pipeline.transforms_stack.append(replacement_transform_node)
-
-        # Keeping the same label for the replaced node but recursively
-        # removing labels of child transforms of original transform since they
-        # will be replaced during the expand below. This is needed in case
-        # the replacement contains children that have labels that conflicts
-        # with labels of the children of the original.
-        self.pipeline._remove_labels_recursively(original_transform_node)
-
-        new_output = replacement_transform.expand(input_node)
-        assert isinstance(
-            new_output, (dict, pvalue.PValue, pvalue.DoOutputsTuple))
-
-        if isinstance(new_output, pvalue.PValue):
-          new_output.element_type = None
-          self.pipeline._infer_result_type(
-              replacement_transform, inputs, new_output)
-
-        if isinstance(new_output, dict):
-          for new_tag, new_pcoll in new_output.items():
-            replacement_transform_node.add_output(new_pcoll, new_tag)
-        elif isinstance(new_output, pvalue.DoOutputsTuple):
-          replacement_transform_node.add_output(
-              new_output, new_output._main_tag)
-        else:
-          replacement_transform_node.add_output(new_output, new_output.tag)
-
-        # Recording updated outputs. This cannot be done in the same visitor
-        # since if we dynamically update output type here, we'll run into
-        # errors when visiting child nodes.
-        #
-        # NOTE: When replacing multiple outputs, the replacement PCollection
-        # tags must have a matching tag in the original transform.
-        if isinstance(new_output, pvalue.PValue):
-          if not new_output.producer:
-            new_output.producer = replacement_transform_node
-          output_map[original_transform_node.outputs[new_output.tag]] = \
-              new_output
-        elif isinstance(new_output, (pvalue.DoOutputsTuple, tuple)):
-          for pcoll in new_output:
-            if not pcoll.producer:
-              pcoll.producer = replacement_transform_node
-            output_map[original_transform_node.outputs[pcoll.tag]] = pcoll
-        elif isinstance(new_output, dict):
-          for tag, pcoll in new_output.items():
-            if not pcoll.producer:
-              pcoll.producer = replacement_transform_node
-            output_map[original_transform_node.outputs[tag]] = pcoll
-
-        self.pipeline.transforms_stack.pop()
+        try:
+          # We have to add the new AppliedTransform to the stack before
+          # expand() and pop it out later to make sure that parts get added
+          # correctly.
+          self.pipeline.transforms_stack.append(replacement_transform_node)
Review comment: It wouldn't raise an error if it's still a list on that line, and if it's not a list (it has become None or some other object) at the moment it causes an error, the finally block would error out too. So it's not necessary to exclude it from the try block. Putting it inside the try makes it a little more self-explanatory.
[GitHub] [beam] boyuanzz commented on a change in pull request #11060: [BEAM-9454] Create Deduplication transform based on user timer/state
boyuanzz commented on a change in pull request #11060: [BEAM-9454] Create Deduplication transform based on user timer/state URL: https://github.com/apache/beam/pull/11060#discussion_r396742097
## File path: sdks/python/apache_beam/transforms/deduplicate_test.py
## @@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+"""Unit tests for deduplicate transform by using TestStream."""
+
+from __future__ import absolute_import
+
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.coders import coders
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.testing.util import equal_to_per_window
+from apache_beam.transforms import deduplicate
+from apache_beam.transforms import window
+from apache_beam.utils.timestamp import Duration
+from apache_beam.utils.timestamp import Timestamp
+
+
+# TestStream is only supported in streaming pipeline. The Deduplicate transform
+# also requires Timer support. Sickbaying this testsuite until dataflow runner
+# supports both TestStream and user timer.
+@attr('ValidatesRunner', 'sickbay-batch', 'sickbay-streaming')
Review comment: The 'sickbay-batch' and 'sickbay-streaming' attributes are only used by the Dataflow suite now. And unfortunately, I don't think we have runners supporting these Python tests now.
[GitHub] [beam] youngoli commented on issue #11188: [BEAM-3301] Adding restriction trackers and validation.
youngoli commented on issue #11188: [BEAM-3301] Adding restriction trackers and validation. URL: https://github.com/apache/beam/pull/11188#issuecomment-602838126 Whoops, forgot reviewers. R: @lostluck
[GitHub] [beam] pabloem commented on issue #11198: [BEAM-7923] Obfuscates display ids
pabloem commented on issue #11198: [BEAM-7923] Obfuscates display ids URL: https://github.com/apache/beam/pull/11198#issuecomment-602837778 retest this please
[GitHub] [beam] lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK
lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK URL: https://github.com/apache/beam/pull/11197#issuecomment-602835398 Retest this please
[GitHub] [beam] pabloem commented on issue #10291: [BEAM-7516][BEAM-8823] FnApiRunner works with work queues, and a primitive watermark manager
pabloem commented on issue #10291: [BEAM-7516][BEAM-8823] FnApiRunner works with work queues, and a primitive watermark manager URL: https://github.com/apache/beam/pull/10291#issuecomment-602830331 I can't reproduce any of the failures locally via `tox -e py35-cloud,py35-cython`
[GitHub] [beam] boyuanzz opened a new pull request #11199: [BEAM-9562] Update Timer encoding to V2
boyuanzz opened a new pull request #11199: [BEAM-9562] Update Timer encoding to V2 URL: https://github.com/apache/beam/pull/11199
[Standard PR template checklist and Post-Commit Tests Status badge table omitted]
[GitHub] [beam] rohdesamuel commented on issue #11148: [BEAM-8335] Adds a streaming wordcount integration test
rohdesamuel commented on issue #11148: [BEAM-8335] Adds a streaming wordcount integration test URL: https://github.com/apache/beam/pull/11148#issuecomment-602825559 R: @robertwb
[GitHub] [beam] reuvenlax commented on issue #11074: Store logical type values in Row instead of base values
reuvenlax commented on issue #11074: Store logical type values in Row instead of base values URL: https://github.com/apache/beam/pull/11074#issuecomment-602824816 Run Java PreCommit
[GitHub] [beam] udim commented on issue #10914: [BEAM-8078] streaming_wordcount_debugging.py is missing a test
udim commented on issue #10914: [BEAM-8078] streaming_wordcount_debugging.py is missing a test URL: https://github.com/apache/beam/pull/10914#issuecomment-602823472 run python 3.7 postcommit
[GitHub] [beam] udim commented on issue #10914: [BEAM-8078] streaming_wordcount_debugging.py is missing a test
udim commented on issue #10914: [BEAM-8078] streaming_wordcount_debugging.py is missing a test URL: https://github.com/apache/beam/pull/10914#issuecomment-602823166 Trying again
[GitHub] [beam] chamikaramj commented on issue #11039: [BEAM-9383] Staging Dataflow artifacts from environment
chamikaramj commented on issue #11039: [BEAM-9383] Staging Dataflow artifacts from environment URL: https://github.com/apache/beam/pull/11039#issuecomment-602822540 Retest this please
[GitHub] [beam] chamikaramj commented on issue #11185: [BEAM-8019] Some generalizations to support cross-language transforms.
chamikaramj commented on issue #11185: [BEAM-8019] Some generalizations to support cross-language transforms. URL: https://github.com/apache/beam/pull/11185#issuecomment-602821407 Run Python2_PVR_Flink PreCommit
[GitHub] [beam] davidyan74 commented on a change in pull request #11198: [BEAM-7923] Obfuscates display ids
davidyan74 commented on a change in pull request #11198: [BEAM-7923] Obfuscates display ids URL: https://github.com/apache/beam/pull/11198#discussion_r396717186
## File path: sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
## @@ -246,11 +247,11 @@ def __init__(self, pcoll, include_window_info=False, display_facets=False):
     if not self._pcoll_var:
       self._pcoll_var = 'Value'
     self._cache_key = self._pin.cache_key(self._pcoll)
-    self._dive_display_id = 'facets_dive_{}_{}'.format(
-        self._cache_key, id(self))
-    self._overview_display_id = 'facets_overview_{}_{}'.format(
-        self._cache_key, id(self))
-    self._df_display_id = 'df_{}_{}'.format(self._cache_key, id(self))
+    self._dive_display_id = 'facets_dive_{}'.format(
+        obfuscate(self._cache_key, id(self)))
Review comment: nit: assign the obfuscate return value to a variable so it can be reused.
[GitHub] [beam] pabloem commented on issue #11198: [BEAM-7923] Obfuscates display ids
pabloem commented on issue #11198: [BEAM-7923] Obfuscates display ids URL: https://github.com/apache/beam/pull/11198#issuecomment-602819811 retest this please
[GitHub] [beam] lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK
lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK URL: https://github.com/apache/beam/pull/11197#issuecomment-602817069 I'm definitely not merging this until both the PostCommit runs and someone more familiar with windowing/trigger semantics looks over the configuration I copied over from python: https://github.com/apache/beam/pull/11197/files#diff-ef420fdb9afbce0674282b4ed4481042R530
[GitHub] [beam] KevinGG opened a new pull request #11198: [BEAM-7923] Obfuscates display ids
KevinGG opened a new pull request #11198: [BEAM-7923] Obfuscates display ids URL: https://github.com/apache/beam/pull/11198 1. Use md5 to hash and digest any inputs into a hexadecimal string. 2. The obfuscation is applied to all display ids in notebooks. Note the ids will start with alphabets such as `facets_dive_` or `table_df_` to be compatible with `document.querySelector()`. Otherwise, hexadecimal strings are compatible with `jQuery()` already. 3. The performance and hash clashing research can be found here: https://www.peterbe.com/plog/best-hashing-function-in-python. **Please** add a meaningful description for your change here Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). Post-Commit Tests Status (on master branch) Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark --- | --- | --- | --- | --- | --- | --- | --- Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build 
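For illustration, a minimal sketch of the scheme described above; the function name and prefix handling are assumptions for this example, not the PR's actual implementation:

```python
import hashlib


def obfuscate_display_id(prefix, raw_id):
  """Hashes an arbitrary display id into a stable hexadecimal string.

  The alphabetic prefix (e.g. 'facets_dive_' or 'table_df_') is kept so the
  result stays usable with document.querySelector(), which rejects id
  selectors that begin with a digit.
  """
  digest = hashlib.md5(raw_id.encode('utf-8')).hexdigest()
  return prefix + digest


print(obfuscate_display_id('table_df_', 'my_dataframe_variable'))
```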
[GitHub] [beam] KevinGG commented on issue #11198: [BEAM-7923] Obfuscates display ids
KevinGG commented on issue #11198: [BEAM-7923] Obfuscates display ids URL: https://github.com/apache/beam/pull/11198#issuecomment-602817070 yapf formatted. Lint passed locally. R: @pabloem R: @davidyan74 R: @rohdesamuel PTAL, thx! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [beam] lukecwik merged pull request #11192: [BEAM-9430] Fix coder sent to Dataflow service for non-portable pipelines due to WatermarkEstimators migration change
lukecwik merged pull request #11192: [BEAM-9430] Fix coder sent to Dataflow service for non-portable pipelines due to WatermarkEstimators migration change URL: https://github.com/apache/beam/pull/11192 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[beam] branch master updated: [BEAM-9430] Fix coder sent to Dataflow service for non-portable pipelines due to WatermarkEstimators migration change
This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 6912011  [BEAM-9430] Fix coder sent to Dataflow service for non-portable pipelines due to WatermarkEstimators migration change
     new 1c35224  Merge pull request #11192 from lukecwik/splittabledofn

6912011 is described below

commit 6912011bd7b9745d5a3fd195165482bcc39ffa32
Author: Luke Cwik
AuthorDate: Mon Mar 23 08:24:58 2020 -0700

    [BEAM-9430] Fix coder sent to Dataflow service for non-portable pipelines due to WatermarkEstimators migration change

 .../runners/dataflow/DataflowPipelineTranslator.java | 20 ++--
 .../dataflow/DataflowPipelineTranslatorTest.java     |  3 ++-
 2 files changed, 16 insertions(+), 7 deletions(-)

```diff
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 5f7ec26..8be785e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -978,12 +978,16 @@ public class DataflowPipelineTranslator {
       if (context.isFnApi()) {
         DoFnSignature signature = DoFnSignatures.signatureForDoFn(transform.getFn());
         if (signature.processElement().isSplittable()) {
-          Coder restrictionCoder =
-              DoFnInvokers.invokerFor(transform.getFn())
-                  .invokeGetRestrictionCoder(
-                      context.getInput(transform).getPipeline().getCoderRegistry());
+          DoFnInvoker doFnInvoker = DoFnInvokers.invokerFor(transform.getFn());
+          Coder restrictionAndWatermarkStateCoder =
+              KvCoder.of(
+                  doFnInvoker.invokeGetRestrictionCoder(
+                      context.getInput(transform).getPipeline().getCoderRegistry()),
+                  doFnInvoker.invokeGetWatermarkEstimatorStateCoder(
+                      context.getInput(transform).getPipeline().getCoderRegistry()));
           stepContext.addInput(
-              PropertyNames.RESTRICTION_ENCODING, translateCoder(restrictionCoder, context));
+              PropertyNames.RESTRICTION_ENCODING,
+              translateCoder(restrictionAndWatermarkStateCoder, context));
         }
       }
     }
@@ -1190,7 +1194,11 @@ public class DataflowPipelineTranslator {
               stepContext.addInput(
                   PropertyNames.RESTRICTION_CODER,
-                  translateCoder(transform.getRestrictionCoder(), context));
+                  translateCoder(
+                      KvCoder.of(
+                          transform.getRestrictionCoder(),
+                          transform.getWatermarkEstimatorStateCoder()),
+                      context));
             }
           });
     }
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index d48f060..775c782 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -733,7 +733,8 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         Structs.getObject(
             processKeyedStep.getProperties(), PropertyNames.RESTRICTION_CODER));
-    assertEquals(SerializableCoder.of(OffsetRange.class), restrictionCoder);
+    assertEquals(
+        KvCoder.of(SerializableCoder.of(OffsetRange.class), VoidCoder.of()), restrictionCoder);
   }

   /** Smoke test to fail fast if translation of a splittable ParDo in FnAPI. */
```
[beam] branch master updated (7310ec2 -> fb59b6a)
This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git.

from 7310ec2  Merge pull request #11190 from lukecwik/beam9565
add  fb59b6a  Merge pull request #10990: [BEAM-9569] disable coder inference for rows

No new revisions were added by this update.

Summary of changes:
 .../org/apache/beam/sdk/coders/CoderRegistry.java  |  5 ++
 .../apache/beam/sdk/schemas/transforms/Group.java  | 24 ---
 .../sql/impl/rel/BeamSetOperatorRelBase.java       | 55 +++
 .../impl/transform/BeamSetOperatorsTransforms.java | 78 +-
 .../meta/provider/pubsub/PubsubMessageToRow.java   | 43 ++--
 .../sql/meta/provider/test/TestTableProvider.java  | 16 ++---
 .../beam/sdk/nexmark/queries/sql/SqlQuery0.java    |  1 +
 7 files changed, 106 insertions(+), 116 deletions(-)
[GitHub] [beam] reuvenlax merged pull request #10990: [BEAM-9569] disable coder inference for rows
reuvenlax merged pull request #10990: [BEAM-9569] disable coder inference for rows URL: https://github.com/apache/beam/pull/10990 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [beam] ibzib commented on issue #11189: [BEAM-9446] Retain unknown arguments when using uber jar job server.
ibzib commented on issue #11189: [BEAM-9446] Retain unknown arguments when using uber jar job server. URL: https://github.com/apache/beam/pull/11189#issuecomment-602808467 @mxm thanks for the feedback, I addressed your comments. Unrelated: I wasn't aware that the Flink portable jar postcommit has been failing. I filed and self-assigned BEAM-9575. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [beam] Hannah-Jiang commented on issue #11067: [BEAM-9136]Add licenses for dependencies
Hannah-Jiang commented on issue #11067: [BEAM-9136]Add licenses for dependencies URL: https://github.com/apache/beam/pull/11067#issuecomment-602807464 R: @alanmyrvold , do you have time to take a look for Go and Java? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [beam] chadrik commented on a change in pull request #11038: [BEAM-7746] More typing fixes
chadrik commented on a change in pull request #11038: [BEAM-7746] More typing fixes URL: https://github.com/apache/beam/pull/11038#discussion_r396700454

## File path: sdks/python/apache_beam/coders/coders.py ##

```diff
@@ -387,8 +387,11 @@ def register_structured_urn(urn, cls):
     """Register a coder that's completely defined by its urn and its
     component(s), if any, which are passed to construct the instance.
     """
-    cls.to_runner_api_parameter = (
-        lambda self, unused_context: (urn, None, self._get_component_coders()))
+    setattr(
```

Review comment: mypy raises an error when a method is overwritten: `Cannot assign to a method`. Using `setattr` avoids the error. A similar situation arises when setting or getting dynamic attributes. We need to decide on a policy for these cases:

1) avoid the error using `setattr` and `getattr`
2) silence the error with a `# type: ignore[xxx]` comment

The second option provides more context, but the choice may hinge on how you feel about the aesthetics of `type: ignore` comments. Either way, I agree a comment should be added on the preceding line to clarify. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
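As a standalone illustration of the two options being weighed (toy class; the mypy error code in option 2 varies by mypy version and is an assumption here):

```python
class Greeter:
  def greet(self):
    # type: () -> str
    return 'hello'


# Option 1: setattr sidesteps mypy's "Cannot assign to a method" error, at
# the cost of hiding the new implementation from the type checker entirely.
setattr(Greeter, 'greet', lambda self: 'HELLO')

# Option 2: assign directly and silence the specific error code, which keeps
# the intent visible at the assignment site.
Greeter.greet = lambda self: 'HELLO'  # type: ignore[assignment]

print(Greeter().greet())  # prints HELLO
```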
[GitHub] [beam] lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK
lostluck commented on issue #11197: [BEAM-8292] Portable Reshuffle for Go SDK URL: https://github.com/apache/beam/pull/11197#issuecomment-602805178 R: @youngoli This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [beam] chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python
chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python URL: https://github.com/apache/beam/pull/8457#discussion_r396695008

## File path: sdks/python/apache_beam/io/gcp/bigtableio_read_it_test.py ## (the quoted lines are the new file's Apache license header)

Review comment: Is this test working now? (If so, I can try triggering it from the PR.) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [beam] chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python
chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python URL: https://github.com/apache/beam/pull/8457#discussion_r396693125

## File path: sdks/python/apache_beam/io/gcp/bigtableio.py ##

```diff
@@ -141,3 +144,123 @@ def expand(self, pvalue):
         | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
                                       beam_options['instance_id'],
                                       beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+    """A DoFn to parallelize reading from a Bigtable table
+
+    :type project_id: str
+    :param project_id: The ID of the project used for Bigtable access
+
+    :type instance_id: str
+    :param instance_id: The ID of the instance that owns the table.
+
+    :type table_id: str
+    :param table_id: The ID of the table.
+
+    :type filter_: :class:`.RowFilter`
+    :param filter_: (Optional) The filter to apply to the contents of the
+        specified row(s). If unset, reads every column in each row.
+    """
+    super(self.__class__, self).__init__()
+    self._initialize({'project_id': project_id,
+                      'instance_id': instance_id,
+                      'table_id': table_id,
+                      'filter_': filter_})
+
+  def _initialize(self, options):
+    """The defaults initializer, to assist with pickling
+
+    :return: None
+    """
+    self._options = options
+    self._table = None
+    self._counter = Metrics.counter(self.__class__, 'Rows Read')
+
+  def __getstate__(self):
+    return self._options
+
+  def __setstate__(self, options):
+    self._initialize(options)
+
+  def start_bundle(self):
+    # from google.cloud.bigtable import Client
+    if self._table is None:
+      # noinspection PyAttributeOutsideInit
+      self._table = Client(project=self._options['project_id'])\
+          .instance(self._options['instance_id'])\
+          .table(self._options['table_id'])
+
+  def process(self, source_bundle):
+    _start_key = source_bundle.start_position
+    _end_key = source_bundle.stop_position
+    for row in self._table.read_rows(_start_key, _end_key):
+      self._counter.inc()
+      yield row
+
+  def display_data(self):
+    return {'projectId': DisplayDataItem(self._options['project_id'],
+                                         label='Bigtable Project Id'),
+            'instanceId': DisplayDataItem(self._options['instance_id'],
+                                          label='Bigtable Instance Id'),
+            'tableId': DisplayDataItem(self._options['table_id'],
+                                       label='Bigtable Table Id')}
+
+
+class ReadFromBigtable(beam.PTransform):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+    """A PTransform wrapper for reading rows in parallel from a Bigtable table.
+
+    :type project_id: str
+    :param project_id: The ID of the project used for Bigtable access
+
+    :type instance_id: str
+    :param instance_id: The ID of the instance that owns the table.
+
+    :type table_id: str
+    :param table_id: The ID of the table.
+
+    :type filter_: :class:`.RowFilter`
+    :param filter_: (Optional) The filter to apply to the contents of the
+        specified row(s). If unset, reads every column in each row. If none
+        is provided, all rows are read by default.
+    """
+    super(self.__class__, self).__init__()
+    self._options = {'project_id': project_id,
+                     'instance_id': instance_id,
+                     'table_id': table_id,
+                     'filter_': filter_}
+
+  def __getstate__(self):
+    return self._options
+
+  def __setstate__(self, options):
+    self._options = options
+
+  def expand(self, pbegin):
+    table = Client(project=self._options['project_id'], admin=True) \
+        .instance(instance_id=self._options['instance_id']) \
+        .table(table_id=self._options['table_id'])
+
+    keys = list(table.sample_row_keys())
+
+    SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
+    keys.insert(0, SampleRowKey(b'', 0))
+
+    def chunks():
+      for i in range(1, len(keys)):
+        key_1 = keys[i - 1].row_key
+        key_2 = keys[i].row_key
+        size = keys[i].offset_bytes - keys[i - 1].offset_bytes
+        yield iobase.SourceBundle(size, None, key_1, key_2)
```

Review comment: You don't need to use SourceBundle if you are not using the Read transform. Please use a separate data structure (or just a tuple) here to avoid confusion. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [beam] chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python
chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python URL: https://github.com/apache/beam/pull/8457#discussion_r396691673

## File path: sdks/python/apache_beam/io/gcp/bigtableio.py ## (quotes the same `_BigtableReadFn`/`ReadFromBigtable` hunk shown above, anchored at the `for i in range(1, len(keys)):` line in `chunks()`)

Review comment: Do we expect to always get at least two keys here? If so, add an assertion before this statement so that we don't fall through trivially. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [beam] chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python
chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python URL: https://github.com/apache/beam/pull/8457#discussion_r396690605

## File path: sdks/python/apache_beam/io/gcp/bigtableio.py ## (quotes the same `_BigtableReadFn`/`ReadFromBigtable` hunk shown above, anchored at the `keys.insert(0, SampleRowKey(b'', 0))` line)

Review comment: Please add a comment to explain why this is needed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [beam] chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python
chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python URL: https://github.com/apache/beam/pull/8457#discussion_r396694342

## File path: sdks/python/apache_beam/io/gcp/bigtableio.py ## (quotes the same `_BigtableReadFn`/`ReadFromBigtable` hunk shown above, extended through `return (pbegin | 'Bundles' >> beam.Create(iter(chunks()))` and anchored at that `Bundles` line)

Review comment: Please add unit tests to cover splitting/reading etc. See the following for some example unit tests: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio_test.py This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [beam] chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python
chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python URL: https://github.com/apache/beam/pull/8457#discussion_r396695591

## File path: sdks/python/apache_beam/io/gcp/bigtableio_read_it_test.py ## (the quoted lines are the new file's Apache license header)

Review comment: Can you try triggering this for a large dataset to make sure that there's no severe perf regression? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [beam] chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python
chamikaramj commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python URL: https://github.com/apache/beam/pull/8457#discussion_r396659933

## File path: sdks/python/apache_beam/io/gcp/bigtableio.py ## (quotes the same `_BigtableReadFn` hunk shown above, anchored at the `class ReadFromBigtable(beam.PTransform):` line)

Review comment: Please add docs and annotations to denote that this module and PTransforms (both source and sink) are experimental, similar to the following: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/mongodbio.py#L52 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
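Beam's Python SDK ships an `experimental` annotation in `apache_beam.utils.annotations`; applying it here is a sketch of the reviewer's request, not merged code:

```python
import apache_beam as beam
from apache_beam.utils.annotations import experimental


@experimental()
class ReadFromBigtable(beam.PTransform):
  """Reads rows from a Cloud Bigtable table.

  This transform is experimental; no backwards-compatibility guarantees.
  """
  def expand(self, pbegin):
    raise NotImplementedError('body elided; see the diff quoted above')
```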
[GitHub] [beam] chadrik commented on a change in pull request #11038: [BEAM-7746] More typing fixes
chadrik commented on a change in pull request #11038: [BEAM-7746] More typing fixes URL: https://github.com/apache/beam/pull/11038#discussion_r396693751

## File path: sdks/python/mypy.ini ##

```diff
@@ -17,16 +17,21 @@
 [mypy]
 python_version = 3.6
+files = apache_beam
 ignore_missing_imports = true
-follow_imports = true
-warn_no_return = true
 no_implicit_optional = true
+# warnings:
+warn_no_return = true
 warn_redundant_casts = true
 warn_unused_ignores = true
+# formatting:
 show_error_codes = true
-files = apache_beam
 color_output = true
-# uncomment this to see how close we are to being complete
+# required setting for dmypy:
+follow_imports = error
```

Review comment: mypy picks up the site-packages directory of the python that it's installed into. Only PEP 561 compliant packages will inform the type analysis (i.e. the package must have a `py.typed` file). Alternately, you can specify the interpreter for finding site-packages with `--python-executable`:

```
--python-executable EXECUTABLE
                      Python executable used for finding PEP 561 compliant
                      installed packages and stubs
```

This config change did not generate any additional errors for me. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [beam] chadrik commented on a change in pull request #11038: [BEAM-7746] More typing fixes
chadrik commented on a change in pull request #11038: [BEAM-7746] More typing fixes URL: https://github.com/apache/beam/pull/11038#discussion_r396690657

## File path: sdks/python/apache_beam/transforms/core.py ##

```diff
@@ -1300,12 +1300,13 @@ def to_runner_api_parameter(self, context):
           common_urns.requirements.REQUIRES_STATEFUL_PROCESSING.urn)
     from apache_beam.runners.common import DoFnSignature
     sig = DoFnSignature(self.fn)
-    is_splittable = sig.is_splittable_dofn()
```

Review comment:

> Not sure if checking get_restriction_coder() return type instead of is_splittable_dofn() is future proof.

`get_restriction_coder()` calls `is_splittable_dofn()` and returns `None` if it's not splittable, so I interpreted a `None` result from this method to mean "is not splittable".

```python
def get_restriction_coder(self):
  # type: () -> Optional[TupleCoder]
  """Get coder for a restriction when processing an SDF."""
  if self.is_splittable_dofn():
    return TupleCoder([
        (self.get_restriction_provider().restriction_coder()),
        (self.get_watermark_estimator_provider().estimator_state_coder())
    ])
  else:
    return None
```

> I don't understand the change, from a mypy correctness perspective.

Here's the problem:

```python
if is_splittable:
  restriction_coder = sig.get_restriction_coder()  # returns Optional[TupleCoder]
  restriction_coder_id = context.coders.get_id(restriction_coder)  # does not accept Optional!
else:
  restriction_coder_id = None
```

With my changes, we naturally drop the optionality before passing the value to `context.coders.get_id()`. We also avoid a redundant call to `is_splittable_dofn()`, FWIW.

I see two options:

1) keep my changes and update the documentation of `get_restriction_coder()` to clarify that a `None` result indicates "is not splittable"
2) revert my changes and add `assert restriction_coder is not None` before the call to `context.coders.get_id()`

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
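Option (1) in a self-contained form, showing how the `None` check narrows the `Optional` for mypy; the types below are simplified stand-ins for the Beam ones:

```python
from typing import Optional


def get_restriction_coder():
  # type: () -> Optional[str]
  # Stand-in for DoFnSignature.get_restriction_coder(); None means the DoFn
  # is not splittable.
  return None


def get_id(coder):
  # type: (str) -> int
  # Stand-in for context.coders.get_id(); does not accept an Optional.
  return hash(coder)


restriction_coder = get_restriction_coder()
if restriction_coder is not None:
  # mypy narrows Optional[str] to str inside this branch.
  restriction_coder_id = get_id(restriction_coder)  # type: Optional[int]
else:
  restriction_coder_id = None
```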
[GitHub] [beam] ibzib commented on a change in pull request #11189: [BEAM-9446] Retain unknown arguments when using uber jar job server.
ibzib commented on a change in pull request #11189: [BEAM-9446] Retain unknown arguments when using uber jar job server. URL: https://github.com/apache/beam/pull/11189#discussion_r396684486

## File path: sdks/python/apache_beam/options/pipeline_options.py ##

```diff
@@ -285,10 +289,25 @@ def get_all_options(
       cls._add_argparse_args(parser)  # pylint: disable=protected-access
     if add_extra_args_fn:
       add_extra_args_fn(parser)
+
     known_args, unknown_args = parser.parse_known_args(self._flags)
-    if unknown_args:
-      _LOGGER.warning("Discarding unparseable args: %s", unknown_args)
-    result = vars(known_args)
+    if retain_unknown_options:
+      i = 0
+      while i < len(unknown_args):
+        # Treat all unary flags as booleans, and all binary argument values
+        # as strings.
+        if i + 1 >= len(unknown_args) or unknown_args[i + 1].startswith('--'):
```

Review comment: I guess I should change it to `-`, not `--`. `-` is the default prefix for argparse, and AFAICT that is true for Beam as well, even though most (all?) pipeline options use `--`. https://docs.python.org/3/library/argparse.html#prefix-chars So then the question becomes "Should we always check that a parameter name starts with `-`?" And the answer is we don't have to, because the call to parse_args will do that for us. `test_retain_unknown_options_unary_missing_prefix` tests that. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
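A small runnable sketch of the behavior under discussion; the flag names are made up and the pairing loop is simplified relative to `get_all_options`:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--runner')

flags = ['--runner', 'FlinkRunner', '--magic', '--depth', '3']
known, unknown = parser.parse_known_args(flags)

# Pair up the leftovers: a flag followed by another flag (or by nothing) is
# treated as a boolean; a flag followed by a value keeps the value as a string.
retained = {}
i = 0
while i < len(unknown):
  name = unknown[i].lstrip('-')
  if i + 1 >= len(unknown) or unknown[i + 1].startswith('-'):
    retained[name] = True
    i += 1
  else:
    retained[name] = unknown[i + 1]
    i += 2

print(known.runner, retained)  # FlinkRunner {'magic': True, 'depth': '3'}
```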
[GitHub] [beam] chadrik commented on a change in pull request #11038: [BEAM-7746] More typing fixes
chadrik commented on a change in pull request #11038: [BEAM-7746] More typing fixes URL: https://github.com/apache/beam/pull/11038#discussion_r396684644

## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ##

```diff
@@ -353,6 +359,8 @@ def release(self, instruction_id):
     self.cached_bundle_processors[descriptor_id].append(processor)

   def shutdown(self):
+    # type: (...) -> None
```

Review comment: correct. looks like I got a little sloppy. will fix in my next push. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [beam] chadrik commented on a change in pull request #11038: [BEAM-7746] More typing fixes
chadrik commented on a change in pull request #11038: [BEAM-7746] More typing fixes URL: https://github.com/apache/beam/pull/11038#discussion_r396683616

## File path: sdks/python/apache_beam/runners/worker/operations.py ##

```diff
@@ -569,7 +580,7 @@ def __init__(self,
     self.tagged_receivers = None  # type: Optional[_TaggedReceivers]
     # A mapping of timer tags to the input "PCollections" they come in on.
     self.timer_inputs = timer_inputs or {}
-    self.input_info = None  # type: Optional[Tuple[str, str, coders.WindowedValueCoder, MutableMapping[str, str]]]
```

Review comment: correct, I discovered it was only iterating over keys (using `for x in y`), so it only needed to be `Iterable[str]`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [beam] udim commented on issue #10914: [BEAM-8078] streaming_wordcount_debugging.py is missing a test
udim commented on issue #10914: [BEAM-8078] streaming_wordcount_debugging.py is missing a test URL: https://github.com/apache/beam/pull/10914#issuecomment-602789494 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [beam] robertwb commented on a change in pull request #11060: [BEAM-9454] Create Deduplication transform based on user timer/state
robertwb commented on a change in pull request #11060: [BEAM-9454] Create Deduplication transform based on user timer/state URL: https://github.com/apache/beam/pull/11060#discussion_r396675954

## File path: sdks/python/apache_beam/transforms/deduplicate.py ##

```python
# (Apache license header elided)

# pytype: skip-file

"""A collection of ptransforms for deduplicating elements."""

from __future__ import absolute_import
from __future__ import division

import typing

from apache_beam import typehints
from apache_beam.coders.coders import BooleanCoder
from apache_beam.transforms import core
from apache_beam.transforms import ptransform
from apache_beam.transforms import userstate
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.utils import timestamp

__all__ = [
    'Deduplicate',
    'DeduplicatePerKey',
]

K = typing.TypeVar('K')
V = typing.TypeVar('V')


@typehints.with_input_types(typing.Tuple[K, V])
@typehints.with_output_types(typing.Tuple[K, V])
class DeduplicatePerKey(ptransform.PTransform):
  """A PTransform which deduplicates <key, value> pairs over a time domain and
  threshold. Values in different windows will NOT be considered duplicates of
  each other. Deduplication is best effort.

  The durations specified may impose memory and/or storage requirements within
  a runner and care might need to be used to ensure that the deduplication time
  limit is long enough to remove duplicates but short enough to not cause
  performance problems within a runner. Each runner may provide an optimized
  implementation of their choice using the deduplication time domain and
  threshold specified.

  Does not preserve any order the input PCollection might have had.
  """
  def __init__(self, processing_time_duration=None, event_time_duration=None):
    if processing_time_duration is None and event_time_duration is None:
      raise ValueError(
          'DeduplicatePerKey requires at least one of '
          'processing_time_duration or event_time_duration to be provided.')
    self.processing_time_duration = processing_time_duration
    self.event_time_duration = event_time_duration

  def _create_deduplicate_fn(self):
    processing_timer_spec = userstate.TimerSpec(
        'processing_timer', TimeDomain.REAL_TIME)
    event_timer_spec = userstate.TimerSpec('event_timer', TimeDomain.WATERMARK)
    state_spec = userstate.BagStateSpec('seen', BooleanCoder())
```

Review comment: Perhaps this should be a combining state? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
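For context on the suggestion, a hedged sketch of what a combining-state variant could look like; this is illustrative only, not the PR's implementation:

```python
import apache_beam as beam
from apache_beam.coders.coders import BooleanCoder
from apache_beam.transforms import userstate


class _DeduplicateFn(beam.DoFn):
  # An any-combine over booleans collapses the state to a single flag,
  # whereas a bag state accumulates one element per duplicate seen.
  SEEN_STATE = userstate.CombiningValueStateSpec('seen', BooleanCoder(), any)

  def process(self, element, seen=beam.DoFn.StateParam(SEEN_STATE)):
    # Emit only the first element observed for this key and window.
    if not seen.read():
      seen.add(True)
      yield element
```

Used on a keyed PCollection, e.g. `kvs | beam.ParDo(_DeduplicateFn())`; the timer-based expiry from the PR would still be needed to bound state growth.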