[GitHub] [beam] iemejia commented on pull request #10888: [BEAM-7304] Twister2 Beam runner
iemejia commented on pull request #10888: URL: https://github.com/apache/beam/pull/10888#issuecomment-630608404 Run Seed Job This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] iemejia commented on pull request #10888: [BEAM-7304] Twister2 Beam runner
iemejia commented on pull request #10888: URL: https://github.com/apache/beam/pull/10888#issuecomment-630608215 Run Seed Job
[GitHub] [beam] iemejia commented on pull request #10888: [BEAM-7304] Twister2 Beam runner
iemejia commented on pull request #10888: URL: https://github.com/apache/beam/pull/10888#issuecomment-630607979 Run Seed Job
[GitHub] [beam] iemejia commented on pull request #10888: [BEAM-7304] Twister2 Beam runner
iemejia commented on pull request #10888: URL: https://github.com/apache/beam/pull/10888#issuecomment-630607638 retest this please
[GitHub] [beam] iemejia commented on pull request #10888: [BEAM-7304] Twister2 Beam runner
iemejia commented on pull request #10888: URL: https://github.com/apache/beam/pull/10888#issuecomment-630607794 Run Seed Job
[GitHub] [beam] pulasthi commented on pull request #10888: [BEAM-7304] Twister2 Beam runner
pulasthi commented on pull request #10888: URL: https://github.com/apache/beam/pull/10888#issuecomment-630602130 @iemejia Would you be able to trigger the checks again?
[GitHub] [beam] chamikaramj commented on pull request #11039: [BEAM-9383] Staging Dataflow artifacts from environment
chamikaramj commented on pull request #11039: URL: https://github.com/apache/beam/pull/11039#issuecomment-630584744 Heejong, can you please resolve conflicts and push an update to rerun tests?
[GitHub] [beam] stale[bot] commented on pull request #10509: RabbitMq IO Connector Refactor + Bugfixes
stale[bot] commented on pull request #10509: URL: https://github.com/apache/beam/pull/10509#issuecomment-630578159 This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions.
[GitHub] [beam] amaliujia commented on pull request #11737: [BEAM-9984] Support BIT_OR aggregation function in Beam SQL
amaliujia commented on pull request #11737: URL: https://github.com/apache/beam/pull/11737#issuecomment-630577035 retest this please
[GitHub] [beam] amaliujia merged pull request #10946: [BEAM-9363] TUMBLE as TVF
amaliujia merged pull request #10946: URL: https://github.com/apache/beam/pull/10946
[GitHub] [beam] chamikaramj commented on pull request #11740: [BEAM-8019] Prevent Dataflow from starting multiple containers for the same image
chamikaramj commented on pull request #11740: URL: https://github.com/apache/beam/pull/11740#issuecomment-630562942 Tests passed. PTAL.
[GitHub] [beam] henryken commented on pull request #11736: Katas - Convert task description from HTML to Markdown
henryken commented on pull request #11736: URL: https://github.com/apache/beam/pull/11736#issuecomment-630553569 @pabloem, this pull request can now be merged.
[GitHub] [beam] henryken commented on pull request #11736: Katas - Convert task description from HTML to Markdown
henryken commented on pull request #11736: URL: https://github.com/apache/beam/pull/11736#issuecomment-630553094 retest this please
[GitHub] [beam] lukecwik commented on a change in pull request #11748: [BEAM-9339] Ensure that Dataflow's pipeline proto also contains the capabilities
lukecwik commented on a change in pull request #11748: URL: https://github.com/apache/beam/pull/11748#discussion_r427005013 ## File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ## @@ -183,7 +183,10 @@ public JobSpecification translate( String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class)); RunnerApi.Environment defaultEnvironmentForDataflow = -Environments.createDockerEnvironment(workerHarnessContainerImageURL); +Environments.createDockerEnvironment(workerHarnessContainerImageURL) +.toBuilder() +.addAllCapabilities(Environments.getJavaCapabilities()) Review comment: Requirements is on the pipeline, and that works. Since Dataflow is creating its own environment, it is difficult to have common code because of the differences in the artifact staging and job APIs.
[GitHub] [beam] lukecwik merged pull request #11748: [BEAM-9339] Ensure that Dataflow's pipeline proto also contains the capabilities
lukecwik merged pull request #11748: URL: https://github.com/apache/beam/pull/11748
[GitHub] [beam] youngoli commented on a change in pull request #11747: [BEAM-9951] Using the builder pattern for Go synthetic config frontend
youngoli commented on a change in pull request #11747: URL: https://github.com/apache/beam/pull/11747#discussion_r427004634 ## File path: sdks/go/pkg/beam/io/synthetic/step.go ## @@ -144,49 +143,130 @@ func (fn *sdfStepFn) Setup() { // ProcessElement takes an input and either filters it or produces a number of // outputs identical to that input based on the restriction size. func (fn *sdfStepFn) ProcessElement(rt *offsetrange.Tracker, key, val []byte, emit func([]byte, []byte)) { - if fn.cfg.FilterRatio > 0 && fn.rng.Float64() < fn.cfg.FilterRatio { - return - } + filtered := fn.cfg.filterRatio > 0 && fn.rng.Float64() < fn.cfg.filterRatio + for i := rt.Rest.Start; rt.TryClaim(i) == true; i++ { - emit(key, val) + if !filtered { + emit(key, val) + } + } +} + +// StepConfigBuilder is used to initialize StepConfigs. See StepConfigBuilder's +// methods for descriptions of the fields in a StepConfig and how they can be +// set. The intended approach for using this builder is to begin by calling the +// DefaultStepConfig function, followed by calling setters, followed by calling +// Build. +// +// Usage example: +// +//cfg := synthetic.DefaultStepConfig().OutputPerInput(10).FilterRatio(0.5).Build() +type StepConfigBuilder struct { + cfg StepConfig +} + +// DefaultStepConfig creates a StepConfigBuilder set with intended defaults for the +// StepConfig fields. This function is the intended starting point for +// initializing a StepConfig and should always be used to create +// StepConfigBuilders. +// +// To see descriptions of the various StepConfig fields and their defaults, see +// the methods to StepConfigBuilder. +func DefaultStepConfig() *StepConfigBuilder { + return &StepConfigBuilder{ + cfg: StepConfig{ + outputPerInput: 1, // Defaults shouldn't drop elements, so at least 1. + filterRatio: 0.0, // Defaults shouldn't drop elements, so don't filter. + splittable: false, // Default to non-splittable, SDFs are situational. + initialSplits: 1, // Defaults to 1, i.e. no initial splitting. + }, } } -// DefaultSourceConfig creates a SourceConfig with intended defaults for its -// fields. SourceConfigs should be initialized with this method. -func DefaultStepConfig() StepConfig { - return StepConfig{ - OutputPerInput: 1, // Defaults shouldn't drop elements, so at least 1. - FilterRatio: 0.0, // Defaults shouldn't drop elements, so don't filter. - Splittable: false, // Default to non-splittable, SDFs are situational. - InitialSplits: 1, // Defaults to 1, i.e. no initial splitting. +// OutputPerInput is the number of outputs to emit per input received. Each +// output is identical to the original input. A value of 0 drops all inputs and +// produces no output. +// +// Valid values are in the range of [0, ...] and the default value is 1. Values +// below 0 are invalid as they have no logical meaning for this field. +func (b *StepConfigBuilder) OutputPerInput(val int) *StepConfigBuilder { + b.cfg.outputPerInput = val + return b +} + +// FilterRatio indicates the random chance that an input will be filtered +// out, meaning that no outputs will get emitted for it. For example, a +// FilterRatio of 0.25 means that 25% of inputs will be filtered out, a +// FilterRatio of 0 means no elements are filtered, and a FilterRatio of 1.0 +// means every element is filtered. +// +// In a non-splittable step, this is performed on each input element, meaning +// all outputs for that element would be filtered. In a splittable step, this is +// performed on each input restriction instead of the entire element, meaning +// that some outputs for an element may be filtered and others kept. +// +// Note that even when elements are filtered out, the work associated with +// processing those elements is still performed, which differs from setting an +// OutputPerInput of 0. Also note that if a +// +// Valid values are in the range of [0.0, 1.0], and the default value is 0. In +// order to avoid precision errors, invalid values do not cause errors. Instead, +// values below 0 are functionally equivalent to 0, and values above 1 are +// functionally equivalent to 1. +func (b *StepConfigBuilder) FilterRatio(val float64) *StepConfigBuilder { + b.cfg.filterRatio = val + return b +} + +// Splittable indicates whether the step should use the splittable DoFn or +// non-splittable DoFn implementation. +// +// Splittable steps will split along restrictions representing the number of +// OutputPerInput for each element, so it is most useful for steps with a high +// OutputPerInput. Conversely, if OutputPerInput is 1, then there is no way to +// split restrict
[GitHub] [beam] youngoli commented on a change in pull request #11747: [BEAM-9951] Using the builder pattern for Go synthetic config frontend
youngoli commented on a change in pull request #11747: URL: https://github.com/apache/beam/pull/11747#discussion_r427004608 ## File path: sdks/go/pkg/beam/io/synthetic/source.go ## @@ -135,27 +155,79 @@ func (fn *sourceFn) ProcessElement(rt *offsetrange.Tracker, config SourceConfig, return nil } -// DefaultSourceConfig creates a SourceConfig with intended defaults for its -// fields. SourceConfigs should be initialized with this method. -func DefaultSourceConfig() SourceConfig { - return SourceConfig{ - NumElements: 1, // Defaults shouldn't drop elements, so at least 1. - InitialSplits: 1, // Defaults to 1, i.e. no initial splitting. +// SourceConfigBuilder is used to initialize SourceConfigs. See +// SourceConfigBuilder's methods for descriptions of the fields in a +// SourceConfig and how they can be set. The intended approach for using this +// builder is to begin by calling the DefaultSourceConfig function, followed by +// calling setters, followed by calling Build. +// +// Usage example: +// +//cfg := synthetic.DefaultSourceConfig().NumElements(5000).InitialSplits(2).Build() +type SourceConfigBuilder struct { + cfg SourceConfig +} + +// DefaultSourceConfig creates a SourceConfigBuilder set with intended defaults +// for the SourceConfig fields. This function is the intended starting point for +// initializing a SourceConfig and should always be used to create +// SourceConfigBuilders. +// +// To see descriptions of the various SourceConfig fields and their defaults, +// see the methods to SourceConfigBuilder. +func DefaultSourceConfig() *SourceConfigBuilder { + return &SourceConfigBuilder{ + cfg: SourceConfig{ + numElements: 1, // 0 is invalid (drops elements). + initialSplits: 1, // 0 is invalid (drops elements). + }, + } +} + +// NumElements is the number of elements for the source to generate and emit. +// +// Valid values are in the range of [1, ...] and the default value is 1. Values +// of 0 (and below) are invalid as they result in sources that emit no elements. +func (b *SourceConfigBuilder) NumElements(val int) *SourceConfigBuilder { + b.cfg.numElements = val + return b +} + +// InitialSplits determines the number of initial splits to perform in the +// source's SplitRestriction method. Restrictions in synthetic sources represent +// the number of elements being emitted, and this split is performed evenly +// across that number of elements. +// +// Each resulting restriction will have at least 1 element in it, and each +// element being emitted will be contained in exactly one restriction. That +// means that if the desired number of splits is greater than the number of +// elements N, then N initial restrictions will be created, each containing 1 +// element. +// +// Valid values are in the range of [1, ...] and the default value is 1. Values +// of 0 (and below) are invalid as they would result in dropping elements that +// are expected to be emitted. +func (b *SourceConfigBuilder) InitialSplits(val int) *SourceConfigBuilder { + b.cfg.initialSplits = val + return b +} + +// Build constructs the SourceConfig initialized by this builder. It also +// performs error checking on the fields, and panics if any have been set to +// invalid values. +func (b *SourceConfigBuilder) Build() SourceConfig { + if b.cfg.initialSplits <= 0 { + panic(fmt.Sprintf("SourceConfig.InitialSplits must be >= 1. Got: %v", b.cfg.initialSplits)) + } + if b.cfg.numElements <= 0 { + panic(fmt.Sprintf("SourceConfig.NumElements must be >= 1. Got: %v", b.cfg.numElements)) } + return b.cfg } // SourceConfig is a struct containing all the configuration options for a -// synthetic source. +// synthetic source. It should be created via a SourceConfigBuilder. type SourceConfig struct { - // NumElements is the number of elements for the source to generate and - // emit. - NumElements int - - // InitialSplits determines the number of initial splits to perform in the - // source's SplitRestriction method. Note that in some edge cases, the - // number of splits performed might differ from this config value. Each - // restriction will always have one element in it, and at least one - // restriction will always be output, so the number of splits will be in - // the range of [1, N] where N is the size of the original restriction. - InitialSplits int + numElements int Review comment: Oh yeah, forgot about that. I'll go with having them exported and just recommend a builder. Done.
[GitHub] [beam] suztomo commented on pull request #11674: [BEAM-9958] Linkage Checker to use exclusion files as baseline
suztomo commented on pull request #11674: URL: https://github.com/apache/beam/pull/11674#issuecomment-630549911 No tests for this script. With this PR, we can set up a Jenkins task to run the Linkage Checker, say, "Run Java LinkageChecker".
[GitHub] [beam] amaliujia commented on pull request #10946: [BEAM-9363] TUMBLE as TVF
amaliujia commented on pull request #10946: URL: https://github.com/apache/beam/pull/10946#issuecomment-630548345 Run Java PreCommit
[GitHub] [beam] damondouglas commented on a change in pull request #11734: [BEAM-9679] Add Core Transforms section / GroupByKey lesson to the Go SDK katas
damondouglas commented on a change in pull request #11734: URL: https://github.com/apache/beam/pull/11734#discussion_r426996114 ## File path: learning/katas/go/Core Transforms/GroupByKey/GroupByKey/task.md ## @@ -0,0 +1,50 @@ + +# GroupByKey + +GroupByKey is a Beam transform for processing collections of key/value pairs. It’s a parallel +reduction operation, analogous to the Shuffle phase of a Map/Shuffle/Reduce-style algorithm. The +input to GroupByKey is a collection of key/value pairs that represents a multimap, where the +collection contains multiple pairs that have the same key, but different values. Given such a +collection, you use GroupByKey to collect all of the values associated with each unique key. + +**Kata:** Implement a [beam.GroupByKey](https://godoc.org/github.com/apache/beam/sdks/go/pkg/beam#GroupByKey) transform that groups words by their first letter. + Refer to [beam.GroupByKey](https://godoc.org/github.com/apache/beam/sdks/go/pkg/beam#GroupByKey) to solve this problem. + Providing your ParDo a func with two return values, such as below, will transform a PCollection + into a PCollection of key/value pairs. + +``` +func someFunc(element string) (uint8, string) { Review comment: Thank you, Henry. May we consider leaving the first return type to be uint8?
[GitHub] [beam] boyuanzz opened a new pull request #11749: [WIP, DO NOT REVIEW PLEASE] Implement ReadFromKafkaViaSDF
boyuanzz opened a new pull request #11749: URL: https://github.com/apache/beam/pull/11749 **Please** add a meaningful description for your change here Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
[GitHub] [beam] boyuanzz commented on pull request #11715: [BEAM-9977] Implement GrowableOffsetRangeTracker
boyuanzz commented on pull request #11715: URL: https://github.com/apache/beam/pull/11715#issuecomment-630536199 Latest changes are for addressing comments and using double during computation. @lukecwik PTAL. Thanks for your help!
[GitHub] [beam] TheNeuralBit merged pull request #11528: [BEAM-9821] Populate all SpannerIO batching parameters in display data.
TheNeuralBit merged pull request #11528: URL: https://github.com/apache/beam/pull/11528
[GitHub] [beam] apilloud commented on a change in pull request #10946: [BEAM-9363] TUMBLE as TVF
apilloud commented on a change in pull request #10946: URL: https://github.com/apache/beam/pull/10946#discussion_r426974365 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/TVFStreamingUtils.java ## @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.utils; + +/** Provides static constants or utils for TVF streaming. */ +public class TVFStreamingUtils { Review comment: In that case, drop the constants altogether. You can't reference a class in Beam from Calcite, and these constants are used in that class.
[GitHub] [beam] robertwb commented on a change in pull request #11748: [BEAM-9339] Ensure that Dataflow's pipeline proto also contains the capabilities
robertwb commented on a change in pull request #11748: URL: https://github.com/apache/beam/pull/11748#discussion_r426974286 ## File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ## @@ -183,7 +183,10 @@ public JobSpecification translate( String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class)); RunnerApi.Environment defaultEnvironmentForDataflow = -Environments.createDockerEnvironment(workerHarnessContainerImageURL); +Environments.createDockerEnvironment(workerHarnessContainerImageURL) +.toBuilder() +.addAllCapabilities(Environments.getJavaCapabilities()) Review comment: Is there some common utility we should be using here (rather than duplicating the code that's used in all the other portable runners)? What about requirements?
[GitHub] [beam] lukecwik commented on a change in pull request #11039: [BEAM-9383] Staging Dataflow artifacts from environment
lukecwik commented on a change in pull request #11039: URL: https://github.com/apache/beam/pull/11039#discussion_r426974548 ## File path: runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ## @@ -53,9 +53,12 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.DockerPayload; Review comment: Shouldn't we have a test to show the artifacts were properly set?
[GitHub] [beam] lukecwik commented on a change in pull request #11039: [BEAM-9383] Staging Dataflow artifacts from environment
lukecwik commented on a change in pull request #11039: URL: https://github.com/apache/beam/pull/11039#discussion_r426973945

## File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
## @@ -784,7 +877,25 @@ public DataflowPipelineJob run(Pipeline pipeline) {
 "Executing pipeline on the Dataflow Service, which will have billing implications "
     + "related to Google Compute Engine usage and other Google Cloud Services.");
-List packages = options.getStager().stageDefaultFiles();
+// Capture the sdkComponents for look up during step translations
+SdkComponents sdkComponents = SdkComponents.create();
+
+DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);
+RunnerApi.Environment defaultEnvironmentForDataflow =
+    Environments.createDockerEnvironment(workerHarnessContainerImageURL);
+
+sdkComponents.registerEnvironment(
+    defaultEnvironmentForDataflow
+        .toBuilder()
+        .addAllDependencies(getDefaultArtifacts())

Review comment: We also need to make sure we have the capabilities set; I have this PR for that, since it was missing before: https://github.com/apache/beam/pull/11748
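The idiom under review rebuilds the default Docker environment with its dependencies (and, per the linked PR, its capabilities) before registering it. The shape of that proto-builder round trip can be sketched with a plain-Java stand-in; every class and method name below is a hypothetical stand-in, not Beam's actual `RunnerApi.Environment` API:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
 * Hypothetical stand-in for the proto builder idiom discussed above:
 * an immutable value is turned back into a builder, extended with
 * dependencies and capabilities, and rebuilt. Not Beam's real API.
 */
public class EnvironmentSketch {
  final String containerImage;
  final List<String> dependencies;
  final List<String> capabilities;

  EnvironmentSketch(String image, List<String> deps, List<String> caps) {
    containerImage = image;
    dependencies = List.copyOf(deps);
    capabilities = List.copyOf(caps);
  }

  static EnvironmentSketch createDockerEnvironment(String image) {
    return new EnvironmentSketch(image, List.of(), List.of());
  }

  Builder toBuilder() {
    return new Builder(this);
  }

  static final class Builder {
    private final String image;
    private final List<String> deps;
    private final List<String> caps;

    Builder(EnvironmentSketch base) {
      image = base.containerImage;
      deps = new ArrayList<>(base.dependencies);
      caps = new ArrayList<>(base.capabilities);
    }

    Builder addAllDependencies(Collection<String> artifacts) {
      deps.addAll(artifacts);
      return this;
    }

    Builder addAllCapabilities(Collection<String> capabilities) {
      caps.addAll(capabilities);
      return this;
    }

    EnvironmentSketch build() {
      return new EnvironmentSketch(image, deps, caps);
    }
  }

  public static void main(String[] args) {
    // Image, artifact, and capability strings are illustrative only.
    EnvironmentSketch env =
        createDockerEnvironment("gcr.io/example/worker:latest")
            .toBuilder()
            .addAllDependencies(List.of("pipeline.jar"))
            .addAllCapabilities(List.of("beam:coder:bytes:v1"))
            .build();
    System.out.println(env.dependencies.size() + " " + env.capabilities.size()); // 1 1
  }
}
```

The design point the reviewers raise is that this rebuild step is repeated in each runner, which is why a shared utility is suggested instead of open-coding the builder chain per runner.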
[GitHub] [beam] ihji commented on a change in pull request #11039: [BEAM-9383] Staging Dataflow artifacts from environment
ihji commented on a change in pull request #11039: URL: https://github.com/apache/beam/pull/11039#discussion_r426971999

## File path: runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
## @@ -195,7 +187,7 @@ public void testFileWithExtensionPackageNamingAndSize() throws Exception {
 PackageAttributes attr = makePackageAttributes(tmpFile, null);
 DataflowPackage target = attr.getDestination();
-assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt"));
+assertThat(target.getName(), RegexMatcher.matches(UUID_PATTERN + ".txt"));

Review comment: Hmm, you're right. The UUID is an implementation detail behind the uniqueness guarantee. I changed the tests to only check whether the name keeps the same extension.
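The contract the updated test settles on — staged names are unique but keep the source file's extension — can be shown with a stand-alone sketch. The class and helper names here are hypothetical, not Beam's actual `PackageUtil`:

```java
import java.util.UUID;
import java.util.regex.Pattern;

/**
 * Illustrative sketch (not Beam's PackageUtil): staged file names built
 * from a random UUID keep the original extension, which is the only
 * stable property the revised test asserts.
 */
public class StagedNameDemo {
  private static final Pattern UUID_PATTERN = Pattern.compile(
      "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}");

  /** Derives a unique staged name for a local file, preserving its extension. */
  public static String stagedName(String fileName) {
    int dot = fileName.lastIndexOf('.');
    String extension = dot >= 0 ? fileName.substring(dot) : "";
    return UUID.randomUUID() + extension;
  }

  /** True when the staged name starts with a UUID (the uniqueness detail). */
  public static boolean looksUnique(String staged) {
    return UUID_PATTERN.matcher(staged).lookingAt();
  }

  public static void main(String[] args) {
    String staged = stagedName("file.txt");
    // Test only the stable contract: same extension, unique-looking prefix.
    System.out.println(staged.endsWith(".txt") && looksUnique(staged)); // true
  }
}
```

Testing against the extension rather than the UUID regex is the point of the review: the uniqueness mechanism can change without breaking the test.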
[GitHub] [beam] lukecwik commented on pull request #11748: [BEAM-9339] Ensure that Dataflow's pipeline proto also contains the capabilities
lukecwik commented on pull request #11748: URL: https://github.com/apache/beam/pull/11748#issuecomment-630514195 CC: @ananvay
[GitHub] [beam] lukecwik edited a comment on pull request #11748: [BEAM-9339] Ensure that Dataflow's pipeline proto also contains the capabilities
lukecwik edited a comment on pull request #11748: URL: https://github.com/apache/beam/pull/11748#issuecomment-630514089 R: @ihji @robertwb
[GitHub] [beam] lukecwik commented on pull request #11748: [BEAM-9339] Ensure that Dataflow's pipeline proto also contains the capabilities
lukecwik commented on pull request #11748: URL: https://github.com/apache/beam/pull/11748#issuecomment-630514089 R: @y1chi @robertwb
[GitHub] [beam] lukecwik opened a new pull request #11748: [BEAM-9339] Ensure that Dataflow's pipeline proto also contains the capabilities
lukecwik opened a new pull request #11748: URL: https://github.com/apache/beam/pull/11748
[GitHub] [beam] amaliujia commented on a change in pull request #10946: [BEAM-9363] TUMBLE as TVF
amaliujia commented on a change in pull request #10946: URL: https://github.com/apache/beam/pull/10946#discussion_r426969362 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkArgument; + +import java.lang.reflect.Type; +import java.util.List; +import java.util.Set; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.TableFunctionScan; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelColumnMapping; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexCall; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexInputRef; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode; +import org.joda.time.Duration; + +/** + * BeamRelNode to 
replace {@code TableFunctionScan}. Currently this class limits to support + * table-valued function for streaming windowing. + */ +public class BeamTableFunctionScanRel extends TableFunctionScan implements BeamRelNode { + public BeamTableFunctionScanRel( + RelOptCluster cluster, + RelTraitSet traitSet, + List inputs, + RexNode rexCall, + Type elementType, + RelDataType rowType, + Set columnMappings) { +super(cluster, traitSet, inputs, rexCall, elementType, rowType, columnMappings); + } + + @Override + public TableFunctionScan copy( + RelTraitSet traitSet, + List list, + RexNode rexNode, + Type type, + RelDataType relDataType, + Set set) { +return new BeamTableFunctionScanRel( +getCluster(), traitSet, list, rexNode, type, relDataType, columnMappings); + } + + @Override + public PTransform, PCollection> buildPTransform() { +return new Transform(); + } + + private class Transform extends PTransform, PCollection> { + +@Override +public PCollection expand(PCollectionList input) { + checkArgument( + input.size() == 1, + "Wrong number of inputs for %s, expected 1 input but received: %s", + BeamTableFunctionScanRel.class.getSimpleName(), + input); + String operatorName = ((RexCall) getCall()).getOperator().getName(); + checkArgument( + operatorName.equals("TUMBLE"), + "Only support TUMBLE table-valued function. Current operator: %s", + operatorName); + RexCall call = ((RexCall) getCall()); + RexInputRef wmCol = (RexInputRef) call.getOperands().get(1); + PCollection upstream = input.get(0); + Schema outputSchema = CalciteUtils.toSchema(getRowType()); + return upstream + .apply( + ParDo.of( + new FixedWindowDo
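`BeamTableFunctionScanRel` above implements the TUMBLE table-valued function by assigning each row to a fixed-size window via `FixedWindows`. The per-row window arithmetic can be sketched in plain Java; the method names are illustrative, not Beam's API:

```java
/**
 * Hedged sketch of what the TUMBLE TVF computes per row: assign each
 * timestamp to a fixed-size (tumbling) window, from which the appended
 * window_start / window_end columns are derived. This mirrors standard
 * FixedWindows semantics; the class itself is not part of Beam.
 */
public class TumbleDemo {
  /** Start of the fixed window containing {@code timestampMillis}. */
  public static long windowStart(long timestampMillis, long sizeMillis) {
    // floorMod keeps the arithmetic correct for negative timestamps too.
    return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
  }

  /** Exclusive end of the fixed window containing {@code timestampMillis}. */
  public static long windowEnd(long timestampMillis, long sizeMillis) {
    return windowStart(timestampMillis, sizeMillis) + sizeMillis;
  }

  public static void main(String[] args) {
    // A row at t=7,500 ms with 5-second tumbling windows lands in [5000, 10000).
    System.out.println(windowStart(7_500, 5_000)); // 5000
    System.out.println(windowEnd(7_500, 5_000));   // 10000
  }
}
```

In the actual relational node, a `DoFn` performs this assignment and emits the input row extended with the computed window bounds.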
[GitHub] [beam] amaliujia commented on a change in pull request #10946: [BEAM-9363] TUMBLE as TVF
amaliujia commented on a change in pull request #10946: URL: https://github.com/apache/beam/pull/10946#discussion_r426968398

## File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
## @@ -175,6 +185,37 @@ private void addBuiltinFunctionsToCatalog(SimpleCatalog catalog, AnalyzerOptions
 Mode.SCALAR, ImmutableList.of(resolvedFunc.getSignature(
 .forEach(catalog::addFunction);
+
+FunctionArgumentType retType =
+    new FunctionArgumentType(SignatureArgumentKind.ARG_TYPE_RELATION);
+
+FunctionArgumentType inputTableType =
+    new FunctionArgumentType(SignatureArgumentKind.ARG_TYPE_RELATION);
+
+FunctionArgumentType descriptorType =
+    new FunctionArgumentType(
+        SignatureArgumentKind.ARG_TYPE_DESCRIPTOR,
+        FunctionArgumentTypeOptionsProto.newBuilder()
+            .setDescriptorResolutionTableOffset(0)
+            .build(),
+        1);
+
+FunctionArgumentType stringType =
+    new FunctionArgumentType(TypeFactory.createSimpleType(TypeKind.TYPE_STRING));
+
+// TUMBLE
+catalog.addTableValuedFunction(
+    new TableValuedFunction.ForwardInputSchemaToOutputSchemaWithAppendedColumnTVF(

Review comment: Yeah. And at least I am glad this name itself does not exceed 80 chars, so it at least fits into a single line.
[GitHub] [beam] aaltay commented on pull request #11674: [BEAM-9958] Linkage Checker to use exclusion files as baseline
aaltay commented on pull request #11674: URL: https://github.com/apache/beam/pull/11674#issuecomment-630510006 Running the tests. Does any test actually test this code?
[GitHub] [beam] aaltay commented on pull request #11437: [BEAM-9770] Add BigQueryIO deadletter pattern
aaltay commented on pull request #11437: URL: https://github.com/apache/beam/pull/11437#issuecomment-630510056 Run Java PreCommit
[GitHub] [beam] aaltay commented on pull request #11674: [BEAM-9958] Linkage Checker to use exclusion files as baseline
aaltay commented on pull request #11674: URL: https://github.com/apache/beam/pull/11674#issuecomment-630509865 retest this please
[GitHub] [beam] amaliujia commented on a change in pull request #10946: [BEAM-9363] TUMBLE as TVF
amaliujia commented on a change in pull request #10946: URL: https://github.com/apache/beam/pull/10946#discussion_r426967410

## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/TVFStreamingUtils.java
## @@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. [standard Apache License 2.0 header trimmed]
+ */
+package org.apache.beam.sdk.extensions.sql.impl.utils;
+
+/** Provides static constants or utils for TVF streaming. */
+public class TVFStreamingUtils {

Review comment: `SqlWindowTableFunction` is a class in Calcite (since 1.22.0). After we successfully upgrade to a newer version of Calcite (I hope), we can remove our `SqlWindowTableFunction`; thus there is a need to keep a `TVFStreamingUtils`. There could be an argument that such constants could be put into `SqlWindowTableFunction` in Calcite, but we can leave that discussion for the future.
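A static-constants holder like the `TVFStreamingUtils` under review gives the appended TVF column names (`window_start` appears literally in the `TVFScanConverter` diff below) a single shared home instead of scattered string literals. A minimal sketch, with constant names assumed for illustration:

```java
/**
 * Hedged sketch of a static-constants utility like TVFStreamingUtils.
 * The exact constant names and values are assumptions; the design point
 * is one shared definition of the TVF window column names that both the
 * converter and the relational node can reference.
 */
public final class TVFStreamingUtilsSketch {
  public static final String WINDOW_START = "window_start";
  public static final String WINDOW_END = "window_end";

  // Non-instantiable: this class only hosts constants.
  private TVFStreamingUtilsSketch() {}
}
```

This is also what the later review comment "Use the constant?" asks for, where `"window_start"` is passed as a raw literal.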
[GitHub] [beam] amaliujia commented on a change in pull request #10946: [BEAM-9363] TUMBLE as TVF
amaliujia commented on a change in pull request #10946: URL: https://github.com/apache/beam/pull/10946#discussion_r426966749

## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
## @@ -0,0 +1,162 @@
(diff context: the same new file quoted in full in the earlier comment on this file; the reviewed lines are in `copy()`:)
+    return new BeamTableFunctionScanRel(
+        getCluster(), traitSet, list, rexNode, type, relDataType, columnMappings);

Review comment: Good catch! Replaced with the right parameter.
[GitHub] [beam] apilloud commented on a change in pull request #10946: [BEAM-9363] TUMBLE as TVF
apilloud commented on a change in pull request #10946: URL: https://github.com/apache/beam/pull/10946#discussion_r426949818

## File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/TVFScanConverter.java
## @@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. [standard Apache License 2.0 header trimmed]
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql.translation;
+
+import com.google.zetasql.resolvedast.ResolvedNode;
+import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedTVFArgument;
+import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedTVFScan;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalTableFunctionScan;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelRecordType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
+
+class TVFScanConverter extends RelConverter {
+
+  TVFScanConverter(ConversionContext context) {
+    super(context);
+  }
+
+  @Override
+  public RelNode convert(ResolvedTVFScan zetaNode, List inputs) {
+    RelNode input = inputs.get(0);
+    RelNode tableFunctionScan =
+        LogicalTableFunctionScan.create(
+            getCluster(),
+            inputs,
+            getExpressionConverter()
+                .convertTableValuedFunction(
+                    input,
+                    zetaNode.getTvf(),
+                    zetaNode.getArgumentList(),
+                    zetaNode.getArgumentList().get(0).getScan().getColumnList()),
+            null,
+            createRowTypeWithWindowStartAndEnd(input.getRowType()),
+            Collections.EMPTY_SET);
+
+    return tableFunctionScan;
+  }
+
+  @Override
+  public List getInputs(ResolvedTVFScan zetaNode) {
+    List inputs = new ArrayList();
+    for (ResolvedTVFArgument argument : zetaNode.getArgumentList()) {
+      if (argument.getScan() != null) {
+        inputs.add(argument.getScan());
+      }
+    }
+    return inputs;
+  }
+
+  private RelDataType createRowTypeWithWindowStartAndEnd(RelDataType inputRowType) {
+    List newFields = new ArrayList<>(inputRowType.getFieldList());
+    RelDataType timestampType = getCluster().getTypeFactory().createSqlType(SqlTypeName.TIMESTAMP);
+
+    RelDataTypeField windowStartField =
+        new RelDataTypeFieldImpl("window_start", newFields.size(), timestampType);

Review comment: Use the constant?

## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
## @@ -0,0 +1,162 @@
(second diff context truncated in the original message)
[GitHub] [beam] TheNeuralBit commented on pull request #11532: [BEAM-9822] Disable grouping when streaming
TheNeuralBit commented on pull request #11532: URL: https://github.com/apache/beam/pull/11532#issuecomment-630495848 Looks like you need to run spotless to auto-format. You can use `./gradlew spotlessApply` to do that locally (you may need to do it on the other PRs as well).
[GitHub] [beam] TheNeuralBit commented on pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'
TheNeuralBit commented on pull request #11570: URL: https://github.com/apache/beam/pull/11570#issuecomment-630495763 Retest this please
[GitHub] [beam] TheNeuralBit commented on pull request #11529: [BEAM-9822] Simplify pipeline when batching is disabled.
TheNeuralBit commented on pull request #11529: URL: https://github.com/apache/beam/pull/11529#issuecomment-630495233 Retest this please
[GitHub] [beam] TheNeuralBit commented on pull request #11528: [BEAM-9821] Populate all SpannerIO batching parameters in display data.
TheNeuralBit commented on pull request #11528: URL: https://github.com/apache/beam/pull/11528#issuecomment-630494999 Retest this please
[GitHub] [beam] TheNeuralBit commented on pull request #11532: [BEAM-9822] Disable grouping when streaming
TheNeuralBit commented on pull request #11532: URL: https://github.com/apache/beam/pull/11532#issuecomment-630494893 Retest this please
[GitHub] [beam] TheNeuralBit commented on pull request #11528: [BEAM-9821] Populate all SpannerIO batching parameters in display data.
TheNeuralBit commented on pull request #11528: URL: https://github.com/apache/beam/pull/11528#issuecomment-630494783 Retest this please
[GitHub] [beam] nielm commented on pull request #11532: [BEAM-9822] Disable grouping when streaming
nielm commented on pull request #11532: URL: https://github.com/apache/beam/pull/11532#issuecomment-630492055 Retest this please
[GitHub] [beam] nielm commented on pull request #11528: [BEAM-9821] Populate all SpannerIO batching parameters in display data.
nielm commented on pull request #11528: URL: https://github.com/apache/beam/pull/11528#issuecomment-630492150 Retest this please
[GitHub] [beam] nielm removed a comment on pull request #11532: [BEAM-9822] Disable grouping when streaming
nielm removed a comment on pull request #11532: URL: https://github.com/apache/beam/pull/11532#issuecomment-630472990 Retest this please
[GitHub] [beam] nielm removed a comment on pull request #11528: [BEAM-9821] Populate all SpannerIO batching parameters in display data.
nielm removed a comment on pull request #11528: URL: https://github.com/apache/beam/pull/11528#issuecomment-630469413 Retest this please
[GitHub] [beam] nielm removed a comment on pull request #11529: [BEAM-9822] Simplify pipeline when batching is disabled.
nielm removed a comment on pull request #11529: URL: https://github.com/apache/beam/pull/11529#issuecomment-630476722 Retest this please
[GitHub] [beam] nielm commented on pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'
nielm commented on pull request #11570: URL: https://github.com/apache/beam/pull/11570#issuecomment-630491802 Retest this please
[GitHub] [beam] nielm commented on pull request #11529: [BEAM-9822] Simplify pipeline when batching is disabled.
nielm commented on pull request #11529: URL: https://github.com/apache/beam/pull/11529#issuecomment-630491891 Retest this please
[GitHub] [beam] nielm commented on a change in pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'
nielm commented on a change in pull request #11570: URL: https://github.com/apache/beam/pull/11570#discussion_r426947630

## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java ##

@@ -1171,67 +1145,127 @@ public void processElement(ProcessContext c) {
    * occur, Therefore this DoFn has to be tested in isolation.
    */
   @VisibleForTesting
-  static class GatherBundleAndSortFn extends DoFn>> {
-    private final long maxBatchSizeBytes;
-    private final long maxNumMutations;
-    private final long maxNumRows;
-
-    // total size of the current batch.
-    private long batchSizeBytes;
-    // total number of mutated cells.
-    private long batchCells;
-    // total number of rows mutated.
-    private long batchRows;
+  static class GatherSortCreateBatchesFn extends DoFn> {
+    private final long maxBatchSizeBytes;
+    private final long maxBatchNumMutations;
+    private final long maxBatchNumRows;
+    private final long maxSortableSizeBytes;
+    private final long maxSortableNumMutations;
+    private final long maxSortableNumRows;
     private final PCollectionView schemaView;
+    private final ArrayList mutationsToSort = new ArrayList<>();
-    private transient ArrayList> mutationsToSort = null;
+    // total size of MutationGroups in mutationsToSort.
+    private long sortableSizeBytes;
+    // total number of mutated cells in mutationsToSort
+    private long sortableNumCells;
+    // total number of rows mutated in mutationsToSort
+    private long sortableNumRows;

-    GatherBundleAndSortFn(
+    GatherSortCreateBatchesFn(
        long maxBatchSizeBytes,
        long maxNumMutations,
        long maxNumRows,
        long groupingFactor,
        PCollectionView schemaView) {
-      this.maxBatchSizeBytes = maxBatchSizeBytes * groupingFactor;
-      this.maxNumMutations = maxNumMutations * groupingFactor;
-      this.maxNumRows = maxNumRows * groupingFactor;
+      this.maxBatchSizeBytes = maxBatchSizeBytes;
+      this.maxBatchNumMutations = maxNumMutations;
+      this.maxBatchNumRows = maxNumRows;
+
+      if (groupingFactor <= 0) {
+        groupingFactor = 1;
+      }
+
+      this.maxSortableSizeBytes = maxBatchSizeBytes * groupingFactor;
+      this.maxSortableNumMutations = maxNumMutations * groupingFactor;
+      this.maxSortableNumRows = maxNumRows * groupingFactor;
       this.schemaView = schemaView;
     }

     @StartBundle
     public synchronized void startBundle() throws Exception {
-      if (mutationsToSort == null) {
-        initSorter();
-      } else {
-        throw new IllegalStateException("Sorter should be null here");
-      }
+      initSorter();
     }

-    private void initSorter() {
-      mutationsToSort = new ArrayList>((int) maxNumMutations);
-      batchSizeBytes = 0;
-      batchCells = 0;
-      batchRows = 0;
+    private synchronized void initSorter() {

Review comment:

> Do we need to mark this as synchronized. Looks like all the callers are synchronized themselves.

Probably not, but it does no harm.
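The constructor in the diff above keeps the per-batch limits as given and derives the "sortable group" limits by multiplying them by `groupingFactor`, clamping non-positive factors to 1. A standalone sketch of that derivation (class and field names here are illustrative, not the PR's actual code):

```java
/**
 * Illustrative sketch of the limit derivation in GatherSortCreateBatchesFn:
 * batch limits stay as given, while the "sortable group" limits are the batch
 * limits multiplied by groupingFactor. A non-positive factor means no grouping.
 */
public class SortableLimits {
    public final long maxBatchNumMutations;
    public final long maxSortableNumMutations;

    public SortableLimits(long maxNumMutations, long groupingFactor) {
        if (groupingFactor <= 0) {
            groupingFactor = 1; // guard from the PR: clamp invalid factors
        }
        this.maxBatchNumMutations = maxNumMutations;
        this.maxSortableNumMutations = maxNumMutations * groupingFactor;
    }

    public static void main(String[] args) {
        // With groupingFactor 1000, up to 1000 batches' worth of mutations
        // are gathered and sorted together before batches are emitted.
        System.out.println(new SortableLimits(5000, 1000).maxSortableNumMutations);
    }
}
```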
[GitHub] [beam] nielm commented on pull request #11570: [BEAM-9822] Merge the stages 'Gather and Sort' and 'Create Batches'
nielm commented on pull request #11570: URL: https://github.com/apache/beam/pull/11570#issuecomment-630488670

@allenpradeep

> 1. What mode should our import pipeline use? Should it use option b as data in AVRO seems already sorted?

We can discuss this outside the scope of this PR.

> 2. Where should we document these modes of operation so that some customer can use these?

I have added a section to the javadoc explaining these 3 modes of operation, and their pros and cons.
[GitHub] [beam] chamikaramj commented on pull request #11039: [BEAM-9383] Staging Dataflow artifacts from environment
chamikaramj commented on pull request #11039: URL: https://github.com/apache/beam/pull/11039#issuecomment-630487229 Retest this please
[GitHub] [beam] lostluck commented on a change in pull request #11747: [BEAM-9951] Using the builder pattern for Go synthetic config frontend
lostluck commented on a change in pull request #11747: URL: https://github.com/apache/beam/pull/11747#discussion_r426935903

## File path: sdks/go/pkg/beam/io/synthetic/source.go ##

@@ -135,27 +155,79 @@ func (fn *sourceFn) ProcessElement(rt *offsetrange.Tracker, config SourceConfig,
 	return nil
 }

-// DefaultSourceConfig creates a SourceConfig with intended defaults for its
-// fields. SourceConfigs should be initialized with this method.
-func DefaultSourceConfig() SourceConfig {
-	return SourceConfig{
-		NumElements:   1, // Defaults shouldn't drop elements, so at least 1.
-		InitialSplits: 1, // Defaults to 1, i.e. no initial splitting.
+// SourceConfigBuilder is used to initialize SourceConfigs. See
+// SourceConfigBuilder's methods for descriptions of the fields in a
+// SourceConfig and how they can be set. The intended approach for using this
+// builder is to begin by calling the DefaultSourceConfig function, followed by
+// calling setters, followed by calling Build.
+//
+// Usage example:
+//
+//    cfg := synthetic.DefaultSourceConfig().NumElements(5000).InitialSplits(2).Build()
+type SourceConfigBuilder struct {
+	cfg SourceConfig
+}
+
+// DefaultSourceConfig creates a SourceConfigBuilder set with intended defaults
+// for the SourceConfig fields. This function is the intended starting point for
+// initializing a SourceConfig and should always be used to create
+// SourceConfigBuilders.
+//
+// To see descriptions of the various SourceConfig fields and their defaults,
+// see the methods to SourceConfigBuilder.
+func DefaultSourceConfig() *SourceConfigBuilder {
+	return &SourceConfigBuilder{
+		cfg: SourceConfig{
+			numElements:   1, // 0 is invalid (drops elements).
+			initialSplits: 1, // 0 is invalid (drops elements).
+		},
+	}
+}
+
+// NumElements is the number of elements for the source to generate and emit.
+//
+// Valid values are in the range of [1, ...] and the default value is 1. Values
+// of 0 (and below) are invalid as they result in sources that emit no elements.
+func (b *SourceConfigBuilder) NumElements(val int) *SourceConfigBuilder {
+	b.cfg.numElements = val
+	return b
+}
+
+// InitialSplits determines the number of initial splits to perform in the
+// source's SplitRestriction method. Restrictions in synthetic sources represent
+// the number of elements being emitted, and this split is performed evenly
+// across that number of elements.
+//
+// Each resulting restriction will have at least 1 element in it, and each
+// element being emitted will be contained in exactly one restriction. That
+// means that if the desired number of splits is greater than the number of
+// elements N, then N initial restrictions will be created, each containing 1
+// element.
+//
+// Valid values are in the range of [1, ...] and the default value is 1. Values
+// of 0 (and below) are invalid as they would result in dropping elements that
+// are expected to be emitted.
+func (b *SourceConfigBuilder) InitialSplits(val int) *SourceConfigBuilder {
+	b.cfg.initialSplits = val
+	return b
+}
+
+// Build constructs the SourceConfig initialized by this builder. It also
+// performs error checking on the fields, and panics if any have been set to
+// invalid values.
+func (b *SourceConfigBuilder) Build() SourceConfig {
+	if b.cfg.initialSplits <= 0 {
+		panic(fmt.Sprintf("SourceConfig.InitialSplits must be >= 1. Got: %v", b.cfg.initialSplits))
+	}
+	if b.cfg.numElements <= 0 {
+		panic(fmt.Sprintf("SourceConfig.NumElements must be >= 1. Got: %v", b.cfg.numElements))
 	}
+	return b.cfg
 }

 // SourceConfig is a struct containing all the configuration options for a
-// synthetic source.
+// synthetic source. It should be created via a SourceConfigBuilder.
 type SourceConfig struct {
-	// NumElements is the number of elements for the source to generate and
-	// emit.
-	NumElements int
-
-	// InitialSplits determines the number of initial splits to perform in the
-	// source's SplitRestriction method. Note that in some edge cases, the
-	// number of splits performed might differ from this config value. Each
-	// restriction will always have one element in it, and at least one
-	// restriction will always be output, so the number of splits will be in
-	// the range of [1, N] where N is the size of the original restriction.
-	InitialSplits int
+	numElements int

Review comment:

Note: Default encoding with JSON (or even future with beam.Schemas) will not encode unexported fields. You'll need to register a coder with beam.RegisterCoder if you want to ensure these get encoded properly. Alternatively, having the fields be exported while still providing and recommending a builder is not unreasonable.
[GitHub] [beam] tysonjh commented on a change in pull request #11566: [BEAM-9723] Add DLP integration transforms
tysonjh commented on a change in pull request #11566: URL: https://github.com/apache/beam/pull/11566#discussion_r426831619

## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java ##

@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DoFn batching the input PCollection into bigger requests in order to better utilize the Cloud DLP
+ * service.
+ */
+@Experimental
+class BatchRequestForDLP extends DoFn, KV>> {
+  public static final Logger LOG = LoggerFactory.getLogger(BatchRequestForDLP.class);
+
+  private final Counter numberOfRowsBagged =
+      Metrics.counter(BatchRequestForDLP.class, "numberOfRowsBagged");
+  private final Integer batchSize;
+
+  @StateId("elementsBag")
+  private final StateSpec>> elementsBag = StateSpecs.bag();
+
+  @TimerId("eventTimer")
+  private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  public BatchRequestForDLP(Integer batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  @ProcessElement
+  public void process(
+      @Element KV element,
+      @StateId("elementsBag") BagState> elementsBag,
+      @TimerId("eventTimer") Timer eventTimer,
+      BoundedWindow w) {
+    elementsBag.add(element);
+    eventTimer.set(w.maxTimestamp());
+  }
+
+  @OnTimer("eventTimer")
+  public void onTimer(
+      @StateId("elementsBag") BagState> elementsBag,
+      OutputReceiver>> output) {
+    String key = elementsBag.read().iterator().next().getKey();

Review comment:

Is there a guarantee that at least one element will be in the elementsBag iterator, or is there a chance for a NoSuchElementException on the `next()` call?

## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java ##

@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
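The hazard tysonjh raises, `iterator().next()` throwing `NoSuchElementException` on an empty bag, and the usual `hasNext()` guard, look like this in plain Java. The helper below is illustrative and not code from the PR.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Illustrative guard for "first element of a possibly-empty iterable". */
public class FirstKey {
    /**
     * Unguarded variant, matching the pattern under review: throws
     * NoSuchElementException when the iterable is empty.
     */
    static String firstKeyUnguarded(Iterable<Map.Entry<String, String>> bag) {
        return bag.iterator().next().getKey();
    }

    /** Safe variant: check hasNext() before calling next(). */
    static String firstKeyOrNull(Iterable<Map.Entry<String, String>> bag) {
        Iterator<Map.Entry<String, String>> it = bag.iterator();
        return it.hasNext() ? it.next().getKey() : null;
    }

    public static void main(String[] args) {
        System.out.println(firstKeyOrNull(List.of(Map.entry("table-a", "row"))));
        System.out.println(firstKeyOrNull(List.of()));
    }
}
```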
[GitHub] [beam] omarismail94 commented on a change in pull request #11737: [BEAM-9984] Support BIT_OR aggregation function in Beam SQL
omarismail94 commented on a change in pull request #11737: URL: https://github.com/apache/beam/pull/11737#discussion_r426935461

## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java ##

@@ -347,4 +357,35 @@ public BigDecimal toBigDecimal(BigDecimal record) {
       return record;
     }
   }
+
+  static class BitOr extends CombineFn {
+    static class Accum {

Review comment:

Actually, this might work, let me test this:

```
static class BitOr extends CombineFn {

  @Override
  public Long createAccumulator() {
    return 0L;
  }

  @Override
  public Long addInput(Long accum, T input) {
    return accum | input.longValue();
  }

  @Override
  public Long mergeAccumulators(Iterable accums) {
    Long merged = createAccumulator();
    for (Long accum : accums) {
      merged = merged | accum;
    }
    return merged;
  }

  @Override
  public Long extractOutput(Long accum) {
    return accum;
  }
}
```

## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java ##

@@ -347,4 +357,35 @@ public BigDecimal toBigDecimal(BigDecimal record) {
       return record;
     }
   }
+
+  static class BitOr extends CombineFn {
+    static class Accum {

Review comment:

It worked! Will commit this now!
[GitHub] [beam] nielm commented on pull request #11529: [BEAM-9822] Simplify pipeline when batching is disabled.
nielm commented on pull request #11529: URL: https://github.com/apache/beam/pull/11529#issuecomment-630476722 Retest this please
[GitHub] [beam] omarismail94 commented on a change in pull request #11737: [BEAM-9984] Support BIT_OR aggregation function in Beam SQL
omarismail94 commented on a change in pull request #11737: URL: https://github.com/apache/beam/pull/11737#discussion_r426931476

## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java ##

@@ -347,4 +357,35 @@ public BigDecimal toBigDecimal(BigDecimal record) {
       return record;
     }
   }
+
+  static class BitOr extends CombineFn {
+    static class Accum {

Review comment:

Hmm, I'm not sure if this works. How would I create the accumulator? I can't do `new Long()`. That's why I wrapped `long` in Accum.
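To the accumulator question: no `Accum` wrapper is needed because the accumulator can be the boxed `Long` itself — `return 0L;` autoboxes, and `new Long()` is never required. A minimal standalone fold with that shape (this mirrors the idea, not Beam's actual `CombineFn` API):

```java
import java.util.List;

/** Standalone bitwise-OR fold using a boxed Long as the accumulator. */
public class BitOrFold {
    static Long createAccumulator() {
        return 0L; // autoboxing: no new Long() needed
    }

    static Long addInput(Long accum, Number input) {
        return accum | input.longValue();
    }

    static Long bitOr(List<? extends Number> inputs) {
        Long acc = createAccumulator();
        for (Number n : inputs) {
            acc = addInput(acc, n);
        }
        return acc;
    }

    public static void main(String[] args) {
        // 1 | 2 | 4 == 7
        System.out.println(bitOr(List.of(1L, 2L, 4L)));
    }
}
```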
[GitHub] [beam] nielm commented on pull request #11532: [BEAM-9822] Disable grouping when streaming
nielm commented on pull request #11532: URL: https://github.com/apache/beam/pull/11532#issuecomment-630472990 Retest this please
[GitHub] [beam] robertwb merged pull request #11708: [BEAM-9577] Artifact v2 support for uber jars.
robertwb merged pull request #11708: URL: https://github.com/apache/beam/pull/11708
[GitHub] [beam] nielm commented on a change in pull request #11532: [BEAM-9822] Disable grouping when streaming
nielm commented on a change in pull request #11532: URL: https://github.com/apache/beam/pull/11532#discussion_r426929817

## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java ##

@@ -1066,7 +1079,12 @@ public SpannerWriteResult expand(PCollection input) {
             spec.getBatchSizeBytes(),
             spec.getMaxNumMutations(),
             spec.getMaxNumRows(),
-            spec.getGroupingFactor(),
+            // Do not group on streaming unless explicitly set.
+            spec.getGroupingFactor()
+                .orElse(
+                    input.isBounded() == IsBounded.BOUNDED

Review comment:

It's kinda both! If the source is unbounded (streaming) and the groupingFactor has not been specified by the user, then default to no grouping.
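nielm's "kinda both" answer describes a default chosen per boundedness only when the user has not set a value. That selection can be sketched with a plain `Optional`; the constant and method names below are assumptions for illustration, not SpannerIO's actual code or defaults.

```java
import java.util.Optional;

/**
 * Illustrative default selection: a user-set groupingFactor always wins;
 * otherwise bounded (batch) inputs get a grouping default while unbounded
 * (streaming) inputs get 1, i.e. no grouping.
 */
public class GroupingFactorDefault {
    static final int BOUNDED_DEFAULT = 1000; // assumed value for illustration

    static int effective(Optional<Integer> userSet, boolean bounded) {
        return userSet.orElse(bounded ? BOUNDED_DEFAULT : 1);
    }

    public static void main(String[] args) {
        System.out.println(effective(Optional.empty(), false)); // streaming, unset
        System.out.println(effective(Optional.of(50), false));  // user override wins
    }
}
```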
[GitHub] [beam] TheNeuralBit commented on pull request #11528: [BEAM-9821] Populate all SpannerIO batching parameters in display data.
TheNeuralBit commented on pull request #11528: URL: https://github.com/apache/beam/pull/11528#issuecomment-630470331 Retest this please
[GitHub] [beam] suztomo commented on a change in pull request #11674: [BEAM-9958] Linkage Checker to use exclusion files as baseline
suztomo commented on a change in pull request #11674: URL: https://github.com/apache/beam/pull/11674#discussion_r426927838

## File path: sdks/java/build-tools/beam-linkage-check.sh ##

@@ -66,51 +66,61 @@
 if [ ! -z "$(git diff)" ]; then
   exit 1
 fi

+ACCUMULATED_RESULT=0
+
 function runLinkageCheck () {
   COMMIT=$1
   BRANCH=$2
+  MODE=$3 # baseline or validate
   # An empty invocation so that the subsequent checkJavaLinkage does not
   # contain garbage
   echo "`date`:" "Installing artifacts of ${BRANCH}(${COMMIT}) to Maven local repository."
-  ./gradlew -Ppublishing -PjavaLinkageArtifactIds=beam-sdks-java-core :checkJavaLinkage > /dev/null 2>&1
+  ./gradlew -Ppublishing -PjavaLinkageArtifactIds=beam-sdks-java-core -PjavaLinkageWriteBaseline=/dev/null :checkJavaLinkage > /dev/null 2>&1
   for ARTIFACT in $ARTIFACTS; do
-    echo "`date`:" "Running linkage check for ${ARTIFACT} in ${BRANCH}"
-    # Removing time taken to have clean diff
-    ./gradlew -Ppublishing -PjavaLinkageArtifactIds=$ARTIFACT :checkJavaLinkage |grep -v 'BUILD SUCCESSFUL in' | grep -v 'dependency paths' > ${OUTPUT_DIR}/${COMMIT}-${ARTIFACT}
-    echo "`date`:" "Done: ${OUTPUT_DIR}/${COMMIT}-${ARTIFACT}"
+    echo "`date`:" "Running linkage check (${MODE}) for ${ARTIFACT} in ${BRANCH}"
+
+    BASELINE_FILE=${OUTPUT_DIR}/baseline-${ARTIFACT}.xml
+    if [ "$MODE" = "baseline" ]; then
+      BASELINE_OPTION='-PjavaLinkageWriteBaseline'
+      echo "`date`:" "to create a baseline (existing errors before change) $BASELINE_FILE"
+    elif [ "$MODE" = "validate" ]; then
+      BASELINE_OPTION='-PjavaLinkageReadBaseline'
+      echo "`date`:" "using baseline $BASELINE_FILE"
+    else
+      echo "invalid parameter for runLinkageCheck: ${MODE}"

Review comment:

Fixed.
[GitHub] [beam] suztomo commented on pull request #11674: [BEAM-9958] Linkage Checker to use exclusion files as baseline
suztomo commented on pull request #11674: URL: https://github.com/apache/beam/pull/11674#issuecomment-630470049 @aaltay Thank you for taking review. PTAL.
[GitHub] [beam] nielm commented on pull request #11528: [BEAM-9821] Populate all SpannerIO batching parameters in display data.
nielm commented on pull request #11528: URL: https://github.com/apache/beam/pull/11528#issuecomment-630469413 Retest this please
[GitHub] [beam] robertwb commented on a change in pull request #11039: [BEAM-9383] Staging Dataflow artifacts from environment
robertwb commented on a change in pull request #11039: URL: https://github.com/apache/beam/pull/11039#discussion_r426926720

## File path: runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java ##

@@ -195,7 +187,7 @@ public void testFileWithExtensionPackageNamingAndSize() throws Exception {
     PackageAttributes attr = makePackageAttributes(tmpFile, null);
     DataflowPackage target = attr.getDestination();
-    assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt"));
+    assertThat(target.getName(), RegexMatcher.matches(UUID_PATTERN + ".txt"));

Review comment:

Is it important, for the purposes of this test (or Dataflow in general), that the staged file name is in the form of a UUID? If not (and I don't think it is) it's better not to test for it. (A test that might be good to add is to see if two same-named files in different directories actually get staged to different places, which is the underlying, important intent.)
[GitHub] [beam] nielm commented on a change in pull request #11528: [BEAM-9821] Populate all SpannerIO batching parameters in display data.
nielm commented on a change in pull request #11528: URL: https://github.com/apache/beam/pull/11528#discussion_r426926639

## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java ##

@@ -991,6 +1001,24 @@ public WriteGrouped(Write spec) {
       this.spec = spec;
     }

+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      spec.getSpannerConfig().populateDisplayData(builder);
+      builder.add(
+          DisplayData.item("batchSizeBytes", spec.getBatchSizeBytes())
+              .withLabel("Max batch size in bytes"));
+      builder.add(
+          DisplayData.item("maxNumMutations", spec.getMaxNumMutations())
+              .withLabel("Max number of mutated cells in each batch"));
+      builder.add(
+          DisplayData.item("maxNumRows", spec.getMaxNumRows())
+              .withLabel("Max number of rows in each batch"));
+      builder.add(
+          DisplayData.item("groupingFactor", spec.getGroupingFactor())
+              .withLabel("Number of batches to sort over"));
+    }
+

Review comment:

LGTM. I extracted a single method in Write to populate the displayData with parameters to avoid repeating code.
[GitHub] [beam] ihji commented on a change in pull request #11039: [BEAM-9383] Staging Dataflow artifacts from environment
ihji commented on a change in pull request #11039: URL: https://github.com/apache/beam/pull/11039#discussion_r426924859

## File path: runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java ##

@@ -195,7 +187,7 @@ public void testFileWithExtensionPackageNamingAndSize() throws Exception {
     PackageAttributes attr = makePackageAttributes(tmpFile, null);
     DataflowPackage target = attr.getDestination();
-    assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt"));
+    assertThat(target.getName(), RegexMatcher.matches(UUID_PATTERN + ".txt"));

Review comment:

But it only checks whether the staged file name has the same extension (vs. checking whether the staged file name is in the form of a UUID with the same extension).
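The distinction ihji draws — matching "any name ending in .txt" versus "a UUID followed by .txt" — comes down to the regex being asserted. A `UUID_PATTERN` along these lines rejects non-UUID names; the exact pattern used by PackageUtilTest is not shown in this thread, so this regex is an assumption for illustration.

```java
import java.util.regex.Pattern;

/**
 * Illustrative UUID-plus-extension check; the real test's UUID_PATTERN
 * constant is not quoted in this thread, so this regex is an assumption.
 */
public class StagedName {
    static final String UUID_PATTERN =
        "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";

    static boolean isUuidTxt(String name) {
        // Anchored match: the whole name must be UUID + ".txt".
        return Pattern.matches(UUID_PATTERN + "\\.txt", name);
    }

    public static void main(String[] args) {
        System.out.println(isUuidTxt("123e4567-e89b-12d3-a456-426614174000.txt"));
        System.out.println(isUuidTxt("file-1234abcd.txt"));
    }
}
```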
[GitHub] [beam] chamikaramj commented on pull request #11039: [BEAM-9383] Staging Dataflow artifacts from environment
chamikaramj commented on pull request #11039: URL: https://github.com/apache/beam/pull/11039#issuecomment-630465449 Retest this please
[GitHub] [beam] chamikaramj commented on pull request #11039: [BEAM-9383] Staging Dataflow artifacts from environment
chamikaramj commented on pull request #11039: URL: https://github.com/apache/beam/pull/11039#issuecomment-630465362 Retest this please
[GitHub] [beam] chamikaramj commented on pull request #11740: [BEAM-8019] Prevent Dataflow from starting multiple containers for the same image
chamikaramj commented on pull request #11740: URL: https://github.com/apache/beam/pull/11740#issuecomment-630462316 Run Python2_PVR_Flink PreCommit
[GitHub] [beam] youngoli commented on pull request #11747: [BEAM-9951] Using the builder pattern for Go synthetic config frontend
youngoli commented on pull request #11747: URL: https://github.com/apache/beam/pull/11747#issuecomment-630460354 R: @lostluck
[GitHub] [beam] youngoli opened a new pull request #11747: [BEAM-9951] Using the builder pattern for Go synthetic config frontend
youngoli opened a new pull request #11747: URL: https://github.com/apache/beam/pull/11747 Instead of just creating SourceConfigs and StepConfigs, have a builder pattern to allow more user-friendly creation of those configs with defaults. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
Post-Commit Tests Status (on master branch): build-status badge table for the Go, Java, and Python SDKs across the Apex, Dataflow, Flink, Gearpump, Samza, and Spark runners (badge links omitted).
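The builder-pattern frontend described in the PR above (user-friendly creation of synthetic configs with defaults) can be sketched in Java terms; the actual change is in the Go SDK, and the field names and default values here are invented for illustration.

```java
// Hedged sketch of a builder-style frontend for a synthetic source config.
// Names and defaults are illustrative, not the Go SDK's actual API.
public class SourceConfigBuilder {
    private long numElements = 1000;  // illustrative default
    private int keySizeBytes = 8;     // illustrative default
    private int valueSizeBytes = 100; // illustrative default

    public SourceConfigBuilder numElements(long n) { numElements = n; return this; }
    public SourceConfigBuilder keySize(int bytes) { keySizeBytes = bytes; return this; }
    public SourceConfigBuilder valueSize(int bytes) { valueSizeBytes = bytes; return this; }

    // build() materializes the config; here it just renders the fields.
    public String build() {
        return String.format("SourceConfig{numElements=%d, keySize=%d, valueSize=%d}",
            numElements, keySizeBytes, valueSizeBytes);
    }

    public static void main(String[] args) {
        // Callers override only what they need; everything else keeps defaults.
        System.out.println(new SourceConfigBuilder().numElements(5).build());
    }
}
```

The point of the pattern is exactly what the PR description says: instead of constructing full `SourceConfig`/`StepConfig` structs by hand, callers chain only the options they care about and inherit sensible defaults for the rest.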
[GitHub] [beam] omarismail94 commented on a change in pull request #11737: [BEAM-9984] Support BIT_OR aggregation function in Beam SQL
omarismail94 commented on a change in pull request #11737: URL: https://github.com/apache/beam/pull/11737#discussion_r426918300 ## File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java ## @@ -4500,6 +4499,23 @@ public void testIsNullTrueFalse() { pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); } + @Test + public void testZetaSQLBitOr() { Review comment: Zeta was surprisingly easier than Calcite!
[GitHub] [beam] omarismail94 commented on a change in pull request #11737: [BEAM-9984] Support BIT_OR aggregation function in Beam SQL
omarismail94 commented on a change in pull request #11737: URL: https://github.com/apache/beam/pull/11737#discussion_r426918114 ## File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java ## @@ -2836,7 +2836,6 @@ public void testDistinctOnNull() { } @Test - @Ignore("BeamSQL does not support ANY_VALUE") Review comment: Thanks!
[GitHub] [beam] omarismail94 commented on a change in pull request #11737: [BEAM-9984] Support BIT_OR aggregation function in Beam SQL
omarismail94 commented on a change in pull request #11737: URL: https://github.com/apache/beam/pull/11737#discussion_r426918047 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java ## @@ -171,10 +173,18 @@ static CombineFn createAvg(Schema.FieldType fieldType) { return new BigDecimalAvg(); default: throw new UnsupportedOperationException( -String.format("[%s] is not support in AVG", fieldType)); +String.format("[%s] is not supported in AVG", fieldType)); } } + static CombineFn createBitOr(Schema.FieldType fieldType) { +if (fieldType.getTypeName() == TypeName.INT64) { + return new BitOr(); +} +throw new UnsupportedOperationException( +String.format("[%s] is not supported in BIT_OR", fieldType)); Review comment: I saw the other functions in this class do something similar, so I thought I'd do so as well
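For context on what the new `BitOr` CombineFn computes: BIT_OR over INT64 values is a bitwise-OR fold whose identity element is 0. A plain-Java sketch of the aggregation (not the Beam CombineFn itself):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class BitOrDemo {
    // BIT_OR aggregates by OR-ing all input values together;
    // the empty-input result is the identity element, 0.
    static long bitOr(List<Long> values) {
        long acc = 0L;
        for (long v : values) {
            acc |= v;
        }
        return acc;
    }

    public static void main(String[] args) {
        // 0x0F | 0xF0 = 0xFF
        System.out.println(Long.toHexString(bitOr(Arrays.asList(0x0FL, 0xF0L))));
        System.out.println(bitOr(Collections.emptyList())); // identity: 0
    }
}
```

Because OR is associative and commutative with an identity, the operation combines partial results from any grouping of inputs, which is what makes it well suited to a distributed CombineFn.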
[GitHub] [beam] pabloem commented on pull request #11339: [BEAM-9468] Fhir io
pabloem commented on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-630455763 Run Java PreCommit
[GitHub] [beam] chamikaramj commented on pull request #11740: [BEAM-8019] Prevent Dataflow from starting multiple containers for the same image
chamikaramj commented on pull request #11740: URL: https://github.com/apache/beam/pull/11740#issuecomment-630445775 Run Python2_PVR_Flink PreCommit
[GitHub] [beam] chamikaramj commented on pull request #11740: [BEAM-8019] Prevent Dataflow from starting multiple containers for the same image
chamikaramj commented on pull request #11740: URL: https://github.com/apache/beam/pull/11740#issuecomment-630445694 Run Python PreCommit
[GitHub] [beam] lukecwik opened a new pull request #11746: [BEAM-10028] Add support for the state backed iterable coder to the Java SDK harness.
lukecwik opened a new pull request #11746: URL: https://github.com/apache/beam/pull/11746 This required supporting a translation context through CoderTranslator to give access to the BeamFnStateClient and current process bundle instruction id.
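The pattern in the PR description above, threading a translation context through a translator so the coder can reach the state client and the current process-bundle instruction id, can be sketched generically. Every name below is invented for illustration; this is not the actual CoderTranslator API.

```java
public class ContextDemo {
    // Invented context carrying what the PR description mentions:
    // a state-client handle and the current process-bundle instruction id.
    static final class TranslationContext {
        final String stateClient;
        final String instructionId;

        TranslationContext(String stateClient, String instructionId) {
            this.stateClient = stateClient;
            this.instructionId = instructionId;
        }
    }

    // Translators accept the context explicitly instead of relying on
    // globals, so state-backed coders can be constructed per bundle.
    interface Translator<T> {
        T translate(TranslationContext context);
    }

    public static void main(String[] args) {
        Translator<String> iterableCoder =
            ctx -> "StateBackedIterableCoder(instruction=" + ctx.instructionId + ")";
        System.out.println(iterableCoder.translate(
            new TranslationContext("client-1", "bundle-42")));
    }
}
```

The design point is simply dependency injection: passing the context through the translation call chain makes the per-bundle instruction id available exactly where the state-backed coder is built.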
[GitHub] [beam] pabloem commented on pull request #11339: [BEAM-9468] Fhir io
pabloem commented on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-630440098 Run Java PreCommit
[GitHub] [beam] apilloud merged pull request #11272: [BEAM-9641] Support ZetaSQL DATE type as a Beam LogicalType
apilloud merged pull request #11272: URL: https://github.com/apache/beam/pull/11272
[GitHub] [beam] pabloem commented on pull request #11339: [BEAM-9468] Fhir io
pabloem commented on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-630433535 Run Java PreCommit
[GitHub] [beam] aaltay commented on pull request #11075: [BEAM-9421] Website section that describes getting predictions using AI Platform Prediction
aaltay commented on pull request #11075: URL: https://github.com/apache/beam/pull/11075#issuecomment-630432367 /cc @vilobhmm
[GitHub] [beam] aaltay commented on a change in pull request #11075: [BEAM-9421] Website section that describes getting predictions using AI Platform Prediction
aaltay commented on a change in pull request #11075: URL: https://github.com/apache/beam/pull/11075#discussion_r426889254 ## File path: website/www/site/content/en/documentation/patterns/ai-platform.md ## @@ -0,0 +1,79 @@ +--- +title: "AI Platform integration patterns" +--- + + +# AI Platform integration patterns + +This page describes common patterns in pipelines with Google Cloud AI Platform transforms. + +{{< language-switcher java py >}} + +## Getting predictions + +This section shows how to use [Google Cloud AI Platform Prediction](https://cloud.google.com/ai-platform/prediction/docs/overview) to make predictions about new data from a cloud-hosted machine learning model. + +[tfx_bsl](https://github.com/tensorflow/tfx-bsl) is a library with a Beam PTransform called `RunInference`. `RunInference` is able to perform an inference that can use an external service endpoint for receiving data. When using a service endpoint, the transform takes a PCollection of type `tf.train.Example` and, for every batch of elements, sends a request to AI Platform Prediction. The size of a batch may vary. For more details on how Beam finds the best batch size, refer to a docstring for [BatchElements](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html?highlight=batchelements#apache_beam.transforms.util.BatchElements). Review comment: `tf.train.Example` -> `tf.train.Example` or `tf.train.SequenceExample` ## File path: website/www/site/content/en/documentation/patterns/ai-platform.md ## @@ -0,0 +1,79 @@ +--- +title: "AI Platform integration patterns" +--- + + +# AI Platform integration patterns + +This page describes common patterns in pipelines with Google Cloud AI Platform transforms. + +{{< language-switcher java py >}} + +## Getting predictions + +This section shows how to use [Google Cloud AI Platform Prediction](https://cloud.google.com/ai-platform/prediction/docs/overview) to make predictions about new data from a cloud-hosted machine learning model. 
+ +[tfx_bsl](https://github.com/tensorflow/tfx-bsl) is a library with a Beam PTransform called `RunInference`. `RunInference` is able to perform an inference that can use an external service endpoint for receiving data. When using a service endpoint, the transform takes a PCollection of type `tf.train.Example` and, for every batch of elements, sends a request to AI Platform Prediction. The size of a batch may vary. For more details on how Beam finds the best batch size, refer to the docstring for [BatchElements](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html?highlight=batchelements#apache_beam.transforms.util.BatchElements). + + The transform produces a PCollection of type `PredictionLog`, which contains predictions. + +Before getting started, deploy a TensorFlow model to AI Platform Prediction. The cloud service manages the infrastructure needed to handle prediction requests in an efficient and scalable way. Do note that only TensorFlow models are supported by the transform. For more information, see [Exporting a SavedModel for prediction](https://cloud.google.com/ai-platform/prediction/docs/exporting-savedmodel-for-prediction). + +Once a machine learning model is deployed, prepare a list of instances to get predictions for. To send binary data, make sure that the name of an input ends in `_bytes`. This will base64-encode data before sending a request. Review comment: Is this still applicable? /cc @rose-rong-liu ## File path: website/www/site/content/en/documentation/patterns/ai-platform.md ## @@ -0,0 +1,79 @@ +--- +title: "AI Platform integration patterns" +--- + + +# AI Platform integration patterns + +This page describes common patterns in pipelines with Google Cloud AI Platform transforms.
+ +{{< language-switcher java py >}} + +## Getting predictions + +This section shows how to use [Google Cloud AI Platform Prediction](https://cloud.google.com/ai-platform/prediction/docs/overview) to make predictions about new data from a cloud-hosted machine learning model. + +[tfx_bsl](https://github.com/tensorflow/tfx-bsl) is a library with a Beam PTransform called `RunInference`. `RunInference` is able to perform an inference that can use an external service endpoint for receiving data. When using a service endpoint, the transform takes a PCollection of type `tf.train.Example` and, for every batch of elements, sends a request to AI Platform Prediction. The size of a batch may vary. For more details on how Beam finds the best batch size, refer to a docstring for [BatchElements](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html?highlight=batchelements#apache_beam.transforms.util.BatchElements). Review comment: The size of a batch may vary. -> The size of a batch is automatically computed. ## File path: website/www/site/content/en/documentation/patterns/ai
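The batching behavior the quoted docs describe (one prediction request per batch of elements, with the batch size computed automatically) reduces to grouping elements before each RPC. A naive fixed-size version, with all names invented for illustration, looks like this; Beam's actual `BatchElements` tunes the batch size dynamically rather than using a fixed cap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchDemo {
    // Naive fixed-size batching: split the input into runs of at most
    // maxBatchSize elements, one request per run.
    static <T> List<List<T>> batch(List<T> elements, int maxBatchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < elements.size(); i += maxBatchSize) {
            batches.add(new ArrayList<>(
                elements.subList(i, Math.min(i + maxBatchSize, elements.size()))));
        }
        return batches;
    }

    public static void main(String[] args) {
        // 5 elements with a cap of 2 -> three requests instead of five.
        System.out.println(batch(Arrays.asList(1, 2, 3, 4, 5), 2));
    }
}
```

The payoff is fewer round trips to the prediction endpoint; the dynamic sizing in `BatchElements` additionally adapts the cap to observed throughput instead of fixing it up front.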
[GitHub] [beam] aaltay commented on pull request #11075: [BEAM-9421] Website section that describes getting predictions using AI Platform Prediction
aaltay commented on pull request #11075: URL: https://github.com/apache/beam/pull/11075#issuecomment-630430689 @kamilwu -- @katsiapis had a few questions. Please do not merge yet, I will add his comments here.
[GitHub] [beam] robertwb commented on a change in pull request #11039: [BEAM-9383] Staging Dataflow artifacts from environment
robertwb commented on a change in pull request #11039: URL: https://github.com/apache/beam/pull/11039#discussion_r426876833 ## File path: runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java ## @@ -195,7 +187,7 @@ public void testFileWithExtensionPackageNamingAndSize() throws Exception { PackageAttributes attr = makePackageAttributes(tmpFile, null); DataflowPackage target = attr.getDestination(); -assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt")); +assertThat(target.getName(), RegexMatcher.matches(UUID_PATTERN + ".txt")); Review comment: You could just match `".*.txt"` here, rather than hard-coding the uuid format. (Same below.) ## File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ## @@ -784,7 +877,25 @@ public DataflowPipelineJob run(Pipeline pipeline) { "Executing pipeline on the Dataflow Service, which will have billing implications " + "related to Google Compute Engine usage and other Google Cloud Services."); -List packages = options.getStager().stageDefaultFiles(); +// Capture the sdkComponents for look up during step translations +SdkComponents sdkComponents = SdkComponents.create(); + +DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); +String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions); +RunnerApi.Environment defaultEnvironmentForDataflow = +Environments.createDockerEnvironment(workerHarnessContainerImageURL); + +sdkComponents.registerEnvironment( +defaultEnvironmentForDataflow +.toBuilder() +.addAllDependencies(getDefaultArtifacts()) +.build()); + +RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true); + +LOG.debug("Portable pipeline proto:\n{}", TextFormat.printToString(pipelineProto)); Review comment: OK, we don't have to change this. 
[GitHub] [beam] suztomo commented on a change in pull request #11674: [BEAM-9958] Linkage Checker to use exclusion files as baseline
suztomo commented on a change in pull request #11674: URL: https://github.com/apache/beam/pull/11674#discussion_r426871194 ## File path: sdks/java/build-tools/beam-linkage-check.sh ## @@ -66,51 +66,61 @@ if [ ! -z "$(git diff)" ]; then exit 1 fi +ACCUMULATED_RESULT=0 + function runLinkageCheck () { COMMIT=$1 BRANCH=$2 + MODE=$3 # baseline or validate # An empty invocation so that the subsequent checkJavaLinkage does not # contain garbage echo "`date`:" "Installing artifacts of ${BRANCH}(${COMMIT}) to Maven local repository." - ./gradlew -Ppublishing -PjavaLinkageArtifactIds=beam-sdks-java-core :checkJavaLinkage > /dev/null 2>&1 + ./gradlew -Ppublishing -PjavaLinkageArtifactIds=beam-sdks-java-core -PjavaLinkageWriteBaseline=/dev/null :checkJavaLinkage > /dev/null 2>&1 for ARTIFACT in $ARTIFACTS; do -echo "`date`:" "Running linkage check for ${ARTIFACT} in ${BRANCH}" -# Removing time taken to have clean diff -./gradlew -Ppublishing -PjavaLinkageArtifactIds=$ARTIFACT :checkJavaLinkage |grep -v 'BUILD SUCCESSFUL in' | grep -v 'dependency paths' > ${OUTPUT_DIR}/${COMMIT}-${ARTIFACT} -echo "`date`:" "Done: ${OUTPUT_DIR}/${COMMIT}-${ARTIFACT}" +echo "`date`:" "Running linkage check (${MODE}) for ${ARTIFACT} in ${BRANCH}" + +BASELINE_FILE=${OUTPUT_DIR}/baseline-${ARTIFACT}.xml +if [ "$MODE" = "baseline" ]; then + BASELINE_OPTION='-PjavaLinkageWriteBaseline' + echo "`date`:" "to create a baseline (existing errors before change) $BASELINE_FILE" +elif [ "$MODE" = "validate" ]; then + BASELINE_OPTION='-PjavaLinkageReadBaseline' + echo "`date`:" "using baseline $BASELINE_FILE" +else + echo "invalid parameter for runLinkageCheck: ${MODE}" Review comment: Good point. Will fix that (I only thought about the use case from the shell script, but now I remember that directly running the task is also useful.)
[GitHub] [beam] rohdesamuel commented on pull request #11745: Add to/from_runner_api_parameters to WriteToBigQuery
rohdesamuel commented on pull request #11745: URL: https://github.com/apache/beam/pull/11745#issuecomment-630412286 R: @robertwb can you take a look please?
[GitHub] [beam] rohdesamuel opened a new pull request #11745: Add to/from_runner_api_parameters to WriteToBigQuery
rohdesamuel opened a new pull request #11745: URL: https://github.com/apache/beam/pull/11745 Change-Id: Ifd969174d5a7744766173f35fd6b65a72ebfd924 WriteToBigQuery only has the default to/from_runner_api implementation, which cannot reconstruct the original WriteToBigQuery transform.
[GitHub] [beam] pabloem commented on pull request #11339: [BEAM-9468] Fhir io
pabloem commented on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-630408509 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] reuvenlax commented on pull request #11350: [BEAM-1589] Added @OnWindowExpiration annotation.
reuvenlax commented on pull request #11350: URL: https://github.com/apache/beam/pull/11350#issuecomment-630407268 Run Java PreCommit
[GitHub] [beam] pabloem commented on pull request #11339: [BEAM-9468] Fhir io
pabloem commented on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-630406503 Run Java PreCommit
[GitHub] [beam] pabloem commented on pull request #11339: [BEAM-9468] Fhir io
pabloem commented on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-630406334 Run Java PostCommit
[GitHub] [beam] amaliujia commented on a change in pull request #11737: [BEAM-9984] Support BIT_OR aggregation function in Beam SQL
amaliujia commented on a change in pull request #11737: URL: https://github.com/apache/beam/pull/11737#discussion_r426859494

File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java

    @@ -347,4 +357,35 @@ public BigDecimal toBigDecimal(BigDecimal record) { return record; }
       }
    +
    +  static class BitOr extends CombineFn {
    +    static class Accum {

Review comment: I am thinking there is no need to define a class to wrap `long val`? Just use `Long`? Also, you could consider this interface: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java#L129
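The reviewer's point is that BIT_OR needs no wrapper `Accum` class, because a bitwise OR can accumulate directly into a single `long` in the style of Beam's `Sum.ofLongs()` (which is built on `Combine.BinaryCombineLongFn`). A minimal self-contained sketch of that shape follows; note that `BinaryCombineLongFn` here is a simplified stand-in written for illustration, not Beam's actual class, and `BitOrSketch` is a hypothetical name, not code from the PR:

```java
import java.util.Arrays;
import java.util.List;

public class BitOrSketch {
    // Simplified stand-in for the shape of Beam's Combine.BinaryCombineLongFn:
    // the accumulator is a primitive long, so no wrapper Accum class is needed.
    abstract static class BinaryCombineLongFn {
        /** Combines two partial results into one. */
        public abstract long apply(long left, long right);

        /** Value for the empty input, i.e. the identity of apply(). */
        public abstract long identity();

        /** Folds all values with apply(), starting from identity(). */
        public long combine(List<Long> values) {
            long acc = identity();
            for (long v : values) {
                acc = apply(acc, v);
            }
            return acc;
        }
    }

    // BIT_OR: bitwise OR of all inputs; the identity is 0, since x | 0 == x.
    static final BinaryCombineLongFn BIT_OR = new BinaryCombineLongFn() {
        @Override
        public long apply(long left, long right) {
            return left | right;
        }

        @Override
        public long identity() {
            return 0L;
        }
    };

    public static void main(String[] args) {
        System.out.println(BIT_OR.combine(Arrays.asList(1L, 2L, 4L))); // 1|2|4 = 7
        System.out.println(BIT_OR.combine(Arrays.asList(5L, 3L)));     // 5|3 = 7
    }
}
```

Because the combine step is a pure binary operation on primitives, this form is both simpler and cheaper than boxing state in a dedicated accumulator class.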